001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.BufferedReader;
020    import java.io.IOException;
021    import java.io.InputStream;
022    import java.io.InputStreamReader;
023    import java.io.OutputStreamWriter;
024    import java.io.PrintWriter;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.Map;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    
031    import javax.jms.JMSException;
032    
033    import org.apache.activemq.broker.BrokerContext;
034    import org.apache.activemq.broker.BrokerContextAware;
035    import org.apache.activemq.command.ActiveMQDestination;
036    import org.apache.activemq.command.ActiveMQMessage;
037    import org.apache.activemq.command.ActiveMQTempQueue;
038    import org.apache.activemq.command.ActiveMQTempTopic;
039    import org.apache.activemq.command.Command;
040    import org.apache.activemq.command.CommandTypes;
041    import org.apache.activemq.command.ConnectionError;
042    import org.apache.activemq.command.ConnectionId;
043    import org.apache.activemq.command.ConnectionInfo;
044    import org.apache.activemq.command.ConsumerControl;
045    import org.apache.activemq.command.ConsumerId;
046    import org.apache.activemq.command.ConsumerInfo;
047    import org.apache.activemq.command.DestinationInfo;
048    import org.apache.activemq.command.ExceptionResponse;
049    import org.apache.activemq.command.LocalTransactionId;
050    import org.apache.activemq.command.MessageAck;
051    import org.apache.activemq.command.MessageDispatch;
052    import org.apache.activemq.command.MessageId;
053    import org.apache.activemq.command.ProducerId;
054    import org.apache.activemq.command.ProducerInfo;
055    import org.apache.activemq.command.RemoveSubscriptionInfo;
056    import org.apache.activemq.command.Response;
057    import org.apache.activemq.command.SessionId;
058    import org.apache.activemq.command.SessionInfo;
059    import org.apache.activemq.command.ShutdownInfo;
060    import org.apache.activemq.command.TransactionId;
061    import org.apache.activemq.command.TransactionInfo;
062    import org.apache.activemq.util.ByteArrayOutputStream;
063    import org.apache.activemq.util.FactoryFinder;
064    import org.apache.activemq.util.IOExceptionSupport;
065    import org.apache.activemq.util.IdGenerator;
066    import org.apache.activemq.util.IntrospectionSupport;
067    import org.apache.activemq.util.LongSequenceGenerator;
068    import org.slf4j.Logger;
069    import org.slf4j.LoggerFactory;
070    
071    /**
072     * @author <a href="http://hiramchirino.com">chirino</a>
073     */
074    public class ProtocolConverter {
075    
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);

    // Supplies the id string wrapped by each converter's ConnectionId.
    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();

    // Assigned exactly once by the static initializer that follows these declarations.
    private static final String BROKER_VERSION;
    // Shared frame constructed from the KEEPALIVE command constant.
    // NOTE(review): presumably sent as the heart-beat frame; the sender is not visible here - confirm.
    private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
