001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.BufferedReader;
020    import java.io.IOException;
021    import java.io.InputStream;
022    import java.io.InputStreamReader;
023    import java.io.OutputStreamWriter;
024    import java.io.PrintWriter;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.Map;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    
031    import javax.jms.JMSException;
032    
033    import org.apache.activemq.ActiveMQPrefetchPolicy;
034    import org.apache.activemq.broker.BrokerContext;
035    import org.apache.activemq.broker.BrokerContextAware;
036    import org.apache.activemq.command.ActiveMQDestination;
037    import org.apache.activemq.command.ActiveMQMessage;
038    import org.apache.activemq.command.ActiveMQTempQueue;
039    import org.apache.activemq.command.ActiveMQTempTopic;
040    import org.apache.activemq.command.Command;
041    import org.apache.activemq.command.CommandTypes;
042    import org.apache.activemq.command.ConnectionError;
043    import org.apache.activemq.command.ConnectionId;
044    import org.apache.activemq.command.ConnectionInfo;
045    import org.apache.activemq.command.ConsumerControl;
046    import org.apache.activemq.command.ConsumerId;
047    import org.apache.activemq.command.ConsumerInfo;
048    import org.apache.activemq.command.DestinationInfo;
049    import org.apache.activemq.command.ExceptionResponse;
050    import org.apache.activemq.command.LocalTransactionId;
051    import org.apache.activemq.command.MessageAck;
052    import org.apache.activemq.command.MessageDispatch;
053    import org.apache.activemq.command.MessageId;
054    import org.apache.activemq.command.ProducerId;
055    import org.apache.activemq.command.ProducerInfo;
056    import org.apache.activemq.command.RemoveSubscriptionInfo;
057    import org.apache.activemq.command.Response;
058    import org.apache.activemq.command.SessionId;
059    import org.apache.activemq.command.SessionInfo;
060    import org.apache.activemq.command.ShutdownInfo;
061    import org.apache.activemq.command.TransactionId;
062    import org.apache.activemq.command.TransactionInfo;
063    import org.apache.activemq.util.ByteArrayOutputStream;
064    import org.apache.activemq.util.FactoryFinder;
065    import org.apache.activemq.util.IOExceptionSupport;
066    import org.apache.activemq.util.IdGenerator;
067    import org.apache.activemq.util.IntrospectionSupport;
068    import org.apache.activemq.util.LongSequenceGenerator;
069    import org.slf4j.Logger;
070    import org.slf4j.LoggerFactory;
071    
072    /**
073     * @author <a href="http://hiramchirino.com">chirino</a>
074     */
075    public class ProtocolConverter {
076    
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);

    // Supplies the generated id from which each converter builds its ConnectionId.
    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();

    // Broker version string; assigned exactly once by the static initializer below.
    private static final String BROKER_VERSION;
    // Shared frame constructed with the KEEPALIVE command.
    private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
083    
084        static {
085            InputStream in = null;
086            String version = "5.6.0";
087            if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
088                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
089                try {
090                    version = reader.readLine();
091                } catch(Exception e) {
092                }
093            }
094            BROKER_VERSION = version;
095        }
096    
    // Identity of this converter: the connection id comes from the shared generator, the
    // session id is derived from it with value -1, and the single producer id from the session.
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(connectionId, -1);
    private final ProducerId producerId = new ProducerId(sessionId, 1);

    // Per-converter sequence generators for consumer, message and transaction ids.
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
    // NOTE(review): presumably numbers temporary destinations; usage is outside the visible code.
    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();

    // Response handlers keyed by command id; entries are added in sendToActiveMQ.
    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
    // Subscriptions indexed by consumer id and, when the client supplied one, by STOMP subscription id.
    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
    private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
    // NOTE(review): temp-destination bookkeeping; populated outside the visible code - confirm semantics there.
    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
    // Open transactions: value of the STOMP transaction header -> broker-side transaction id.
    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
    // Transport used to send frames to the STOMP client and commands to the broker.
    private final StompTransport stompTransport;

    // Ack entries looked up by the ack header value in onStompAck / onStompNack.
    private final ConcurrentHashMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
    // NOTE(review): presumably generates the keys of pedingAcks; usage is outside the visible code.
    private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();

    // Guards lastCommandId, the next command id handed out by generateCommandId().
    private final Object commnadIdMutex = new Object();
    private int lastCommandId;
    // NOTE(review): presumably flipped once CONNECT has been processed; confirm in checkConnected/onStompConnect.
    private final AtomicBoolean connected = new AtomicBoolean(false);
    // Default translator returned by findTranslator when no usable transformation header is given.
    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
    // Instantiates alternative frame translators by name (see findTranslator).
    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
    // Handed to translators that implement BrokerContextAware.
    private final BrokerContext brokerContext;
    // Protocol version of this connection; compared against the Stomp.V1_x constants.
    private String version = "1.0";
    // NOTE(review): heart-beat settings; they are read and written outside the visible code.
    private long hbReadInterval;
    private long hbWriteInterval;
    private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
