001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.xmpp;
018    
019    import java.io.IOException;
020    import java.io.PrintWriter;
021    import java.io.StringWriter;
022    import java.util.HashMap;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.FutureTask;
028    import java.util.concurrent.ScheduledThreadPoolExecutor;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    
031    import javax.jms.JMSException;
032    import org.w3c.dom.Element;
033    
034    import ietf.params.xml.ns.xmpp_sasl.Auth;
035    import ietf.params.xml.ns.xmpp_sasl.Challenge;
036    import ietf.params.xml.ns.xmpp_sasl.Success;
037    import ietf.params.xml.ns.xmpp_tls.Proceed;
038    import ietf.params.xml.ns.xmpp_tls.Starttls;
039    import jabber.client.Body;
040    import jabber.client.Error;
041    import jabber.client.Iq;
042    import jabber.client.Message;
043    import jabber.client.Presence;
044    import jabber.iq.auth.Query;
045    
046    import org.apache.activemq.advisory.AdvisorySupport;
047    import org.apache.activemq.command.ActiveMQDestination;
048    import org.apache.activemq.command.ActiveMQMessage;
049    import org.apache.activemq.command.ActiveMQTempQueue;
050    import org.apache.activemq.command.ActiveMQTextMessage;
051    import org.apache.activemq.command.ActiveMQTopic;
052    import org.apache.activemq.command.Command;
053    import org.apache.activemq.command.ConnectionId;
054    import org.apache.activemq.command.ConnectionInfo;
055    import org.apache.activemq.command.ConsumerId;
056    import org.apache.activemq.command.ConsumerInfo;
057    import org.apache.activemq.command.DestinationInfo;
058    import org.apache.activemq.command.ExceptionResponse;
059    import org.apache.activemq.command.MessageAck;
060    import org.apache.activemq.command.MessageDispatch;
061    import org.apache.activemq.command.MessageId;
062    import org.apache.activemq.command.ProducerId;
063    import org.apache.activemq.command.ProducerInfo;
064    import org.apache.activemq.command.Response;
065    import org.apache.activemq.command.SessionId;
066    import org.apache.activemq.command.SessionInfo;
067    import org.apache.activemq.transport.xmpp.command.Handler;
068    import org.apache.activemq.transport.xmpp.command.HandlerRegistry;
069    import org.apache.activemq.util.IdGenerator;
070    import org.apache.activemq.util.IntSequenceGenerator;
071    import org.apache.activemq.util.LongSequenceGenerator;
072    import org.slf4j.Logger;
073    import org.slf4j.LoggerFactory;
074    import org.jabber.protocol.disco_info.Feature;
075    import org.jabber.protocol.disco_info.Identity;
076    import org.jabber.protocol.disco_items.Item;
077    import org.jabber.protocol.muc_user.X;
078    
079    /**
080     * TODO lots of this code could be shared with Stomp
081     */
082    public class ProtocolConverter {
083        private static final transient Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
084        private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
085        private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator("xmpp");
086    
087        private HandlerRegistry registry = new HandlerRegistry();
088        private XmppTransport transport;
089    
090        private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
091        private final SessionId sessionId = new SessionId(connectionId, -1);
092        private final ProducerId producerId = new ProducerId(sessionId, 1);
093    
094        private final ConnectionInfo connectionInfo = new ConnectionInfo(connectionId);
095        private final SessionInfo sessionInfo = new SessionInfo(sessionId);
096        private final ProducerInfo producerInfo = new ProducerInfo(producerId);
097    
098        private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
099        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
100        private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator();
101    
102        private final Map<Integer, Handler<Response>> responseHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
103        private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
104        private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>();
105        private final Map<String, ConsumerInfo> jidToInboxConsumerMap = new HashMap<String, ConsumerInfo>();
106    
107        private final Object commnadIdMutex = new Object();
108        private int lastCommandId;
109        private final AtomicBoolean connected = new AtomicBoolean(false);
110        private ActiveMQTempQueue inboxDestination;
111    
112        //to avoid calling into sendToActiveMq from a handler
113        private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
114    
115        public ProtocolConverter(XmppTransport transport) {
116            this.transport = transport;
117            initialiseRegistry();
118        }
119    
120        protected int generateCommandId() {
121            synchronized (commnadIdMutex) {
122                return lastCommandId++;
123            }
124        }
125    
126        protected void initialiseRegistry() {
127            // this kinda wiring muck is soooo much cleaner in C# :(
128            registry.registerHandler(Message.class, new Handler<Message>() {
129                public void handle(Message event) throws Exception {
130                    onMessage(event);
131                }
132            });
133            registry.registerHandler(Auth.class, new Handler<Auth>() {
134                public void handle(Auth event) throws Exception {
135                    onAuth(event);
136                }
137            });
138            registry.registerHandler(Starttls.class, new Handler<Starttls>() {
139                public void handle(Starttls event) throws Exception {
140                    onStarttls(event);
141                }
142            });
143            registry.registerHandler(Iq.class, new Handler<Iq>() {
144                public void handle(Iq event) throws Exception {
145                    onIq(event);
146                }
147            });
148            registry.registerHandler(Presence.class, new Handler<Presence>() {
149                public void handle(Presence event) throws Exception {
150                    onPresence(event);
151                }
152            });
153        }
154    
155        public void onXmppCommand(Object command) throws Exception {
156            // TODO we could do some nice code generation to boost performance
157            // by autogenerating the bytecode to statically lookup a handler from a
158            // registry maybe?
159    
160            Handler handler = registry.getHandler(command.getClass());
161            if (handler == null) {
162                unknownCommand(command);
163            } else {
164                handler.handle(command);
165            }
166        }
167    
168        public void onActiveMQCommand(Command command) throws Exception {
169            if (command.isResponse()) {
170                Response response = (Response)command;
171                Handler<Response> handler = responseHandlers.remove(new Integer(response.getCorrelationId()));
172                if (handler != null) {
173                    handler.handle(response);
174                } else {
175                    LOG.warn("No handler for response: " + response);
176                }
177            } else if (command.isMessageDispatch()) {
178                MessageDispatch md = (MessageDispatch)command;
179                Handler<MessageDispatch> handler = subscriptionsByConsumerId.get(md.getConsumerId());
180                if (handler != null) {
181                    handler.handle(md);
182                } else {
183                    LOG.warn("No handler for message: " + md);
184                }
185            }
186        }
187    
188        protected void unknownCommand(Object command) throws Exception {
189            LOG.warn("Unkown command: " + command + " of type: " + command.getClass().getName());
190        }
191    
192        protected void onIq(final Iq iq) throws Exception {
193            Object any = iq.getAny();
194    
195            if (any instanceof Query) {
196                onAuthQuery(any, iq);
197    
198            } else if (any instanceof jabber.iq._private.Query) {
199                jabber.iq._private.Query query = (jabber.iq._private.Query)any;
200    
201                if (LOG.isDebugEnabled()) {
202                    LOG.debug("Iq Private " + debugString(iq) + " any: " + query.getAny());
203                }
204    
205                Iq result = createResult(iq);
206                jabber.iq._private.Query answer = new jabber.iq._private.Query();
207                result.setAny(answer);
208                transport.marshall(result);
209            } else if (any instanceof jabber.iq.roster.Query) {
210                jabber.iq.roster.Query query = (jabber.iq.roster.Query)any;
211    
212                if (LOG.isDebugEnabled()) {
213                    LOG.debug("Iq Roster " + debugString(iq) + " item: " + query.getItem());
214                }
215    
216                Iq result = createResult(iq);
217                jabber.iq.roster.Query roster = new jabber.iq.roster.Query();
218                result.setAny(roster);
219                transport.marshall(result);
220            } else if (any instanceof org.jabber.protocol.disco_items.Query) {
221                onDiscoItems(iq, (org.jabber.protocol.disco_items.Query)any);
222            } else if (any instanceof org.jabber.protocol.disco_info.Query) {
223                onDiscoInfo(iq, (org.jabber.protocol.disco_info.Query)any);
224            } else {
225                if (any instanceof Element) {
226                    Element element = (Element)any;
227                    LOG.warn("Iq Unknown " + debugString(iq) + " element namespace: " + element.getNamespaceURI() + " localName: " + element.getLocalName());
228                } else {
229                    LOG.warn("Iq Unknown " + debugString(iq) + " any: " + any + " of type: " + any.getClass().getName());
230                }
231                Iq result = createResult(iq);
232                jabber.client.Error error = new Error();
233                error.setUnexpectedRequest("Don't understand: " + any.toString());
234                result.setAny(error);
235                transport.marshall(result);
236            }
237        }
238    
239        protected void onAuthQuery(Object any, final Iq iq) throws IOException, JMSException {
240            Query query = (Query)any;
241            if (LOG.isDebugEnabled()) {
242                LOG.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
243            }
244            if (query.getPassword() == null) {
245                Iq result = createResult(iq);
246                Query required = new Query();
247                required.setPassword("");
248                required.setUsername("");
249                result.setAny(required);
250                transport.marshall(result);
251                return;
252            }
253    
254            // connectionInfo.setClientId(query.getResource());
255            connectionInfo.setUserName(query.getUsername());
256            connectionInfo.setPassword(query.getPassword());
257    
258            // TODO support digest?
259    
260            if (connectionInfo.getClientId() == null) {
261                connectionInfo.setClientId(CLIENT_ID_GENERATOR.generateId());
262            }
263    
264            sendToActiveMQ(connectionInfo, new Handler<Response>() {
265                public void handle(Response response) throws Exception {
266    
267                    Iq result = createResult(iq);
268    
269                    if (response instanceof ExceptionResponse) {
270                        ExceptionResponse exceptionResponse = (ExceptionResponse)response;
271                        Throwable exception = exceptionResponse.getException();
272    
273                        LOG.warn("Failed to create connection: " + exception, exception);
274    
275                        Error error = new Error();
276                        result.setError(error);
277    
278                        StringWriter buffer = new StringWriter();
279                        exception.printStackTrace(new PrintWriter(buffer));
280                        error.setInternalServerError(buffer.toString());
281                    } else {
282                        connected.set(true);
283                    }
284                    transport.marshall(result);
285    
286                    sendToActiveMQ(sessionInfo, createErrorHandler("create sesssion"));
287                    sendToActiveMQ(producerInfo, createErrorHandler("create producer"));
288                }
289            });
290    
291            // create a destination for this client
292            final String to = query.getUsername();
293            createDestination(to);
294        }
295    
296        public void createDestination(String to) throws IOException, JMSException {
297            ActiveMQDestination destination = createActiveMQDestination(to);
298            if (destination == null) {
299                LOG.debug("Unable to create destination for " + to);
300                return;
301            }
302            subscribe(to, destination, jidToConsumerMap);
303    
304            // lets subscribe to a personal inbox for replies
305    
306            // Check if Destination info is of temporary type.
307            if (inboxDestination == null) {
308                inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
309    
310                DestinationInfo info = new DestinationInfo();
311                info.setConnectionId(connectionInfo.getConnectionId());
312                info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
313                info.setDestination(inboxDestination);
314                sendToActiveMQ(info, null);
315    
316                subscribe(to, inboxDestination, jidToInboxConsumerMap);
317            }
318        }
319    
320        protected String debugString(Iq iq) {
321            return "to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId();
322        }
323    
324        protected void onDiscoItems(Iq iq, org.jabber.protocol.disco_items.Query query) throws IOException {
325            String to = iq.getTo();
326    
327            if (LOG.isDebugEnabled()) {
328                LOG.debug("Iq Disco Items query " + debugString(iq) + " node: " + query.getNode() + " item: " + query.getItem());
329            }
330    
331            Iq result = createResult(iq);
332            org.jabber.protocol.disco_items.Query answer = new org.jabber.protocol.disco_items.Query();
333            if (to == null || to.length() == 0) {
334                answer.getItem().add(createItem("queues", "Queues", "queues"));
335                answer.getItem().add(createItem("topics", "Topics", "topics"));
336            } else {
337                // lets not add anything?
338            }
339    
340            result.setAny(answer);
341            transport.marshall(result);
342        }
343    
344        protected void onDiscoInfo(Iq iq, org.jabber.protocol.disco_info.Query query) throws IOException {
345            String to = iq.getTo();
346    
347            // TODO lets create the topic 'to'
348    
349            if (LOG.isDebugEnabled()) {
350                LOG.debug("Iq Disco Info query " + debugString(iq) + " node: " + query.getNode() + " features: " + query.getFeature() + " identity: " + query.getIdentity());
351            }
352    
353            Iq result = createResult(iq);
354            org.jabber.protocol.disco_info.Query answer = new org.jabber.protocol.disco_info.Query();
355            answer.setNode(to);
356            answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#info"));
357            answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#items"));
358            if (to == null || to.length() == 0) {
359                answer.getIdentity().add(createIdentity("directory", "chatroom", "queues"));
360                answer.getIdentity().add(createIdentity("directory", "chatroom", "topics"));
361                /*
362                 * answer.getIdentity().add(createIdentity("hierarchy", "queues",
363                 * "branch")); answer.getIdentity().add(createIdentity("hierarchy",
364                 * "topics", "branch"));
365                 */
366            } else {
367                // for queues/topics
368                if (to.equals("queues")) {
369                    answer.getIdentity().add(createIdentity("conference", "queue.a", "text"));
370                    answer.getIdentity().add(createIdentity("conference", "queue.b", "text"));
371                } else if (to.equals("topics")) {
372                    answer.getIdentity().add(createIdentity("conference", "topic.x", "text"));
373                    answer.getIdentity().add(createIdentity("conference", "topic.y", "text"));
374                    answer.getIdentity().add(createIdentity("conference", "topic.z", "text"));
375                } else {
376                    // lets reply to an actual room
377                    answer.getIdentity().add(createIdentity("conference", to, "text"));
378                    answer.getFeature().add(createFeature("http://jabber.org/protocol/muc"));
379                    answer.getFeature().add(createFeature("muc-open"));
380                }
381            }
382    
383            result.setAny(answer);
384            transport.marshall(result);
385        }
386    
387        protected void onPresence(Presence presence) throws IOException, JMSException {
388            if (LOG.isDebugEnabled()) {
389                LOG.debug("Presence: " + presence.getFrom() + " id: " + presence.getId() + " to: " + presence.getTo() + " type: " + presence.getType() + " showOrStatusOrPriority: "
390                          + presence.getShowOrStatusOrPriority() + " any: " + presence.getAny());
391            }
392            org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item();
393            item.setAffiliation("owner");
394            item.setRole("moderator");
395            item.setNick("broker");
396            sendPresence(presence, item);
397    
398            /*
399             * item = new org.jabber.protocol.muc_user.Item();
400             * item.setAffiliation("admin"); item.setRole("moderator");
401             * sendPresence(presence, item);
402             */
403    
404            // lets create a subscription for the room, Jabber clients would use
405            // "room/nickname", so we need to strip off the nickname
406            String to = presence.getTo();
407            if ( to != null ) {
408                to = to.substring(0, to.indexOf("/"));
409            }
410            createDestination(to);
411        }
412    
413        protected void subscribe(final String to, ActiveMQDestination destination, Map<String, ConsumerInfo> consumerMap) {
414            boolean createConsumer = false;
415            ConsumerInfo consumerInfo = null;
416            synchronized (consumerMap) {
417                consumerInfo = consumerMap.get(to);
418                if (consumerInfo == null) {
419                    consumerInfo = new ConsumerInfo();
420                    consumerMap.put(to, consumerInfo);
421    
422                    ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
423                    consumerInfo.setConsumerId(consumerId);
424                    consumerInfo.setPrefetchSize(10);
425                    consumerInfo.setNoLocal(true);
426                    createConsumer = true;
427                }
428            }
429            if (!createConsumer) {
430                return;
431            }
432    
433            consumerInfo.setDestination(destination);
434    
435            subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
436                public void handle(final MessageDispatch messageDispatch) throws Exception {
437                    // processing the inbound message
438                    if (LOG.isDebugEnabled()) {
439                        LOG.debug("Receiving inbound: " + messageDispatch.getMessage());
440                    }
441    
442                    // lets send back an ACK
443                    final MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1);
444    
445                    FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
446                        public Void call() {
447                            sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId()));
448                            return null;
449                        }
450                    });
451    
452                    scheduledThreadPoolExecutor.submit(task);
453    
454                    Message message = createXmppMessage(to, messageDispatch);
455                    if (message != null) {
456                        if (LOG.isDebugEnabled()) {
457                            LOG.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny());
458                        }
459                        transport.marshall(message);
460                    }
461                }
462            });
463            sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination));
464        }
465    
466        protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws IOException, JMSException {
467            
468            org.apache.activemq.command.Message message = messageDispatch.getMessage();
469    
470            Message answer = new Message();
471            String from = (String)message.getProperty("XMPPFrom");
472            if ( from == null ) {
473                from = to;
474                int idx = from.indexOf('/');
475                if (idx > 0) {
476                    from = from.substring(0, idx) + "/broker";
477                }
478                answer.setType("groupchat");
479            } else {
480                answer.setType("chat");
481            }
482            LOG.debug("Sending message from " + from + " and to " + to);
483            answer.setFrom(from);
484            answer.setTo(to);
485    
486            // answer.setType(message.getType());
487            if (message instanceof ActiveMQTextMessage) {
488                ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)message;
489                Body body = new Body();
490                String text = activeMQTextMessage.getText();
491                LOG.info("Setting the body text to be: " + text);
492                body.setValue(text);
493                answer.getAny().add(body);
494            } else {
495                // TODO support other message types
496                LOG.warn("Could not convert the message to a complete Jabber message: " + message);
497            }
498            return answer;
499        }
500    
501        protected void sendPresence(Presence presence, org.jabber.protocol.muc_user.Item item) throws IOException {
502            Presence answer = new Presence();
503            answer.setFrom(presence.getTo());
504            answer.setType(presence.getType());
505            answer.setTo(presence.getFrom());
506            X x = new X();
507            x.getDeclineOrDestroyOrInvite().add(item);
508            answer.getShowOrStatusOrPriority().add(x);
509            transport.marshall(answer);
510        }
511    
512        protected Item createItem(String jid, String name, String node) {
513            Item answer = new Item();
514            answer.setJid(jid);
515            answer.setName(name);
516            answer.setNode(node);
517            return answer;
518        }
519    
520        protected Identity createIdentity(String category, String type, String name) {
521            Identity answer = new Identity();
522            answer.setCategory(category);
523            answer.setName(name);
524            answer.setType(type);
525            return answer;
526        }
527    
528        protected Feature createFeature(String var) {
529            Feature feature = new Feature();
530            feature.setVar(var);
531            return feature;
532        }
533    
534        /**
535         * Creates a result command from the input
536         */
537        protected Iq createResult(Iq iq) {
538            Iq result = new Iq();
539            result.setId(iq.getId());
540            result.setFrom(transport.getFrom());
541            result.setTo(iq.getFrom());
542            result.setLang(iq.getLang());
543            result.setType("result");
544            return result;
545        }
546    
547        protected void sendToActiveMQ(Command command, Handler<Response> handler) {
548            command.setCommandId(generateCommandId());
549            if (handler != null) {
550                command.setResponseRequired(true);
551                responseHandlers.put(command.getCommandId(), handler);
552            }
553            transport.getTransportListener().onCommand(command);
554        }
555    
556        protected void onStarttls(Starttls starttls) throws Exception {
557            LOG.debug("Starttls");
558            transport.marshall(new Proceed());
559        }
560    
561        protected void onMessage(Message message) throws Exception {
562            if (LOG.isDebugEnabled()) {
563                LOG.debug("Message from: " + message.getFrom() + " to: " + message.getTo() + " subjectOrBodyOrThread: " + message.getSubjectOrBodyOrThread());
564            }
565    
566            final ActiveMQMessage activeMQMessage = createActiveMQMessage(message);
567    
568            ActiveMQDestination destination = createActiveMQDestination(message.getTo());
569    
570            activeMQMessage.setMessageId(new MessageId(producerInfo, messageIdGenerator.getNextSequenceId()));
571            activeMQMessage.setDestination(destination);
572            activeMQMessage.setProducerId(producerId);
573            activeMQMessage.setTimestamp(System.currentTimeMillis());
574            addActiveMQMessageHeaders(activeMQMessage, message);
575    
576            /*
577             * MessageDispatch dispatch = new MessageDispatch();
578             * dispatch.setDestination(destination);
579             * dispatch.setMessage(activeMQMessage);
580             */
581    
582            if (LOG.isDebugEnabled()) {
583                LOG.debug("Sending ActiveMQ message: " + activeMQMessage);
584            }
585            sendToActiveMQ(activeMQMessage, createErrorHandler("send message"));
586        }
587    
588        protected Handler<Response> createErrorHandler(final String text) {
589            return new Handler<Response>() {
590                public void handle(Response event) throws Exception {
591                    if (event instanceof ExceptionResponse) {
592                        ExceptionResponse exceptionResponse = (ExceptionResponse)event;
593                        Throwable exception = exceptionResponse.getException();
594                        LOG.error("Failed to " + text + ". Reason: " + exception, exception);
595                    } else if (LOG.isDebugEnabled()) {
596                        LOG.debug("Completed " + text);
597                    }
598                }
599            };
600        }
601    
602        /**
603         * Converts the Jabber destination name into a destination in ActiveMQ
604         */
605        protected ActiveMQDestination createActiveMQDestination(String jabberDestination) throws JMSException {
606            if (jabberDestination == null) {
607                return null;
608            }
609            String name = jabberDestination;
610            int idx = jabberDestination.indexOf('@');
611            if (idx > 0) {
612                name = name.substring(0, idx);
613            }
614            // lets support lower-case versions of the agent topic
615            if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) {
616                name = AdvisorySupport.AGENT_TOPIC;
617            }
618            return new ActiveMQTopic(name);
619        }
620    
621        protected ActiveMQMessage createActiveMQMessage(Message message) throws JMSException {
622            ActiveMQTextMessage answer = new ActiveMQTextMessage();
623            String text = "";
624            List<Object> list = message.getSubjectOrBodyOrThread();
625            for (Object object : list) {
626                if (object instanceof Body) {
627                    Body body = (Body)object;
628                    text = body.getValue();
629                    break;
630                }
631            }
632            answer.setText(text);
633            return answer;
634        }
635    
636        protected void addActiveMQMessageHeaders(ActiveMQMessage answer, Message message) throws JMSException {
637            answer.setStringProperty("XMPPFrom", message.getFrom());
638            answer.setStringProperty("XMPPID", message.getId());
639            answer.setStringProperty("XMPPLang", message.getLang());
640            answer.setStringProperty("XMPPTo", message.getTo());
641            answer.setJMSType(message.getType());
642            ActiveMQDestination replyTo = createActiveMQDestination(message.getFrom());
643            if (replyTo == null) {
644                replyTo = inboxDestination;
645            }
646            LOG.info("Setting reply to destination to: " + replyTo);
647            answer.setJMSReplyTo(replyTo);
648        }
649    
650        protected void onAuth(Auth auth) throws Exception {
651            if (LOG.isDebugEnabled()) {
652                LOG.debug("Auth mechanism: " + auth.getMechanism() + " value: " + auth.getValue());
653            }
654            String value = createChallengeValue(auth);
655            if (value != null) {
656                Challenge challenge = new Challenge();
657                challenge.setValue(value);
658                transport.marshall(challenge);
659            } else {
660                transport.marshall(new Success());
661            }
662        }
663    
664        protected String createChallengeValue(Auth auth) {
665            // TODO implement the challenge
666            return null;
667        }
668    
669    }