082    
083        static {
084            InputStream in = null;
085            String version = "5.6.0";
086            if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
087                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
088                try {
089                    version = reader.readLine();
090                } catch(Exception e) {
091                }
092            }
093            BROKER_VERSION = version;
094        }
095    
    // Identity chain for this converter: the session id is derived from the connection id
    // and the producer id from the session id, so declaration order matters here.
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(connectionId, -1);
    // Single producer id stamped on every message created by onStompSend.
    private final ProducerId producerId = new ProducerId(sessionId, 1);

    // Sequence sources for consumer ids (SUBSCRIBE), message ids (SEND) and
    // local transaction ids (BEGIN).
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
    // NOTE(review): not used in the visible part of the file; presumably numbers temp destinations - confirm.
    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();

    // Pending handlers keyed by the command id assigned in sendToActiveMQ
    // (the field name is misspelled; it is referenced under this name elsewhere).
    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
    // Every subscription created by SUBSCRIBE, keyed by its generated consumer id.
    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
    // Subscriptions keyed by the client-supplied id header; only populated when the client sent one.
    private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
    // NOTE(review): the two temp-destination maps are not touched in the visible code; semantics unconfirmed.
    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
    // Open transactions: STOMP transaction header value -> broker-side local transaction id.
    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
    // Transport used to send both broker commands and STOMP frames.
    private final StompTransport stompTransport;

    // Guards lastCommandId (the mutex name is misspelled; kept because generateCommandId uses it).
    private final Object commnadIdMutex = new Object();
    private int lastCommandId;
    // Read by onStompConnect to reject a second CONNECT; it is set outside the visible code.
    private final AtomicBoolean connected = new AtomicBoolean(false);
    // Default translator returned by findTranslator when no transformation applies.
    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
    // Looks up translator implementations by transformation header value.
    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
    // Handed to translators that implement BrokerContextAware.
    private final BrokerContext brokerContext;
    // STOMP protocol version in effect; replaced by the detected version during CONNECT.
    private String version = "1.0";
    // NOTE(review): heart-beat intervals are not assigned in the visible code; units unconfirmed.
    private long hbReadInterval;
    private long hbWriteInterval;
    // Heart-beat header value assumed when the CONNECT frame does not carry one.
    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