127    
128        private static class AckEntry {
129    
130            private String messageId;
131            private StompSubscription subscription;
132    
133            public AckEntry(String messageId, StompSubscription subscription) {
134                this.messageId = messageId;
135                this.subscription = subscription;
136            }
137    
138            public MessageAck onMessageAck(TransactionId transactionId) {
139                return subscription.onStompMessageAck(messageId, transactionId);
140            }
141    
142            public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
143                return subscription.onStompMessageNack(messageId, transactionId);
144            }
145    
146            public String getMessageId() {
147                return this.messageId;
148            }
149    
150            public StompSubscription getSubscription() {
151                return this.subscription;
152            }
153        }
154    
155        public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
156            this.stompTransport = stompTransport;
157            this.brokerContext = brokerContext;
158        }
159    
160        protected int generateCommandId() {
161            synchronized (commnadIdMutex) {
162                return lastCommandId++;
163            }
164        }
165    
166        protected ResponseHandler createResponseHandler(final StompFrame command) {
167            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
168            if (receiptId != null) {
169                return new ResponseHandler() {
170                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
171                        if (response.isException()) {
172                            // Generally a command can fail.. but that does not invalidate the connection.
173                            // We report back the failure but we don't close the connection.
174                            Throwable exception = ((ExceptionResponse)response).getException();
175                            handleException(exception, command);
176                        } else {
177                            StompFrame sc = new StompFrame();
178                            sc.setAction(Stomp.Responses.RECEIPT);
179                            sc.setHeaders(new HashMap<String, String>(1));
180                            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
181                            stompTransport.sendToStomp(sc);
182                        }
183                    }
184                };
185            }
186            return null;
187        }
188    
189        protected void sendToActiveMQ(Command command, ResponseHandler handler) {
190            command.setCommandId(generateCommandId());
191            if (handler != null) {
192                command.setResponseRequired(true);
193                resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
194            }
195            stompTransport.sendToActiveMQ(command);
196        }
197    
198        protected void sendToStomp(StompFrame command) throws IOException {
199            stompTransport.sendToStomp(command);
200        }
201    
202        protected FrameTranslator findTranslator(String header) {
203            FrameTranslator translator = frameTranslator;
204            try {
205                if (header != null) {
206                    translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
207                            .newInstance(header);
208                    if (translator instanceof BrokerContextAware) {
209                        ((BrokerContextAware)translator).setBrokerContext(brokerContext);
210                    }
211                }
212            } catch (Exception ignore) {
213                // if anything goes wrong use the default translator
214            }
215    
216            return translator;
217        }
218    
219        /**
220         * Convert a stomp command
221         *
222         * @param command
223         */
224        public void onStompCommand(StompFrame command) throws IOException, JMSException {
225            try {
226    
227                if (command.getClass() == StompFrameError.class) {
228                    throw ((StompFrameError)command).getException();
229                }
230    
231                String action = command.getAction();
232                if (action.startsWith(Stomp.Commands.SEND)) {
233                    onStompSend(command);
234                } else if (action.startsWith(Stomp.Commands.ACK)) {
235                    onStompAck(command);
236                } else if (action.startsWith(Stomp.Commands.NACK)) {
237                    onStompNack(command);
238                } else if (action.startsWith(Stomp.Commands.BEGIN)) {
239                    onStompBegin(command);
240                } else if (action.startsWith(Stomp.Commands.COMMIT)) {
241                    onStompCommit(command);
242                } else if (action.startsWith(Stomp.Commands.ABORT)) {
243                    onStompAbort(command);
244                } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
245                    onStompSubscribe(command);
246                } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
247                    onStompUnsubscribe(command);
248                } else if (action.startsWith(Stomp.Commands.CONNECT) ||
249                           action.startsWith(Stomp.Commands.STOMP)) {
250                    onStompConnect(command);
251                } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
252                    onStompDisconnect(command);
253                } else {
254                    throw new ProtocolException("Unknown STOMP action: " + action);
255                }
256    
257            } catch (ProtocolException e) {
258                handleException(e, command);
259                // Some protocol errors can cause the connection to get closed.
260                if (e.isFatal()) {
261                   getStompTransport().onException(e);
262                }
263            }
264        }
265    
266        protected void handleException(Throwable exception, StompFrame command) throws IOException {
267            LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
268            if (LOG.isDebugEnabled()) {
269                LOG.debug("Exception detail", exception);
270            }
271    
272            // Let the stomp client know about any protocol errors.
273            ByteArrayOutputStream baos = new ByteArrayOutputStream();
274            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
275            exception.printStackTrace(stream);
276            stream.close();
277    
278            HashMap<String, String> headers = new HashMap<String, String>();
279            headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
280            headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
281    
282            if (command != null) {
283                final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
284                if (receiptId != null) {
285                    headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
286                }
287            }
288    
289            StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
290            sendToStomp(errorMessage);
291        }
292    
293        protected void onStompSend(StompFrame command) throws IOException, JMSException {
294            checkConnected();
295    
296            Map<String, String> headers = command.getHeaders();
297            String destination = headers.get(Stomp.Headers.Send.DESTINATION);
298            if (destination == null) {
299                throw new ProtocolException("SEND received without a Destination specified!");
300            }
301    
302            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
303            headers.remove("transaction");
304    
305            ActiveMQMessage message = convertMessage(command);
306    
307            message.setProducerId(producerId);
308            MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
309            message.setMessageId(id);
310            message.setJMSTimestamp(System.currentTimeMillis());
311    
312            if (stompTx != null) {
313                TransactionId activemqTx = transactions.get(stompTx);
314                if (activemqTx == null) {
315                    throw new ProtocolException("Invalid transaction id: " + stompTx);
316                }
317                message.setTransactionId(activemqTx);
318            }
319    
320            message.onSend();
321            sendToActiveMQ(message, createResponseHandler(command));
322        }
323    
324        protected void onStompNack(StompFrame command) throws ProtocolException {
325    
326            checkConnected();
327    
328            if (this.version.equals(Stomp.V1_0)) {
329                throw new ProtocolException("NACK received but connection is in v1.0 mode.");
330            }
331    
332            Map<String, String> headers = command.getHeaders();
333    
334            String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
335            if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) {
336                throw new ProtocolException("NACK received without a subscription id for acknowledge!");
337            }
338    
339            String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
340            if (messageId == null && !this.version.equals(Stomp.V1_2)) {
341                throw new ProtocolException("NACK received without a message-id to acknowledge!");
342            }
343    
344            String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
345            if (ackId == null && this.version.equals(Stomp.V1_2)) {
346                throw new ProtocolException("NACK received without an ack header to acknowledge!");
347            }
348    
349            TransactionId activemqTx = null;
350            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
351            if (stompTx != null) {
352                activemqTx = transactions.get(stompTx);
353                if (activemqTx == null) {
354                    throw new ProtocolException("Invalid transaction id: " + stompTx);
355                }
356            }
357    
358            boolean nacked = false;
359    
360            if (ackId != null) {
361                AckEntry pendingAck = this.pedingAcks.get(ackId);
362                if (pendingAck != null) {
363                    messageId = pendingAck.getMessageId();
364                    MessageAck ack = pendingAck.onMessageNack(activemqTx);
365                    if (ack != null) {
366                        sendToActiveMQ(ack, createResponseHandler(command));
367                        nacked = true;
368                    }
369                }
370            } else if (subscriptionId != null) {
371                StompSubscription sub = this.subscriptions.get(subscriptionId);
372                if (sub != null) {
373                    MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
374                    if (ack != null) {
375                        sendToActiveMQ(ack, createResponseHandler(command));
376                        nacked = true;
377                    }
378                }
379            }
380    
381            if (!nacked) {
382                throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
383            }
384        }
385    
386        protected void onStompAck(StompFrame command) throws ProtocolException {
387            checkConnected();
388    
389            Map<String, String> headers = command.getHeaders();
390            String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
391            if (messageId == null && !(this.version.equals(Stomp.V1_2))) {
392                throw new ProtocolException("ACK received without a message-id to acknowledge!");
393            }
394    
395            String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
396            if (subscriptionId == null && this.version.equals(Stomp.V1_1)) {
397                throw new ProtocolException("ACK received without a subscription id for acknowledge!");
398            }
399    
400            String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
401            if (ackId == null && this.version.equals(Stomp.V1_2)) {
402                throw new ProtocolException("ACK received without a ack id for acknowledge!");
403            }
404    
405            TransactionId activemqTx = null;
406            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
407            if (stompTx != null) {
408                activemqTx = transactions.get(stompTx);
409                if (activemqTx == null) {
410                    throw new ProtocolException("Invalid transaction id: " + stompTx);
411                }
412            }
413    
414            boolean acked = false;
415    
416            if (ackId != null) {
417    
418                AckEntry pendingAck = this.pedingAcks.get(ackId);
419                if (pendingAck != null) {
420                    messageId = pendingAck.getMessageId();
421                    MessageAck ack = pendingAck.onMessageAck(activemqTx);
422                    if (ack != null) {
423                        sendToActiveMQ(ack, createResponseHandler(command));
424                        acked = true;
425                    }
426                }
427    
428            } else if (subscriptionId != null) {
429    
430                StompSubscription sub = this.subscriptions.get(subscriptionId);
431                if (sub != null) {
432                    MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
433                    if (ack != null) {
434                        sendToActiveMQ(ack, createResponseHandler(command));
435                        acked = true;
436                    }
437                }
438    
439            } else {
440    
441                // STOMP v1.0: acking with just a message id is very bogus since the same message id
442                // could have been sent to 2 different subscriptions on the same Stomp connection.
443                // For example, when 2 subs are created on the same topic.
444    
445                for (StompSubscription sub : subscriptionsByConsumerId.values()) {
446                    MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
447                    if (ack != null) {
448                        sendToActiveMQ(ack, createResponseHandler(command));
449                        acked = true;
450                        break;
451                    }
452                }
453            }
454    
455            if (!acked) {
456                throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
457            }
458        }
459    
460        protected void onStompBegin(StompFrame command) throws ProtocolException {
461            checkConnected();
462    
463            Map<String, String> headers = command.getHeaders();
464    
465            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
466    
467            if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
468                throw new ProtocolException("Must specify the transaction you are beginning");
469            }
470    
471            if (transactions.get(stompTx) != null) {
472                throw new ProtocolException("The transaction was allready started: " + stompTx);
473            }
474    
475            LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
476            transactions.put(stompTx, activemqTx);
477    
478            TransactionInfo tx = new TransactionInfo();
479            tx.setConnectionId(connectionId);
480            tx.setTransactionId(activemqTx);
481            tx.setType(TransactionInfo.BEGIN);
482    
483            sendToActiveMQ(tx, createResponseHandler(command));
484        }
485    
486        protected void onStompCommit(StompFrame command) throws ProtocolException {
487            checkConnected();
488    
489            Map<String, String> headers = command.getHeaders();
490    
491            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
492            if (stompTx == null) {
493                throw new ProtocolException("Must specify the transaction you are committing");
494            }
495    
496            TransactionId activemqTx = transactions.remove(stompTx);
497            if (activemqTx == null) {
498                throw new ProtocolException("Invalid transaction id: " + stompTx);
499            }
500    
501            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
502                sub.onStompCommit(activemqTx);
503            }
504    
505            TransactionInfo tx = new TransactionInfo();
506            tx.setConnectionId(connectionId);
507            tx.setTransactionId(activemqTx);
508            tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
509    
510            sendToActiveMQ(tx, createResponseHandler(command));
511        }
512    
513        protected void onStompAbort(StompFrame command) throws ProtocolException {
514            checkConnected();
515            Map<String, String> headers = command.getHeaders();
516    
517            String stompTx = headers.get(Stomp.Headers.TRANSACTION);
518            if (stompTx == null) {
519                throw new ProtocolException("Must specify the transaction you are committing");
520            }
521    
522            TransactionId activemqTx = transactions.remove(stompTx);
523            if (activemqTx == null) {
524                throw new ProtocolException("Invalid transaction id: " + stompTx);
525            }
526            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
527                try {
528                    sub.onStompAbort(activemqTx);
529                } catch (Exception e) {
530                    throw new ProtocolException("Transaction abort failed", false, e);
531                }
532            }
533    
534            TransactionInfo tx = new TransactionInfo();
535            tx.setConnectionId(connectionId);
536            tx.setTransactionId(activemqTx);
537            tx.setType(TransactionInfo.ROLLBACK);
538    
539            sendToActiveMQ(tx, createResponseHandler(command));
540        }
541    
542        protected void onStompSubscribe(StompFrame command) throws ProtocolException {
543            checkConnected();
544            FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
545            Map<String, String> headers = command.getHeaders();
546    
547            String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
548            String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
549    
550            if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
551                throw new ProtocolException("SUBSCRIBE received without a subscription id!");
552            }
553    
554            final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
555    
556            if (actualDest == null) {
557                throw new ProtocolException("Invalid 'null' Destination.");
558            }
559    
560            final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
561            ConsumerInfo consumerInfo = new ConsumerInfo(id);
562            consumerInfo.setPrefetchSize(actualDest.isQueue() ?
563                    ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH :
564                    headers.containsKey("activemq.subscriptionName") ?
565                            ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
566            consumerInfo.setDispatchAsync(true);
567    
568            String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
569            if (browser != null && browser.equals(Stomp.TRUE)) {
570    
571                if (!this.version.equals(Stomp.V1_1)) {
572                    throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
573                }
574    
575                consumerInfo.setBrowser(true);
576                consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH);
577            }
578    
579            String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
580            if (selector != null) {
581                consumerInfo.setSelector("convert_string_expressions:" + selector);
582            }
583    
584            IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
585    
586            if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) {
587                throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!");
588            }
589    
590            consumerInfo.setDestination(translator.convertDestination(this, destination, true));
591    
592            StompSubscription stompSubscription;
593            if (!consumerInfo.isBrowser()) {
594                stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
595            } else {
596                stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
597            }
598            stompSubscription.setDestination(actualDest);
599    
600            String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
601            if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
602                stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
603            } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
604                stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
605            } else {
606                stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
607            }
608    
609            subscriptionsByConsumerId.put(id, stompSubscription);
610            // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
611            if (subscriptionId != null) {
612                subscriptions.put(subscriptionId, stompSubscription);
613            }
614    
615            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
616            if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
617    
618                final StompFrame cmd = command;
619                final int prefetch = consumerInfo.getPrefetchSize();
620    
621                // Since dispatch could beat the receipt we set prefetch to zero to start and then
622                // once we've sent our Receipt we are safe to turn on dispatch if the response isn't
623                // an error message.
624                consumerInfo.setPrefetchSize(0);
625    
626                final ResponseHandler handler = new ResponseHandler() {
627                    public void onResponse(ProtocolConverter converter, Response response) throws IOException {
628                        if (response.isException()) {
629                            // Generally a command can fail.. but that does not invalidate the connection.
630                            // We report back the failure but we don't close the connection.
631                            Throwable exception = ((ExceptionResponse)response).getException();
632                            handleException(exception, cmd);
633                        } else {
634                            StompFrame sc = new StompFrame();
635                            sc.setAction(Stomp.Responses.RECEIPT);
636                            sc.setHeaders(new HashMap<String, String>(1));
637                            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
638                            stompTransport.sendToStomp(sc);
639    
640                            ConsumerControl control = new ConsumerControl();
641                            control.setPrefetch(prefetch);
642                            control.setDestination(actualDest);
643                            control.setConsumerId(id);
644    
645                            sendToActiveMQ(control, null);
646                        }
647                    }
648                };
649    
650                sendToActiveMQ(consumerInfo, handler);
651            } else {
652                sendToActiveMQ(consumerInfo, createResponseHandler(command));
653            }
654        }
655    
656        protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
657            checkConnected();
658            Map<String, String> headers = command.getHeaders();
659    
660            ActiveMQDestination destination = null;
661            Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
662            if (o != null) {
663                destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
664            }
665    
666            String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
667            if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
668                throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
669            }
670    
671            if (subscriptionId == null && destination == null) {
672                throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
673            }
674    
675            // check if it is a durable subscription
676            String durable = command.getHeaders().get("activemq.subscriptionName");
677            String clientId = durable;
678            if (this.version.equals(Stomp.V1_1)) {
679                clientId = connectionInfo.getClientId();
680            }
681    
682            if (durable != null) {
683                RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
684                info.setClientId(clientId);
685                info.setSubscriptionName(durable);
686                info.setConnectionId(connectionId);
687                sendToActiveMQ(info, createResponseHandler(command));
688                return;
689            }
690    
691            if (subscriptionId != null) {
692    
693                StompSubscription sub = this.subscriptions.remove(subscriptionId);
694                if (sub != null) {
695                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
696                    return;
697                }
698    
699            } else {
700    
701                // Unsubscribing using a destination is a bit weird if multiple subscriptions
702                // are created with the same destination.
703                for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
704                    StompSubscription sub = iter.next();
705                    if (destination != null && destination.equals(sub.getDestination())) {
706                        sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
707                        iter.remove();
708                        return;
709                    }
710                }
711            }
712    
713            throw new ProtocolException("No subscription matched.");
714        }
715    
    // Connection metadata for this STOMP session: populated by onStompConnect() from the CONNECT
    // frame, then read back when removing durable subscriptions and when disconnecting.
    ConnectionInfo connectionInfo = new ConnectionInfo();
