001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.BufferedReader;
020 import java.io.IOException;
021 import java.io.InputStream;
022 import java.io.InputStreamReader;
023 import java.io.OutputStreamWriter;
024 import java.io.PrintWriter;
025 import java.util.HashMap;
026 import java.util.Iterator;
027 import java.util.Map;
028 import java.util.concurrent.ConcurrentHashMap;
029 import java.util.concurrent.atomic.AtomicBoolean;
030
031 import javax.jms.JMSException;
032
033 import org.apache.activemq.broker.BrokerContext;
034 import org.apache.activemq.broker.BrokerContextAware;
035 import org.apache.activemq.command.ActiveMQDestination;
036 import org.apache.activemq.command.ActiveMQMessage;
037 import org.apache.activemq.command.ActiveMQTempQueue;
038 import org.apache.activemq.command.ActiveMQTempTopic;
039 import org.apache.activemq.command.Command;
040 import org.apache.activemq.command.CommandTypes;
041 import org.apache.activemq.command.ConnectionError;
042 import org.apache.activemq.command.ConnectionId;
043 import org.apache.activemq.command.ConnectionInfo;
044 import org.apache.activemq.command.ConsumerControl;
045 import org.apache.activemq.command.ConsumerId;
046 import org.apache.activemq.command.ConsumerInfo;
047 import org.apache.activemq.command.DestinationInfo;
048 import org.apache.activemq.command.ExceptionResponse;
049 import org.apache.activemq.command.LocalTransactionId;
050 import org.apache.activemq.command.MessageAck;
051 import org.apache.activemq.command.MessageDispatch;
052 import org.apache.activemq.command.MessageId;
053 import org.apache.activemq.command.ProducerId;
054 import org.apache.activemq.command.ProducerInfo;
055 import org.apache.activemq.command.RemoveSubscriptionInfo;
056 import org.apache.activemq.command.Response;
057 import org.apache.activemq.command.SessionId;
058 import org.apache.activemq.command.SessionInfo;
059 import org.apache.activemq.command.ShutdownInfo;
060 import org.apache.activemq.command.TransactionId;
061 import org.apache.activemq.command.TransactionInfo;
062 import org.apache.activemq.util.ByteArrayOutputStream;
063 import org.apache.activemq.util.FactoryFinder;
064 import org.apache.activemq.util.IOExceptionSupport;
065 import org.apache.activemq.util.IdGenerator;
066 import org.apache.activemq.util.IntrospectionSupport;
067 import org.apache.activemq.util.LongSequenceGenerator;
068 import org.slf4j.Logger;
069 import org.slf4j.LoggerFactory;
070
071 /**
072 * @author <a href="http://hiramchirino.com">chirino</a>
073 */
074 public class ProtocolConverter {
075
076 private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
077
078 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
079
080 private static final String BROKER_VERSION;
081 private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
082
083 static {
084 InputStream in = null;
085 String version = "5.6.0";
086 if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
087 BufferedReader reader = new BufferedReader(new InputStreamReader(in));
088 try {
089 version = reader.readLine();
090 } catch(Exception e) {
091 }
092 }
093 BROKER_VERSION = version;
094 }
095
096 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
097 private final SessionId sessionId = new SessionId(connectionId, -1);
098 private final ProducerId producerId = new ProducerId(sessionId, 1);
099
100 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
101 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
102 private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
103 private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
104
105 private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
106 private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
107 private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
108 private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
109 private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
110 private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
111 private final StompTransport stompTransport;
112
113 private final Object commnadIdMutex = new Object();
114 private int lastCommandId;
115 private final AtomicBoolean connected = new AtomicBoolean(false);
116 private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
117 private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
118 private final BrokerContext brokerContext;
119 private String version = "1.0";
120 private long hbReadInterval;
121 private long hbWriteInterval;
122 private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
123
124 public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
125 this.stompTransport = stompTransport;
126 this.brokerContext = brokerContext;
127 }
128
129 protected int generateCommandId() {
130 synchronized (commnadIdMutex) {
131 return lastCommandId++;
132 }
133 }
134
135 protected ResponseHandler createResponseHandler(final StompFrame command) {
136 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
137 if (receiptId != null) {
138 return new ResponseHandler() {
139 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
140 if (response.isException()) {
141 // Generally a command can fail.. but that does not invalidate the connection.
142 // We report back the failure but we don't close the connection.
143 Throwable exception = ((ExceptionResponse)response).getException();
144 handleException(exception, command);
145 } else {
146 StompFrame sc = new StompFrame();
147 sc.setAction(Stomp.Responses.RECEIPT);
148 sc.setHeaders(new HashMap<String, String>(1));
149 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
150 stompTransport.sendToStomp(sc);
151 }
152 }
153 };
154 }
155 return null;
156 }
157
158 protected void sendToActiveMQ(Command command, ResponseHandler handler) {
159 command.setCommandId(generateCommandId());
160 if (handler != null) {
161 command.setResponseRequired(true);
162 resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
163 }
164 stompTransport.sendToActiveMQ(command);
165 }
166
167 protected void sendToStomp(StompFrame command) throws IOException {
168 stompTransport.sendToStomp(command);
169 }
170
171 protected FrameTranslator findTranslator(String header) {
172 FrameTranslator translator = frameTranslator;
173 try {
174 if (header != null) {
175 translator = (FrameTranslator) FRAME_TRANSLATOR_FINDER
176 .newInstance(header);
177 if (translator instanceof BrokerContextAware) {
178 ((BrokerContextAware)translator).setBrokerContext(brokerContext);
179 }
180 }
181 } catch (Exception ignore) {
182 // if anything goes wrong use the default translator
183 }
184
185 return translator;
186 }
187
188 /**
189 * Convert a stomp command
190 *
191 * @param command
192 */
193 public void onStompCommand(StompFrame command) throws IOException, JMSException {
194 try {
195
196 if (command.getClass() == StompFrameError.class) {
197 throw ((StompFrameError)command).getException();
198 }
199
200 String action = command.getAction();
201 if (action.startsWith(Stomp.Commands.SEND)) {
202 onStompSend(command);
203 } else if (action.startsWith(Stomp.Commands.ACK)) {
204 onStompAck(command);
205 } else if (action.startsWith(Stomp.Commands.NACK)) {
206 onStompNack(command);
207 } else if (action.startsWith(Stomp.Commands.BEGIN)) {
208 onStompBegin(command);
209 } else if (action.startsWith(Stomp.Commands.COMMIT)) {
210 onStompCommit(command);
211 } else if (action.startsWith(Stomp.Commands.ABORT)) {
212 onStompAbort(command);
213 } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
214 onStompSubscribe(command);
215 } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
216 onStompUnsubscribe(command);
217 } else if (action.startsWith(Stomp.Commands.CONNECT) ||
218 action.startsWith(Stomp.Commands.STOMP)) {
219 onStompConnect(command);
220 } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
221 onStompDisconnect(command);
222 } else {
223 throw new ProtocolException("Unknown STOMP action: " + action);
224 }
225
226 } catch (ProtocolException e) {
227 handleException(e, command);
228 // Some protocol errors can cause the connection to get closed.
229 if (e.isFatal()) {
230 getStompTransport().onException(e);
231 }
232 }
233 }
234
235 protected void handleException(Throwable exception, StompFrame command) throws IOException {
236 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
237 if (LOG.isDebugEnabled()) {
238 LOG.debug("Exception detail", exception);
239 }
240
241 // Let the stomp client know about any protocol errors.
242 ByteArrayOutputStream baos = new ByteArrayOutputStream();
243 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
244 exception.printStackTrace(stream);
245 stream.close();
246
247 HashMap<String, String> headers = new HashMap<String, String>();
248 headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
249 headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
250
251 if (command != null) {
252 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
253 if (receiptId != null) {
254 headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
255 }
256 }
257
258 StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
259 sendToStomp(errorMessage);
260 }
261
262 protected void onStompSend(StompFrame command) throws IOException, JMSException {
263 checkConnected();
264
265 Map<String, String> headers = command.getHeaders();
266 String destination = headers.get(Stomp.Headers.Send.DESTINATION);
267 if (destination == null) {
268 throw new ProtocolException("SEND received without a Destination specified!");
269 }
270
271 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
272 headers.remove("transaction");
273
274 ActiveMQMessage message = convertMessage(command);
275
276 message.setProducerId(producerId);
277 MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
278 message.setMessageId(id);
279 message.setJMSTimestamp(System.currentTimeMillis());
280
281 if (stompTx != null) {
282 TransactionId activemqTx = transactions.get(stompTx);
283 if (activemqTx == null) {
284 throw new ProtocolException("Invalid transaction id: " + stompTx);
285 }
286 message.setTransactionId(activemqTx);
287 }
288
289 message.onSend();
290 sendToActiveMQ(message, createResponseHandler(command));
291 }
292
293 protected void onStompNack(StompFrame command) throws ProtocolException {
294
295 checkConnected();
296
297 if (this.version.equals(Stomp.V1_0)) {
298 throw new ProtocolException("NACK received but connection is in v1.0 mode.");
299 }
300
301 Map<String, String> headers = command.getHeaders();
302
303 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
304 if (subscriptionId == null) {
305 throw new ProtocolException("NACK received without a subscription id for acknowledge!");
306 }
307
308 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
309 if (messageId == null) {
310 throw new ProtocolException("NACK received without a message-id to acknowledge!");
311 }
312
313 TransactionId activemqTx = null;
314 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
315 if (stompTx != null) {
316 activemqTx = transactions.get(stompTx);
317 if (activemqTx == null) {
318 throw new ProtocolException("Invalid transaction id: " + stompTx);
319 }
320 }
321
322 if (subscriptionId != null) {
323 StompSubscription sub = this.subscriptions.get(subscriptionId);
324 if (sub != null) {
325 MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
326 if (ack != null) {
327 sendToActiveMQ(ack, createResponseHandler(command));
328 } else {
329 throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
330 }
331 }
332 }
333 }
334
335 protected void onStompAck(StompFrame command) throws ProtocolException {
336 checkConnected();
337
338 Map<String, String> headers = command.getHeaders();
339 String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
340 if (messageId == null) {
341 throw new ProtocolException("ACK received without a message-id to acknowledge!");
342 }
343
344 String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
345 if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
346 throw new ProtocolException("ACK received without a subscription id for acknowledge!");
347 }
348
349 TransactionId activemqTx = null;
350 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
351 if (stompTx != null) {
352 activemqTx = transactions.get(stompTx);
353 if (activemqTx == null) {
354 throw new ProtocolException("Invalid transaction id: " + stompTx);
355 }
356 }
357
358 boolean acked = false;
359
360 if (subscriptionId != null) {
361
362 StompSubscription sub = this.subscriptions.get(subscriptionId);
363 if (sub != null) {
364 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
365 if (ack != null) {
366 sendToActiveMQ(ack, createResponseHandler(command));
367 acked = true;
368 }
369 }
370
371 } else {
372
373 // TODO: acking with just a message id is very bogus since the same message id
374 // could have been sent to 2 different subscriptions on the same Stomp connection.
375 // For example, when 2 subs are created on the same topic.
376
377 for (StompSubscription sub : subscriptionsByConsumerId.values()) {
378 MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
379 if (ack != null) {
380 sendToActiveMQ(ack, createResponseHandler(command));
381 acked = true;
382 break;
383 }
384 }
385 }
386
387 if (!acked) {
388 throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
389 }
390 }
391
392 protected void onStompBegin(StompFrame command) throws ProtocolException {
393 checkConnected();
394
395 Map<String, String> headers = command.getHeaders();
396
397 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
398
399 if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
400 throw new ProtocolException("Must specify the transaction you are beginning");
401 }
402
403 if (transactions.get(stompTx) != null) {
404 throw new ProtocolException("The transaction was allready started: " + stompTx);
405 }
406
407 LocalTransactionId activemqTx = new LocalTransactionId(connectionId, transactionIdGenerator.getNextSequenceId());
408 transactions.put(stompTx, activemqTx);
409
410 TransactionInfo tx = new TransactionInfo();
411 tx.setConnectionId(connectionId);
412 tx.setTransactionId(activemqTx);
413 tx.setType(TransactionInfo.BEGIN);
414
415 sendToActiveMQ(tx, createResponseHandler(command));
416 }
417
418 protected void onStompCommit(StompFrame command) throws ProtocolException {
419 checkConnected();
420
421 Map<String, String> headers = command.getHeaders();
422
423 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
424 if (stompTx == null) {
425 throw new ProtocolException("Must specify the transaction you are committing");
426 }
427
428 TransactionId activemqTx = transactions.remove(stompTx);
429 if (activemqTx == null) {
430 throw new ProtocolException("Invalid transaction id: " + stompTx);
431 }
432
433 for (StompSubscription sub : subscriptionsByConsumerId.values()) {
434 sub.onStompCommit(activemqTx);
435 }
436
437 TransactionInfo tx = new TransactionInfo();
438 tx.setConnectionId(connectionId);
439 tx.setTransactionId(activemqTx);
440 tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
441
442 sendToActiveMQ(tx, createResponseHandler(command));
443 }
444
445 protected void onStompAbort(StompFrame command) throws ProtocolException {
446 checkConnected();
447 Map<String, String> headers = command.getHeaders();
448
449 String stompTx = headers.get(Stomp.Headers.TRANSACTION);
450 if (stompTx == null) {
451 throw new ProtocolException("Must specify the transaction you are committing");
452 }
453
454 TransactionId activemqTx = transactions.remove(stompTx);
455 if (activemqTx == null) {
456 throw new ProtocolException("Invalid transaction id: " + stompTx);
457 }
458 for (StompSubscription sub : subscriptionsByConsumerId.values()) {
459 try {
460 sub.onStompAbort(activemqTx);
461 } catch (Exception e) {
462 throw new ProtocolException("Transaction abort failed", false, e);
463 }
464 }
465
466 TransactionInfo tx = new TransactionInfo();
467 tx.setConnectionId(connectionId);
468 tx.setTransactionId(activemqTx);
469 tx.setType(TransactionInfo.ROLLBACK);
470
471 sendToActiveMQ(tx, createResponseHandler(command));
472 }
473
474 protected void onStompSubscribe(StompFrame command) throws ProtocolException {
475 checkConnected();
476 FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
477 Map<String, String> headers = command.getHeaders();
478
479 String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
480 String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
481
482 if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
483 throw new ProtocolException("SUBSCRIBE received without a subscription id!");
484 }
485
486 final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
487
488 if (actualDest == null) {
489 throw new ProtocolException("Invalid Destination.");
490 }
491
492 final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
493 ConsumerInfo consumerInfo = new ConsumerInfo(id);
494 consumerInfo.setPrefetchSize(1000);
495 consumerInfo.setDispatchAsync(true);
496
497 String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
498 if (browser != null && browser.equals(Stomp.TRUE)) {
499
500 if (!this.version.equals(Stomp.V1_1)) {
501 throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
502 }
503
504 consumerInfo.setBrowser(true);
505 }
506
507 String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
508 consumerInfo.setSelector(selector);
509
510 IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
511
512 if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) {
513 throw new ProtocolException("Invliad Subscription: cannot durably subscribe to a Queue destination!");
514 }
515
516 consumerInfo.setDestination(translator.convertDestination(this, destination, true));
517
518 StompSubscription stompSubscription;
519 if (!consumerInfo.isBrowser()) {
520 stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
521 } else {
522 stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
523 }
524 stompSubscription.setDestination(actualDest);
525
526 String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
527 if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
528 stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
529 } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
530 stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
531 } else {
532 stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
533 }
534
535 subscriptionsByConsumerId.put(id, stompSubscription);
536 // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
537 if (subscriptionId != null) {
538 subscriptions.put(subscriptionId, stompSubscription);
539 }
540
541 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
542 if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
543
544 final StompFrame cmd = command;
545 final int prefetch = consumerInfo.getPrefetchSize();
546
547 // Since dispatch could beat the receipt we set prefetch to zero to start and then
548 // once we've sent our Receipt we are safe to turn on dispatch if the response isn't
549 // an error message.
550 consumerInfo.setPrefetchSize(0);
551
552 final ResponseHandler handler = new ResponseHandler() {
553 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
554 if (response.isException()) {
555 // Generally a command can fail.. but that does not invalidate the connection.
556 // We report back the failure but we don't close the connection.
557 Throwable exception = ((ExceptionResponse)response).getException();
558 handleException(exception, cmd);
559 } else {
560 StompFrame sc = new StompFrame();
561 sc.setAction(Stomp.Responses.RECEIPT);
562 sc.setHeaders(new HashMap<String, String>(1));
563 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
564 stompTransport.sendToStomp(sc);
565
566 ConsumerControl control = new ConsumerControl();
567 control.setPrefetch(prefetch);
568 control.setDestination(actualDest);
569 control.setConsumerId(id);
570
571 sendToActiveMQ(control, null);
572 }
573 }
574 };
575
576 sendToActiveMQ(consumerInfo, handler);
577 } else {
578 sendToActiveMQ(consumerInfo, createResponseHandler(command));
579 }
580 }
581
582 protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
583 checkConnected();
584 Map<String, String> headers = command.getHeaders();
585
586 ActiveMQDestination destination = null;
587 Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
588 if (o != null) {
589 destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
590 }
591
592 String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
593 if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
594 throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
595 }
596
597 if (subscriptionId == null && destination == null) {
598 throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
599 }
600
601 // check if it is a durable subscription
602 String durable = command.getHeaders().get("activemq.subscriptionName");
603 String clientId = durable;
604 if (this.version.equals(Stomp.V1_1)) {
605 clientId = connectionInfo.getClientId();
606 }
607 if (durable != null) {
608 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
609 info.setClientId(clientId);
610 info.setSubscriptionName(durable);
611 info.setConnectionId(connectionId);
612 sendToActiveMQ(info, createResponseHandler(command));
613 return;
614 }
615
616 if (subscriptionId != null) {
617
618 StompSubscription sub = this.subscriptions.remove(subscriptionId);
619 if (sub != null) {
620 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
621 return;
622 }
623
624 } else {
625
626 // Unsubscribing using a destination is a bit weird if multiple subscriptions
627 // are created with the same destination.
628 for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
629 StompSubscription sub = iter.next();
630 if (destination != null && destination.equals(sub.getDestination())) {
631 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
632 iter.remove();
633 return;
634 }
635 }
636 }
637
638 throw new ProtocolException("No subscription matched.");
639 }
640
641 ConnectionInfo connectionInfo = new ConnectionInfo();
642
643 protected void onStompConnect(final StompFrame command) throws ProtocolException {
644
645 if (connected.get()) {
646 throw new ProtocolException("Allready connected.");
647 }
648
649 final Map<String, String> headers = command.getHeaders();
650
651 // allow anyone to login for now
652 String login = headers.get(Stomp.Headers.Connect.LOGIN);
653 String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
654 String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
655 String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
656
657 if (heartBeat == null) {
658 heartBeat = defaultHeartBeat;
659 }
660
661 this.version = StompCodec.detectVersion(headers);
662
663 configureInactivityMonitor(heartBeat.trim());
664
665 IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
666 connectionInfo.setConnectionId(connectionId);
667 if (clientId != null) {
668 connectionInfo.setClientId(clientId);
669 } else {
670 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
671 }
672
673 connectionInfo.setResponseRequired(true);
674 connectionInfo.setUserName(login);
675 connectionInfo.setPassword(passcode);
676 connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
677
678 sendToActiveMQ(connectionInfo, new ResponseHandler() {
679 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
680
681 if (response.isException()) {
682 // If the connection attempt fails we close the socket.
683 Throwable exception = ((ExceptionResponse)response).getException();
684 handleException(exception, command);
685 getStompTransport().onException(IOExceptionSupport.create(exception));
686 return;
687 }
688
689 final SessionInfo sessionInfo = new SessionInfo(sessionId);
690 sendToActiveMQ(sessionInfo, null);
691
692 final ProducerInfo producerInfo = new ProducerInfo(producerId);
693 sendToActiveMQ(producerInfo, new ResponseHandler() {
694 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
695
696 if (response.isException()) {
697 // If the connection attempt fails we close the socket.
698 Throwable exception = ((ExceptionResponse)response).getException();
699 handleException(exception, command);
700 getStompTransport().onException(IOExceptionSupport.create(exception));
701 }
702
703 connected.set(true);
704 HashMap<String, String> responseHeaders = new HashMap<String, String>();
705
706 responseHeaders.put(Stomp.Headers.Connected.SESSION, connectionInfo.getClientId());
707 String requestId = headers.get(Stomp.Headers.Connect.REQUEST_ID);
708 if (requestId == null) {
709 // TODO legacy
710 requestId = headers.get(Stomp.Headers.RECEIPT_REQUESTED);
711 }
712 if (requestId != null) {
713 // TODO legacy
714 responseHeaders.put(Stomp.Headers.Connected.RESPONSE_ID, requestId);
715 responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
716 }
717
718 responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
719 responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
720 String.format("%d,%d", hbWriteInterval, hbReadInterval));
721 responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
722
723 StompFrame sc = new StompFrame();
724 sc.setAction(Stomp.Responses.CONNECTED);
725 sc.setHeaders(responseHeaders);
726 sendToStomp(sc);
727
728 StompWireFormat format = stompTransport.getWireFormat();
729 if (format != null) {
730 format.setStompVersion(version);
731 }
732 }
733 });
734
735 }
736 });
737 }
738
739 protected void onStompDisconnect(StompFrame command) throws ProtocolException {
740 checkConnected();
741 sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
742 sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
743 connected.set(false);
744 }
745
746 protected void checkConnected() throws ProtocolException {
747 if (!connected.get()) {
748 throw new ProtocolException("Not connected.");
749 }
750 }
751
752 /**
753 * Dispatch a ActiveMQ command
754 *
755 * @param command
756 * @throws IOException
757 */
758 public void onActiveMQCommand(Command command) throws IOException, JMSException {
759 if (command.isResponse()) {
760 Response response = (Response)command;
761 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
762 if (rh != null) {
763 rh.onResponse(this, response);
764 } else {
765 // Pass down any unexpected errors. Should this close the connection?
766 if (response.isException()) {
767 Throwable exception = ((ExceptionResponse)response).getException();
768 handleException(exception, null);
769 }
770 }
771 } else if (command.isMessageDispatch()) {
772 MessageDispatch md = (MessageDispatch)command;
773 StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
774 if (sub != null) {
775 sub.onMessageDispatch(md);
776 }
777 } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
778 stompTransport.sendToStomp(ping);
779 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
780 // Pass down any unexpected async errors. Should this close the connection?
781 Throwable exception = ((ConnectionError)command).getException();
782 handleException(exception, null);
783 }
784 }
785
786 public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
787 ActiveMQMessage msg = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, command);
788 return msg;
789 }
790
791 public StompFrame convertMessage(ActiveMQMessage message, boolean ignoreTransformation) throws IOException, JMSException {
792 if (ignoreTransformation == true) {
793 return frameTranslator.convertMessage(this, message);
794 } else {
795 return findTranslator(message.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, message);
796 }
797 }
798
799 public StompTransport getStompTransport() {
800 return stompTransport;
801 }
802
803 public ActiveMQDestination createTempDestination(String name, boolean topic) {
804 ActiveMQDestination rc = tempDestinations.get(name);
805 if( rc == null ) {
806 if (topic) {
807 rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
808 } else {
809 rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
810 }
811 sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
812 tempDestinations.put(name, rc);
813 tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
814 }
815 return rc;
816 }
817
818 public String getCreatedTempDestinationName(ActiveMQDestination destination) {
819 return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
820 }
821
822 public String getDefaultHeartBeat() {
823 return defaultHeartBeat;
824 }
825
826 public void setDefaultHeartBeat(String defaultHeartBeat) {
827 this.defaultHeartBeat = defaultHeartBeat;
828 }
829
830 protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
831
832 String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
833
834 if (keepAliveOpts == null || keepAliveOpts.length != 2) {
835 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
836 } else {
837
838 try {
839 hbReadInterval = Long.parseLong(keepAliveOpts[0]);
840 hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
841 } catch(NumberFormatException e) {
842 throw new ProtocolException("Invalid heart-beat header:" + heartBeatConfig, true);
843 }
844
845 try {
846
847 StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
848
849 monitor.setReadCheckTime(hbReadInterval);
850 monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
851 monitor.setWriteCheckTime(hbWriteInterval);
852
853 monitor.startMonitoring();
854
855 } catch(Exception ex) {
856 hbReadInterval = 0;
857 hbWriteInterval = 0;
858 }
859
860 if (LOG.isDebugEnabled()) {
861 LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
862 }
863 }
864 }
865
866 protected void sendReceipt(StompFrame command) {
867 final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
868 if (receiptId != null) {
869 StompFrame sc = new StompFrame();
870 sc.setAction(Stomp.Responses.RECEIPT);
871 sc.setHeaders(new HashMap<String, String>(1));
872 sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
873 try {
874 sendToStomp(sc);
875 } catch (IOException e) {
876 LOG.warn("Could not send a receipt for " + command, e);
877 }
878 }
879 }
880 }