123    
124        public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
125            this.stompTransport = stompTransport;
126            this.brokerContext = brokerContext;
127        }
128    
129        protected int generateCommandId() {
130            synchronized (commnadIdMutex) {
131                return lastCommandId++;
132            }
133        }
134    
135        protected ResponseHandler createResponseHandler(final StompFrame command) {
136            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
137            if (receiptId != null) {
138                return new ResponseHandler() {
139                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
140                        if (response.isException()) {
141                            // Generally a command can fail.. but that does not invalidate the connection.
142                            // We report back the failure but we don't close the connection.
143                            Throwable exception = ((ExceptionResponse)response).getException();
144                            handleException(exception, command);
145                        } else {
146                            StompFrame sc = new StompFrame();
147                            sc.setAction(Stomp.Responses.RECEIPT);
148                            sc.setHeaders(new HashMap<String, String>(1));
149                            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
150                            stompTransport.sendToStomp(sc);
151                        }
152                    }
153                };
154            }
155            return null;
156        }
157    
158        protected void sendToActiveMQ(Command command, ResponseHandler handler) {
159            command.setCommandId(generateCommandId());
160            if (handler != null) {
161                command.setResponseRequired(true);
162                resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
163            }
164            stompTransport.sendToActiveMQ(command);
165        }
166    
167        protected void sendToStomp(StompFrame command) throws IOException {
168            stompTransport.sendToStomp(command);
169        }
170    
171        protected FrameTranslator findTranslator(String header) {
172            FrameTranslator translator = frameTranslator;
173            try {
174                if (header != null) {
175                    translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
176                            .newInstance(header);
177                    if (translator instanceof BrokerContextAware) {
178                        ((BrokerContextAware)translator).setBrokerContext(brokerContext);
179                    }
180                }
181            } catch (Exception ignore) {
182                // if anything goes wrong use the default translator
183            }
184    
185            return translator;
186        }
187    
188        /**
189         * Convert a stomp command
190         *
191         * @param command
192         */
193        public void onStompCommand(StompFrame command) throws IOException, JMSException {
194            try {
195    
196                if (command.getClass() == StompFrameError.class) {
197                    throw ((StompFrameError)command).getException();
198                }
199    
200                String action = command.getAction();
201                if (action.startsWith(Stomp.Commands.SEND)) {
202                    onStompSend(command);
203                } else if (action.startsWith(Stomp.Commands.ACK)) {
204                    onStompAck(command);
205                } else if (action.startsWith(Stomp.Commands.NACK)) {
206                    onStompNack(command);
207                } else if (action.startsWith(Stomp.Commands.BEGIN)) {
208                    onStompBegin(command);
209                } else if (action.startsWith(Stomp.Commands.COMMIT)) {
210                    onStompCommit(command);
211                } else if (action.startsWith(Stomp.Commands.ABORT)) {
212                    onStompAbort(command);
213                } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
214                    onStompSubscribe(command);
215                } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
216                    onStompUnsubscribe(command);
217                } else if (action.startsWith(Stomp.Commands.CONNECT) ||
218                           action.startsWith(Stomp.Commands.STOMP)) {
219                    onStompConnect(command);
220                } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
221                    onStompDisconnect(command);
222                } else {
223                    throw new ProtocolException("Unknown STOMP action: " + action);
224                }
225    
226            } catch (ProtocolException e) {
227                handleException(e, command);
228                // Some protocol errors can cause the connection to get closed.
229                if (e.isFatal()) {
230                   getStompTransport().onException(e);
231                }
232            }
233        }
234    
235        protected void handleException(Throwable exception, StompFrame command) throws IOException {
236            LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
237            if (LOG.isDebugEnabled()) {
238                LOG.debug("Exception detail", exception);
239            }
240    
241            // Let the stomp client know about any protocol errors.
242            ByteArrayOutputStream baos = new ByteArrayOutputStream();
243            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
244            exception.printStackTrace(stream);
245            stream.close();
246    
247            HashMap<String, String> headers = new HashMap<String, String>();
248            headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
249            headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
250    
251            if (command != null) {
252                final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
253                if (receiptId != null) {
254                    headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
255                }
256            }
257    
258            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
259            sendToStomp(errorMessage);
260        }
261    
262        protected void onStompSend(StompFrame command) throws IOException, JMSException {
263            checkConnected();
264    
265            Map<String, String> headers = command.getHeaders();
266            String destination = headers.get(Stomp.Headers.Send.DESTINATION);
267            if (destination == null) {
268                throw new ProtocolException("SEND received without a Destination specified!");
269            }
270    
271            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
272            headers.remove("transaction");
273    
274            ActiveMQMessage message = convertMessage(command);
275    
276            message.setProducerId(producerId);
277            MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
278            message.setMessageId(id);
279            message.setJMSTimestamp(System.currentTimeMillis());
280    
281            if (stompTx != null) {
282                TransactionId activemqTx = transactions.get(stompTx);
283                if (activemqTx == null) {
284                    throw new ProtocolException("Invalid transaction id: " + stompTx);
285                }
286                message.setTransactionId(activemqTx);
287            }
288    
289            message.onSend();
290            sendToActiveMQ(message, createResponseHandler(command));
291        }
292    
293        protected void onStompNack(StompFrame command) throws ProtocolException {
294    
295            checkConnected();
296    
297            if (this.version.equals(Stomp.V1_0)) {
298                throw new ProtocolException("NACK received but connection is in v1.0 mode.");
299            }
300    
301            Map<String, String> headers = command.getHeaders();
302    
303            String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
304            if (subscriptionId == null) {
305                throw new ProtocolException("NACK received without a subscription id for acknowledge!");
306            }
307    
308            String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
309            if (messageId == null) {
310                throw new ProtocolException("NACK received without a message-id to acknowledge!");
311            }
312    
313            TransactionId activemqTx = null;
314            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
315            if (stompTx != null) {
316                activemqTx = transactions.get(stompTx);
317                if (activemqTx == null) {
318                    throw new ProtocolException("Invalid transaction id: " + stompTx);
319                }
320            }
321    
322            if (subscriptionId != null) {
323                StompSubscription sub = this.subscriptions.get(subscriptionId);
324                if (sub != null) {
325                    MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
326                    if (ack != null) {
327                        sendToActiveMQ(ack, createResponseHandler(command));
328                    } else {
329                        throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
330                    }
331                }
332            }
333        }
334    
335        protected void onStompAck(StompFrame command) throws ProtocolException {
336            checkConnected();
337    
338            Map<String, String> headers = command.getHeaders();
339            String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
340            if (messageId == null) {
341                throw new ProtocolException("ACK received without a message-id to acknowledge!");
342            }
343    
344            String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
345            if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
346                throw new ProtocolException("ACK received without a subscription id for acknowledge!");
347            }
348    
349            TransactionId activemqTx = null;
350            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
351            if (stompTx != null) {
352                activemqTx = transactions.get(stompTx);
353                if (activemqTx == null) {
354                    throw new ProtocolException("Invalid transaction id: " + stompTx);
355                }
356            }
357    
358            boolean acked = false;
359    
360            if (subscriptionId != null) {
361    
362                StompSubscription sub = this.subscriptions.get(subscriptionId);
363                if (sub != null) {
364                    MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
365                    if (ack != null) {
366                        sendToActiveMQ(ack, createResponseHandler(command));
367                        acked = true;
368                    }
369                }
370    
371            } else {
372    
373                // TODO: acking with just a message id is very bogus since the same message id
374                // could have been sent to 2 different subscriptions on the same Stomp connection.
375                // For example, when 2 subs are created on the same topic.
376    
377                for (StompSubscription sub : subscriptionsByConsumerId.values()) {
378                    MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
379                    if (ack != null) {
380                        sendToActiveMQ(ack, createResponseHandler(command));
381                        acked = true;
382                        break;
383                    }
384                }
385            }
386    
387            if (!acked) {
388                throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
389            }
390        }
391    
392        protected void onStompBegin(StompFrame command) throws ProtocolException {
393            checkConnected();
394    
395            Map<String, String> headers = command.getHeaders();
396    
397            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
398    
399            if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
400                throw new ProtocolException("Must specify the transaction you are beginning");
401            }
402    
403            if (transactions.get(stompTx) != null) {
404                throw new ProtocolException("The transaction was allready started: " + stompTx);
405            }
406    
407            LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
408            transactions.put(stompTx, activemqTx);
409    
410            TransactionInfo tx = new TransactionInfo();
411            tx.setConnectionId(connectionId);
412            tx.setTransactionId(activemqTx);
413            tx.setType(TransactionInfo.BEGIN);
414    
415            sendToActiveMQ(tx, createResponseHandler(command));
416        }
417    
418        protected void onStompCommit(StompFrame command) throws ProtocolException {
419            checkConnected();
420    
421            Map<String, String> headers = command.getHeaders();
422    
423            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
424            if (stompTx == null) {
425                throw new ProtocolException("Must specify the transaction you are committing");
426            }
427    
428            TransactionId activemqTx = transactions.remove(stompTx);
429            if (activemqTx == null) {
430                throw new ProtocolException("Invalid transaction id: " + stompTx);
431            }
432    
433            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
434                sub.onStompCommit(activemqTx);
435            }
436    
437            TransactionInfo tx = new TransactionInfo();
438            tx.setConnectionId(connectionId);
439            tx.setTransactionId(activemqTx);
440            tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
441    
442            sendToActiveMQ(tx, createResponseHandler(command));
443        }
444    
445        protected void onStompAbort(StompFrame command) throws ProtocolException {
446            checkConnected();
447            Map<String, String> headers = command.getHeaders();
448    
449            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
450            if (stompTx == null) {
451                throw new ProtocolException("Must specify the transaction you are committing");
452            }
453    
454            TransactionId activemqTx = transactions.remove(stompTx);
455            if (activemqTx == null) {
456                throw new ProtocolException("Invalid transaction id: " + stompTx);
457            }
458            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
459                try {
460                    sub.onStompAbort(activemqTx);
461                } catch (Exception e) {
462                    throw new ProtocolException("Transaction abort failed", false, e);
463                }
464            }
465    
466            TransactionInfo tx = new TransactionInfo();
467            tx.setConnectionId(connectionId);
468            tx.setTransactionId(activemqTx);
469            tx.setType(TransactionInfo.ROLLBACK);
470    
471            sendToActiveMQ(tx, createResponseHandler(command));
472        }
473    
474        protected void onStompSubscribe(StompFrame command) throws ProtocolException {
475            checkConnected();
476            FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
477            Map<String, String> headers = command.getHeaders();
478    
479            String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
480            String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
481    
482            if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
483                throw new ProtocolException("SUBSCRIBE received without a subscription id!");
484            }
485    
486            final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
487    
488            if (actualDest == null) {
489                throw new ProtocolException("Invalid Destination.");
490            }
491    
492            final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
493            ConsumerInfo consumerInfo = new ConsumerInfo(id);
494            consumerInfo.setPrefetchSize(1000);
495            consumerInfo.setDispatchAsync(true);
496    
497            String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
498            if (browser != null && browser.equals(Stomp.TRUE)) {
499    
500                if (!this.version.equals(Stomp.V1_1)) {
501                    throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
502                }
503    
504                consumerInfo.setBrowser(true);
505            }
506    
507            String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
508            consumerInfo.setSelector(selector);
509    
510            IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
511    
512            if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) {
513                throw new ProtocolException("Invliad Subscription: cannot durably subscribe to a Queue destination!");
514            }
515    
516            consumerInfo.setDestination(translator.convertDestination(this, destination, true));
517    
518            StompSubscription stompSubscription;
519            if (!consumerInfo.isBrowser()) {
520                stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
521            } else {
522                stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
523            }
524            stompSubscription.setDestination(actualDest);
525    
526            String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
527            if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
528                stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
529            } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
530                stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
531            } else {
532                stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
533            }
534    
535            subscriptionsByConsumerId.put(id, stompSubscription);
536            // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
537            if (subscriptionId != null) {
538                subscriptions.put(subscriptionId, stompSubscription);
539            }
540    
541            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
542            if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
543    
544                final StompFrame cmd = command;
545                final int prefetch = consumerInfo.getPrefetchSize();
546    
547                // Since dispatch could beat the receipt we set prefetch to zero to start and then
548                // once we've sent our Receipt we are safe to turn on dispatch if the response isn't
549                // an error message.
550                consumerInfo.setPrefetchSize(0);
551    
552                final ResponseHandler handler = new ResponseHandler() {
553                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
554                        if (response.isException()) {
555                            // Generally a command can fail.. but that does not invalidate the connection.
556                            // We report back the failure but we don't close the connection.
557                            Throwable exception = ((ExceptionResponse)response).getException();
558                            handleException(exception, cmd);
559                        } else {
560                            StompFrame sc = new StompFrame();
561                            sc.setAction(Stomp.Responses.RECEIPT);
562                            sc.setHeaders(new HashMap<String, String>(1));
563                            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
564                            stompTransport.sendToStomp(sc);
565    
566                            ConsumerControl control = new ConsumerControl();
567                            control.setPrefetch(prefetch);
568                            control.setDestination(actualDest);
569                            control.setConsumerId(id);
570    
571                            sendToActiveMQ(control, null);
572                        }
573                    }
574                };
575    
576                sendToActiveMQ(consumerInfo, handler);
577            } else {
578                sendToActiveMQ(consumerInfo, createResponseHandler(command));
579            }
580        }
581    
582        protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
583            checkConnected();
584            Map<String, String> headers = command.getHeaders();
585    
586            ActiveMQDestination destination = null;
587            Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
588            if (o != null) {
589                destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
590            }
591    
592            String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
593            if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
594                throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
595            }
596    
597            if (subscriptionId == null && destination == null) {
598                throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
599            }
600    
601            // check if it is a durable subscription
602            String durable = command.getHeaders().get("activemq.subscriptionName");
603            String clientId = durable;
604            if (this.version.equals(Stomp.V1_1)) {
605                clientId = connectionInfo.getClientId();
606            }
607            if (durable != null) {
608                RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
609                info.setClientId(clientId);
610                info.setSubscriptionName(durable);
611                info.setConnectionId(connectionId);
612                sendToActiveMQ(info, createResponseHandler(command));
613                return;
614            }
615    
616            if (subscriptionId != null) {
617    
618                StompSubscription sub = this.subscriptions.remove(subscriptionId);
619                if (sub != null) {
620                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
621                    return;
622                }
623    
624            } else {
625    
626                // Unsubscribing using a destination is a bit weird if multiple subscriptions
627                // are created with the same destination.
628                for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
629                    StompSubscription sub = iter.next();
630                    if (destination != null && destination.equals(sub.getDestination())) {
631                        sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
632                        iter.remove();
633                        return;
634                    }
635                }
636            }
637    
638            throw new ProtocolException("No subscription matched.");
639        }
640    
    // Connection descriptor populated by onStompConnect and sent to the broker; its client id
    // is also read by onStompUnsubscribe for v1.1 durable unsubscribes. Package-private.
    ConnectionInfo connectionInfo = new ConnectionInfo();