717    
718        protected void onStompConnect(final StompFrame command) throws ProtocolException {
719    
720            if (connected.get()) {
721                throw new ProtocolException("Allready connected.");
722            }
723    
724            final Map<String, String> headers = command.getHeaders();
725    
726            // allow anyone to login for now
727            String login = headers.get(Stomp.Headers.Connect.LOGIN);
728            String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
729            String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
730            String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
731    
732            if (heartBeat == null) {
733                heartBeat = defaultHeartBeat;
734            }
735    
736            this.version = StompCodec.detectVersion(headers);
737    
738            configureInactivityMonitor(heartBeat.trim());
739    
740            IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
741            connectionInfo.setConnectionId(connectionId);
742            if (clientId != null) {
743                connectionInfo.setClientId(clientId);
744            } else {
745                connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
746            }
747    
748            connectionInfo.setResponseRequired(true);
749            connectionInfo.setUserName(login);
750            connectionInfo.setPassword(passcode);
751            connectionInfo.setTransportContext(command.getTransportContext());
752    
753            sendToActiveMQ(connectionInfo, new ResponseHandler() {
754                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
755    
756                    if (response.isException()) {
757                        // If the connection attempt fails we close the socket.
758                        Throwable exception = ((ExceptionResponse)response).getException();
759                        handleException(exception, command);
760                        getStompTransport().onException(IOExceptionSupport.create(exception));
761                        return;
762                    }
763    
764                    final SessionInfo sessionInfo = new SessionInfo(sessionId);
765                    sendToActiveMQ(sessionInfo, null);
766    
767                    final ProducerInfo producerInfo = new ProducerInfo(producerId);
768                    sendToActiveMQ(producerInfo, new ResponseHandler() {
769                        public void onResponse(ProtocolConverter converter, Response response) throws IOException {
770    
771                            if (response.isException()) {
772                                // If the connection attempt fails we close the socket.
773                                Throwable exception = ((ExceptionResponse)response).getException();
774                                handleException(exception, command);
775                                getStompTransport().onException(IOExceptionSupport.create(exception));
776                            }
777    
778                            connected.set(true);
779                            HashMap<String, String> responseHeaders = new HashMap<String, String>();
780    
781                            responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
782                            String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
783                            if (requestId == null) {
784                                // TODO legacy
785                                requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
786                            }
787                            if (requestId != null) {
788                                // TODO legacy
789                                responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
790                                responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
791                            }
792    
793                            responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
794                            responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
795                                                String.format("%d,%d", hbWriteInterval, hbReadInterval));
796                            responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
797    
798                            StompFrame sc = new StompFrame();
799                            sc.setAction(Stomp.Responses.CONNECTED);
800                            sc.setHeaders(responseHeaders);
801                            sendToStomp(sc);
802    
803                            StompWireFormat format = stompTransport.getWireFormat();
804                            if (format != null) {
805                                format.setStompVersion(version);
806                            }
807                        }
808                    });
809                }
810            });
811        }
812    
813        protected void onStompDisconnect(StompFrame command) throws ProtocolException {
814            if (connected.get()) {
815                sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
816                sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
817                connected.set(false);
818            }
819        }
820    
821        protected void checkConnected() throws ProtocolException {
822            if (!connected.get()) {
823                throw new ProtocolException("Not connected.");
824            }
825        }
826    
827        /**
828         * Dispatch a ActiveMQ command
829         *
830         * @param command
831         * @throws IOException
832         */
833        public void onActiveMQCommand(Command command) throws IOException, JMSException {
834            if (command.isResponse()) {
835                Response response = (Response)command;
836                ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
837                if (rh != null) {
838                    rh.onResponse(this, response);
839                } else {
840                    // Pass down any unexpected errors. Should this close the connection?
841                    if (response.isException()) {
842                        Throwable exception = ((ExceptionResponse)response).getException();
843                        handleException(exception, null);
844                    }
845                }
846            } else if (command.isMessageDispatch()) {
847                MessageDispatch md = (MessageDispatch)command;
848                StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
849                if (sub != null) {
850                    String ackId = null;
851                    if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO) {
852                        AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
853                        ackId = this.ACK_ID_GENERATOR.generateId();
854                        this.pedingAcks.put(ackId, pendingAck);
855                    }
856                    try {
857                        sub.onMessageDispatch(md, ackId);
858                    } catch (Exception ex) {
859                        if (ackId != null) {
860                            this.pedingAcks.remove(ackId);
861                        }
862                    }
863                }
864            } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
865                stompTransport.sendToStomp(ping);
866            } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
867                // Pass down any unexpected async errors. Should this close the connection?
868                Throwable exception = ((ConnectionError)command).getException();
869                handleException(exception, null);
870            }
871        }
872    
873        public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
874            ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
875            return msg;
876        }
877    
878        public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
879            if (ignoreTransformation == true) {
880                return frameTranslator.convertMessage(this, message);
881            } else {
882                return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
883            }
884        }
885    
886        public StompTransport getStompTransport() {
887            return stompTransport;
888        }
889    
890        public ActiveMQDestination createTempDestination(String name, boolean topic) {
891            ActiveMQDestination rc = tempDestinations.get(name);
892            if( rc == null ) {
893                if (topic) {
894                    rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
895                } else {
896                    rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
897                }
898                sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
899                tempDestinations.put(name, rc);
900                tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
901            }
902            return rc;
903        }
904    
905        public String getCreatedTempDestinationName(ActiveMQDestination destination) {
906            return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
907        }
908    
909        public String getDefaultHeartBeat() {
910            return defaultHeartBeat;
911        }
912    
913        public void setDefaultHeartBeat(String defaultHeartBeat) {
914            this.defaultHeartBeat = defaultHeartBeat;
915        }
916    
917        protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
918    
919            String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
920    
921            if (keepAliveOpts == null || keepAliveOpts.length != 2) {
922                throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
923            } else {
924    
925                try {
926                    hbReadInterval = Long.parseLong(keepAliveOpts[0]);
927                    hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
928                } catch(NumberFormatException e) {
929                    throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
930                }
931    
932                try {
933                    StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
934                    monitor.setReadCheckTime(hbReadInterval);
935                    monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
936                    monitor.setWriteCheckTime(hbWriteInterval);
937                    monitor.startMonitoring();
938                } catch(Exception ex) {
939                    hbReadInterval = 0;
940                    hbWriteInterval = 0;
941                }
942    
943                if (LOG.isDebugEnabled()) {
944                    LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
945                }
946            }
947        }
948    
949        protected void sendReceipt(StompFrame command) {
950            final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
951            if (receiptId != null) {
952                StompFrame sc = new StompFrame();
953                sc.setAction(Stomp.Responses.RECEIPT);
954                sc.setHeaders(new HashMap<String, String>(1));
955                sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
956                try {
957                    sendToStomp(sc);
958                } catch (IOException e) {
959                    LOG.warn("Could not send a receipt for " + command, e);
960                }
961            }
962        }
963    }