001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import org.apache.activemq.command.*;
020    
021    import javax.jms.JMSException;
022    import java.io.IOException;
023    import java.util.Iterator;
024    import java.util.LinkedHashMap;
025    import java.util.LinkedList;
026    import java.util.Map;
027    import java.util.Map.Entry;
028    
029    /**
030     * Keeps track of the STOMP subscription so that acking is correctly done.
031     *
032     * @author <a href="http://hiramchirino.com">chirino</a>
033     */
034    public class StompSubscription {
035    
036        public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
037        public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
038        public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
039    
040        protected final ProtocolConverter protocolConverter;
041        protected final String subscriptionId;
042        protected final ConsumerInfo consumerInfo;
043    
044        protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
045        protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
046    
047        protected String ackMode = AUTO_ACK;
048        protected ActiveMQDestination destination;
049        protected String transformation;
050    
051        public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
052            this.protocolConverter = stompTransport;
053            this.subscriptionId = subscriptionId;
054            this.consumerInfo = consumerInfo;
055            this.transformation = transformation;
056        }
057    
058        void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
059            ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
060            if (ackMode == CLIENT_ACK) {
061                synchronized (this) {
062                    dispatchedMessage.put(message.getMessageId(), md);
063                }
064            } else if (ackMode == INDIVIDUAL_ACK) {
065                synchronized (this) {
066                    dispatchedMessage.put(message.getMessageId(), md);
067                }
068            } else if (ackMode == AUTO_ACK) {
069                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
070                protocolConverter.getStompTransport().sendToActiveMQ(ack);
071            }
072    
073            boolean ignoreTransformation = false;
074    
075            if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
076                   message.setReadOnlyProperties(false);
077                message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
078            } else {
079                if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
080                    ignoreTransformation = true;
081                }
082            }
083    
084            StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
085    
086            command.setAction(Stomp.Responses.MESSAGE);
087            if (subscriptionId != null) {
088                command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
089            }
090    
091            protocolConverter.getStompTransport().sendToStomp(command);
092        }
093    
094        synchronized void onStompAbort(TransactionId transactionId) {
095            unconsumedMessage.clear();
096        }
097    
098        synchronized void onStompCommit(TransactionId transactionId) {
099            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
100                @SuppressWarnings("rawtypes")
101                Map.Entry entry = (Entry)iter.next();
102                MessageDispatch msg = (MessageDispatch)entry.getValue();
103                if (unconsumedMessage.contains(msg)) {
104                    iter.remove();
105                }
106            }
107    
108            if (!unconsumedMessage.isEmpty()) {
109                MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
110                protocolConverter.getStompTransport().sendToActiveMQ(ack);
111                unconsumedMessage.clear();
112            }
113        }
114    
115        synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
116    
117            MessageId msgId = new MessageId(messageId);
118    
119            if (!dispatchedMessage.containsKey(msgId)) {
120                return null;
121            }
122    
123            MessageAck ack = new MessageAck();
124            ack.setDestination(consumerInfo.getDestination());
125            ack.setConsumerId(consumerInfo.getConsumerId());
126    
127            if (ackMode == CLIENT_ACK) {
128                if (transactionId == null) {
129                    ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
130                } else {
131                    ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
132                }
133                int count = 0;
134                for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
135    
136                    @SuppressWarnings("rawtypes")
137                    Map.Entry entry = (Entry)iter.next();
138                    MessageId id = (MessageId)entry.getKey();
139                    MessageDispatch msg = (MessageDispatch)entry.getValue();
140    
141                    if (transactionId != null) {
142                        if (!unconsumedMessage.contains(msg)) {
143                            unconsumedMessage.add(msg);
144                            count++;
145                        }
146                    } else {
147                        iter.remove();
148                        count++;
149                    }
150    
151                    if (id.equals(msgId)) {
152                        ack.setLastMessageId(id);
153                        break;
154                    }
155                }
156                ack.setMessageCount(count);
157                if (transactionId != null) {
158                    ack.setTransactionId(transactionId);
159                }
160    
161            } else if (ackMode == INDIVIDUAL_ACK) {
162                ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
163                ack.setMessageID(msgId);
164                if (transactionId != null) {
165                    unconsumedMessage.add(dispatchedMessage.get(msgId));
166                    ack.setTransactionId(transactionId);
167                }
168                dispatchedMessage.remove(msgId);
169            }
170            return ack;
171        }
172    
173        public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
174    
175            MessageId msgId = new MessageId(messageId);
176    
177            if (!dispatchedMessage.containsKey(msgId)) {
178                return null;
179            }
180    
181            MessageAck ack = new MessageAck();
182            ack.setDestination(consumerInfo.getDestination());
183            ack.setConsumerId(consumerInfo.getConsumerId());
184            ack.setAckType(MessageAck.POSION_ACK_TYPE);
185            ack.setMessageID(msgId);
186            if (transactionId != null) {
187                unconsumedMessage.add(dispatchedMessage.get(msgId));
188                ack.setTransactionId(transactionId);
189            }
190            dispatchedMessage.remove(msgId);
191    
192            return ack;
193        }
194    
195        public String getAckMode() {
196            return ackMode;
197        }
198    
199        public void setAckMode(String ackMode) {
200            this.ackMode = ackMode;
201        }
202    
203        public String getSubscriptionId() {
204            return subscriptionId;
205        }
206    
207        public void setDestination(ActiveMQDestination destination) {
208            this.destination = destination;
209        }
210    
211        public ActiveMQDestination getDestination() {
212            return destination;
213        }
214    
215        public ConsumerInfo getConsumerInfo() {
216            return consumerInfo;
217        }
218    }