642    
643        protected void onStompConnect(final StompFrame command) throws ProtocolException {
644    
645            if (connected.get()) {
646                throw new ProtocolException("Allready connected.");
647            }
648    
649            final Map<String, String> headers = command.getHeaders();
650    
651            // allow anyone to login for now
652            String login = headers.get(Stomp.Headers.Connect.LOGIN);
653            String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
654            String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
655            String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
656    
657            if (heartBeat == null) {
658                heartBeat = defaultHeartBeat;
659            }
660    
661            this.version = StompCodec.detectVersion(headers);
662    
663            configureInactivityMonitor(heartBeat.trim());
664    
665            IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
666            connectionInfo.setConnectionId(connectionId);
667            if (clientId != null) {
668                connectionInfo.setClientId(clientId);
669            } else {
670                connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
671            }
672    
673            connectionInfo.setResponseRequired(true);
674            connectionInfo.setUserName(login);
675            connectionInfo.setPassword(passcode);
676            connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
677    
678            sendToActiveMQ(connectionInfo, new ResponseHandler() {
679                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
680    
681                    if (response.isException()) {
682                        // If the connection attempt fails we close the socket.
683                        Throwable exception = ((ExceptionResponse)response).getException();
684                        handleException(exception, command);
685                        getStompTransport().onException(IOExceptionSupport.create(exception));
686                        return;
687                    }
688    
689                    final SessionInfo sessionInfo = new SessionInfo(sessionId);
690                    sendToActiveMQ(sessionInfo, null);
691    
692                    final ProducerInfo producerInfo = new ProducerInfo(producerId);
693                    sendToActiveMQ(producerInfo, new ResponseHandler() {
694                        public void onResponse(ProtocolConverter converter, Response response) throws IOException {
695    
696                            if (response.isException()) {
697                                // If the connection attempt fails we close the socket.
698                                Throwable exception = ((ExceptionResponse)response).getException();
699                                handleException(exception, command);
700                                getStompTransport().onException(IOExceptionSupport.create(exception));
701                            }
702    
703                            connected.set(true);
704                            HashMap<String, String> responseHeaders = new HashMap<String, String>();
705    
706                            responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
707                            String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
708                            if (requestId == null) {
709                                // TODO legacy
710                                requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
711                            }
712                            if (requestId != null) {
713                                // TODO legacy
714                                responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
715                                responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
716                            }
717    
718                            responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
719                            responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
720                                                String.format("%d,%d", hbWriteInterval, hbReadInterval));
721                            responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
722    
723                            StompFrame sc = new StompFrame();
724                            sc.setAction(Stomp.Responses.CONNECTED);
725                            sc.setHeaders(responseHeaders);
726                            sendToStomp(sc);
727    
728                            StompWireFormat format = stompTransport.getWireFormat();
729                            if (format != null) {
730                                format.setStompVersion(version);
731                            }
732                        }
733                    });
734    
735                }
736            });
737        }
738    
    /**
     * Handles a STOMP DISCONNECT frame.
     * <p>
     * Sends the remove command for this connection to the broker, followed by
     * a ShutdownInfo, and then clears the connected flag.
     *
     * @param command the DISCONNECT frame received from the client
     * @throws ProtocolException if this converter is not connected
     */
    protected void onStompDisconnect(StompFrame command) throws ProtocolException {
        checkConnected();
        // NOTE(review): both commands get a response handler built from the same
        // frame; if that handler sends a receipt, the client may see it twice -
        // confirm against createResponseHandler.
        sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
        // The flag is cleared right away, without waiting for the broker's responses.
        connected.set(false);
    }
