001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.LinkedList;
023    import java.util.Map;
024    import java.util.Map.Entry;
025    
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.ConsumerInfo;
032    import org.apache.activemq.command.MessageAck;
033    import org.apache.activemq.command.MessageDispatch;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.command.TransactionId;
036    
037    /**
038     * Keeps track of the STOMP subscription so that acking is correctly done.
039     *
040     * @author <a href="http://hiramchirino.com">chirino</a>
041     */
042    public class StompSubscription {
043    
044        public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
045        public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
046        public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
047    
048        protected final ProtocolConverter protocolConverter;
049        protected final String subscriptionId;
050        protected final ConsumerInfo consumerInfo;
051    
052        protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
053        protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
054    
055        protected String ackMode = AUTO_ACK;
056        protected ActiveMQDestination destination;
057        protected String transformation;
058    
059        public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
060            this.protocolConverter = stompTransport;
061            this.subscriptionId = subscriptionId;
062            this.consumerInfo = consumerInfo;
063            this.transformation = transformation;
064        }
065    
066        void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
067            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
068            if (ackMode == CLIENT_ACK) {
069                synchronized (this) {
070                    dispatchedMessage.put(message.getMessageId(), md);
071                }
072            } else if (ackMode == INDIVIDUAL_ACK) {
073                synchronized (this) {
074                    dispatchedMessage.put(message.getMessageId(), md);
075                }
076            } else if (ackMode == AUTO_ACK) {
077                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
078                protocolConverter.getStompTransport().sendToActiveMQ(ack);
079            }
080    
081            boolean ignoreTransformation = false;
082    
083            if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
084                message.setReadOnlyProperties(false);
085                message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
086            } else {
087                if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
088                    ignoreTransformation = true;
089                }
090            }
091    
092            StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
093    
094            command.setAction(Stomp.Responses.MESSAGE);
095            if (subscriptionId != null) {
096                command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
097            }
098    
099            if (ackId != null) {
100                command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
101            }
102    
103            protocolConverter.getStompTransport().sendToStomp(command);
104        }
105    
106        synchronized void onStompAbort(TransactionId transactionId) {
107            unconsumedMessage.clear();
108        }
109    
110        void onStompCommit(TransactionId transactionId) {
111            MessageAck ack = null;
112            synchronized (this) {
113                for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
114                    @SuppressWarnings("rawtypes")
115                    Map.Entry entry = (Entry)iter.next();
116                    MessageDispatch msg = (MessageDispatch)entry.getValue();
117                    if (unconsumedMessage.contains(msg)) {
118                        iter.remove();
119                    }
120                }
121    
122                if (!unconsumedMessage.isEmpty()) {
123                    ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
124                    unconsumedMessage.clear();
125                }
126            }
127            // avoid contention with onMessageDispatch
128            if (ack != null) {
129                protocolConverter.getStompTransport().sendToActiveMQ(ack);
130            }
131        }
132    
133        synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
134    
135            MessageId msgId = new MessageId(messageId);
136    
137            if (!dispatchedMessage.containsKey(msgId)) {
138                return null;
139            }
140    
141            MessageAck ack = new MessageAck();
142            ack.setDestination(consumerInfo.getDestination());
143            ack.setConsumerId(consumerInfo.getConsumerId());
144    
145            if (ackMode == CLIENT_ACK) {
146                if (transactionId == null) {
147                    ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
148                } else {
149                    ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
150                }
151                int count = 0;
152                for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
153    
154                    @SuppressWarnings("rawtypes")
155                    Map.Entry entry = (Entry)iter.next();
156                    MessageId id = (MessageId)entry.getKey();
157                    MessageDispatch msg = (MessageDispatch)entry.getValue();
158    
159                    if (transactionId != null) {
160                        if (!unconsumedMessage.contains(msg)) {
161                            unconsumedMessage.add(msg);
162                            count++;
163                        }
164                    } else {
165                        iter.remove();
166                        count++;
167                    }
168    
169                    if (id.equals(msgId)) {
170                        ack.setLastMessageId(id);
171                        break;
172                    }
173                }
174                ack.setMessageCount(count);
175                if (transactionId != null) {
176                    ack.setTransactionId(transactionId);
177                }
178    
179            } else if (ackMode == INDIVIDUAL_ACK) {
180                ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
181                ack.setMessageID(msgId);
182                if (transactionId != null) {
183                    unconsumedMessage.add(dispatchedMessage.get(msgId));
184                    ack.setTransactionId(transactionId);
185                }
186                dispatchedMessage.remove(msgId);
187            }
188            return ack;
189        }
190    
191        public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
192    
193            MessageId msgId = new MessageId(messageId);
194    
195            if (!dispatchedMessage.containsKey(msgId)) {
196                return null;
197            }
198    
199            MessageAck ack = new MessageAck();
200            ack.setDestination(consumerInfo.getDestination());
201            ack.setConsumerId(consumerInfo.getConsumerId());
202            ack.setAckType(MessageAck.POSION_ACK_TYPE);
203            ack.setMessageID(msgId);
204            if (transactionId != null) {
205                unconsumedMessage.add(dispatchedMessage.get(msgId));
206                ack.setTransactionId(transactionId);
207            }
208            dispatchedMessage.remove(msgId);
209    
210            return ack;
211        }
212    
213        public String getAckMode() {
214            return ackMode;
215        }
216    
217        public void setAckMode(String ackMode) {
218            this.ackMode = ackMode;
219        }
220    
221        public String getSubscriptionId() {
222            return subscriptionId;
223        }
224    
225        public void setDestination(ActiveMQDestination destination) {
226            this.destination = destination;
227        }
228    
229        public ActiveMQDestination getDestination() {
230            return destination;
231        }
232    
233        public ConsumerInfo getConsumerInfo() {
234            return consumerInfo;
235        }
236    }