001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.LinkedHashMap;
022 import java.util.LinkedList;
023 import java.util.Map;
024 import java.util.Map.Entry;
025
026 import javax.jms.JMSException;
027
028 import org.apache.activemq.command.ActiveMQBytesMessage;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ActiveMQMessage;
031 import org.apache.activemq.command.ConsumerInfo;
032 import org.apache.activemq.command.MessageAck;
033 import org.apache.activemq.command.MessageDispatch;
034 import org.apache.activemq.command.MessageId;
035 import org.apache.activemq.command.TransactionId;
036
037 /**
038 * Keeps track of the STOMP subscription so that acking is correctly done.
039 *
040 * @author <a href="http://hiramchirino.com">chirino</a>
041 */
042 public class StompSubscription {
043
044 public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
045 public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
046 public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
047
048 protected final ProtocolConverter protocolConverter;
049 protected final String subscriptionId;
050 protected final ConsumerInfo consumerInfo;
051
052 protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
053 protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
054
055 protected String ackMode = AUTO_ACK;
056 protected ActiveMQDestination destination;
057 protected String transformation;
058
059 public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
060 this.protocolConverter = stompTransport;
061 this.subscriptionId = subscriptionId;
062 this.consumerInfo = consumerInfo;
063 this.transformation = transformation;
064 }
065
066 void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
067 ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
068 if (ackMode == CLIENT_ACK) {
069 synchronized (this) {
070 dispatchedMessage.put(message.getMessageId(), md);
071 }
072 } else if (ackMode == INDIVIDUAL_ACK) {
073 synchronized (this) {
074 dispatchedMessage.put(message.getMessageId(), md);
075 }
076 } else if (ackMode == AUTO_ACK) {
077 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
078 protocolConverter.getStompTransport().sendToActiveMQ(ack);
079 }
080
081 boolean ignoreTransformation = false;
082
083 if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
084 message.setReadOnlyProperties(false);
085 message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
086 } else {
087 if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
088 ignoreTransformation = true;
089 }
090 }
091
092 StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
093
094 command.setAction(Stomp.Responses.MESSAGE);
095 if (subscriptionId != null) {
096 command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
097 }
098
099 if (ackId != null) {
100 command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
101 }
102
103 protocolConverter.getStompTransport().sendToStomp(command);
104 }
105
106 synchronized void onStompAbort(TransactionId transactionId) {
107 unconsumedMessage.clear();
108 }
109
110 void onStompCommit(TransactionId transactionId) {
111 MessageAck ack = null;
112 synchronized (this) {
113 for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
114 @SuppressWarnings("rawtypes")
115 Map.Entry entry = (Entry)iter.next();
116 MessageDispatch msg = (MessageDispatch)entry.getValue();
117 if (unconsumedMessage.contains(msg)) {
118 iter.remove();
119 }
120 }
121
122 if (!unconsumedMessage.isEmpty()) {
123 ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
124 unconsumedMessage.clear();
125 }
126 }
127 // avoid contention with onMessageDispatch
128 if (ack != null) {
129 protocolConverter.getStompTransport().sendToActiveMQ(ack);
130 }
131 }
132
133 synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
134
135 MessageId msgId = new MessageId(messageId);
136
137 if (!dispatchedMessage.containsKey(msgId)) {
138 return null;
139 }
140
141 MessageAck ack = new MessageAck();
142 ack.setDestination(consumerInfo.getDestination());
143 ack.setConsumerId(consumerInfo.getConsumerId());
144
145 if (ackMode == CLIENT_ACK) {
146 if (transactionId == null) {
147 ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
148 } else {
149 ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
150 }
151 int count = 0;
152 for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
153
154 @SuppressWarnings("rawtypes")
155 Map.Entry entry = (Entry)iter.next();
156 MessageId id = (MessageId)entry.getKey();
157 MessageDispatch msg = (MessageDispatch)entry.getValue();
158
159 if (transactionId != null) {
160 if (!unconsumedMessage.contains(msg)) {
161 unconsumedMessage.add(msg);
162 count++;
163 }
164 } else {
165 iter.remove();
166 count++;
167 }
168
169 if (id.equals(msgId)) {
170 ack.setLastMessageId(id);
171 break;
172 }
173 }
174 ack.setMessageCount(count);
175 if (transactionId != null) {
176 ack.setTransactionId(transactionId);
177 }
178
179 } else if (ackMode == INDIVIDUAL_ACK) {
180 ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
181 ack.setMessageID(msgId);
182 if (transactionId != null) {
183 unconsumedMessage.add(dispatchedMessage.get(msgId));
184 ack.setTransactionId(transactionId);
185 }
186 dispatchedMessage.remove(msgId);
187 }
188 return ack;
189 }
190
191 public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
192
193 MessageId msgId = new MessageId(messageId);
194
195 if (!dispatchedMessage.containsKey(msgId)) {
196 return null;
197 }
198
199 MessageAck ack = new MessageAck();
200 ack.setDestination(consumerInfo.getDestination());
201 ack.setConsumerId(consumerInfo.getConsumerId());
202 ack.setAckType(MessageAck.POSION_ACK_TYPE);
203 ack.setMessageID(msgId);
204 if (transactionId != null) {
205 unconsumedMessage.add(dispatchedMessage.get(msgId));
206 ack.setTransactionId(transactionId);
207 }
208 dispatchedMessage.remove(msgId);
209
210 return ack;
211 }
212
213 public String getAckMode() {
214 return ackMode;
215 }
216
217 public void setAckMode(String ackMode) {
218 this.ackMode = ackMode;
219 }
220
221 public String getSubscriptionId() {
222 return subscriptionId;
223 }
224
225 public void setDestination(ActiveMQDestination destination) {
226 this.destination = destination;
227 }
228
229 public ActiveMQDestination getDestination() {
230 return destination;
231 }
232
233 public ConsumerInfo getConsumerInfo() {
234 return consumerInfo;
235 }
236 }