745    
746        protected void checkConnected() throws ProtocolException {
747            if (!connected.get()) {
748                throw new ProtocolException("Not connected.");
749            }
750        }
751    
752        /**
753         * Dispatch a ActiveMQ command
754         *
755         * @param command
756         * @throws IOException
757         */
758        public void onActiveMQCommand(Command command) throws IOException, JMSException {
759            if (command.isResponse()) {
760                Response response = (Response)command;
761                ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
762                if (rh != null) {
763                    rh.onResponse(this, response);
764                } else {
765                    // Pass down any unexpected errors. Should this close the connection?
766                    if (response.isException()) {
767                        Throwable exception = ((ExceptionResponse)response).getException();
768                        handleException(exception, null);
769                    }
770                }
771            } else if (command.isMessageDispatch()) {
772                MessageDispatch md = (MessageDispatch)command;
773                StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
774                if (sub != null) {
775                    sub.onMessageDispatch(md);
776                }
777            } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
778                stompTransport.sendToStomp(ping);
779            } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
780                // Pass down any unexpected async errors. Should this close the connection?
781                Throwable exception = ((ConnectionError)command).getException();
782                handleException(exception, null);
783            }
784        }
785    
786        public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
787            ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
788            return msg;
789        }
790    
791        public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
792            if (ignoreTransformation == true) {
793                return frameTranslator.convertMessage(this, message);
794            } else {
795                return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
796            }
797        }
798    
799        public StompTransport getStompTransport() {
800            return stompTransport;
801        }
802    
803        public ActiveMQDestination createTempDestination(String name, boolean topic) {
804            ActiveMQDestination rc = tempDestinations.get(name);
805            if( rc == null ) {
806                if (topic) {
807                    rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
808                } else {
809                    rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
810                }
811                sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
812                tempDestinations.put(name, rc);
813                tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
814            }
815            return rc;
816        }
817    
818        public String getCreatedTempDestinationName(ActiveMQDestination destination) {
819            return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
820        }
821    
822        public String getDefaultHeartBeat() {
823            return defaultHeartBeat;
824        }
825    
    /**
     * Sets the heart-beat setting that onStompConnect falls back to when a
     * CONNECT frame carries no heart-beat header.
     *
     * @param defaultHeartBeat the fallback setting; it is parsed by
     *        configureInactivityMonitor, which expects two comma-separated
     *        integers
     */
    public void setDefaultHeartBeat(String defaultHeartBeat) {
        this.defaultHeartBeat = defaultHeartBeat;
    }
829    
830        protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
831    
832            String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
833    
834            if (keepAliveOpts == null || keepAliveOpts.length != 2) {
835                throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
836            } else {
837    
838                try {
839                    hbReadInterval = Long.parseLong(keepAliveOpts[0]);
840                    hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
841                } catch(NumberFormatException e) {
842                    throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
843                }
844    
845                try {
846    
847                    StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
848    
849                    monitor.setReadCheckTime(hbReadInterval);
850                    monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
851                    monitor.setWriteCheckTime(hbWriteInterval);
852    
853                    monitor.startMonitoring();
854    
855                } catch(Exception ex) {
856                    hbReadInterval = 0;
857                    hbWriteInterval = 0;
858                }
859    
860                if (LOG.isDebugEnabled()) {
861                    LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
862                }
863            }
864        }
865    
866        protected void sendReceipt(StompFrame command) {
867            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
868            if (receiptId != null) {
869                StompFrame sc = new StompFrame();
870                sc.setAction(Stomp.Responses.RECEIPT);
871                sc.setHeaders(new HashMap<String, String>(1));
872                sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
873                try {
874                    sendToStomp(sc);
875                } catch (IOException e) {
876                    LOG.warn("Could not send a receipt for " + command, e);
877                }
878            }
879        }
880    }