001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedHashMap;
022import java.util.LinkedList;
023import java.util.Map;
024import java.util.Map.Entry;
025
026import javax.jms.JMSException;
027
028import org.apache.activemq.command.ActiveMQBytesMessage;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQMessage;
031import org.apache.activemq.command.ConsumerInfo;
032import org.apache.activemq.command.MessageAck;
033import org.apache.activemq.command.MessageDispatch;
034import org.apache.activemq.command.MessageId;
035import org.apache.activemq.command.TransactionId;
036
037/**
038 * Keeps track of the STOMP subscription so that acking is correctly done.
039 *
040 * @author <a href="http://hiramchirino.com">chirino</a>
041 */
042public class StompSubscription {
043
044    public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
045    public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
046    public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
047
048    protected final ProtocolConverter protocolConverter;
049    protected final String subscriptionId;
050    protected final ConsumerInfo consumerInfo;
051
052    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
053    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
054
055    protected String ackMode = AUTO_ACK;
056    protected ActiveMQDestination destination;
057    protected String transformation;
058
059    public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
060        this.protocolConverter = stompTransport;
061        this.subscriptionId = subscriptionId;
062        this.consumerInfo = consumerInfo;
063        this.transformation = transformation;
064    }
065
066    void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
067        ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
068        if (ackMode == CLIENT_ACK) {
069            synchronized (this) {
070                dispatchedMessage.put(message.getMessageId(), md);
071            }
072        } else if (ackMode == INDIVIDUAL_ACK) {
073            synchronized (this) {
074                dispatchedMessage.put(message.getMessageId(), md);
075            }
076        } else if (ackMode == AUTO_ACK) {
077            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
078            protocolConverter.getStompTransport().sendToActiveMQ(ack);
079        }
080
081        boolean ignoreTransformation = false;
082
083        if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
084            message.setReadOnlyProperties(false);
085            message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
086        } else {
087            if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
088                ignoreTransformation = true;
089            }
090        }
091
092        StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
093
094        command.setAction(Stomp.Responses.MESSAGE);
095        if (subscriptionId != null) {
096            command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
097        }
098
099        if (ackId != null) {
100            command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
101        }
102
103        protocolConverter.getStompTransport().sendToStomp(command);
104    }
105
106    synchronized void onStompAbort(TransactionId transactionId) {
107        unconsumedMessage.clear();
108    }
109
110    void onStompCommit(TransactionId transactionId) {
111        MessageAck ack = null;
112        synchronized (this) {
113            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
114                @SuppressWarnings("rawtypes")
115                Map.Entry entry = (Entry)iter.next();
116                MessageDispatch msg = (MessageDispatch)entry.getValue();
117                if (unconsumedMessage.contains(msg)) {
118                    iter.remove();
119                }
120            }
121
122            if (!unconsumedMessage.isEmpty()) {
123                ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
124                unconsumedMessage.clear();
125            }
126        }
127        // avoid contention with onMessageDispatch
128        if (ack != null) {
129            protocolConverter.getStompTransport().sendToActiveMQ(ack);
130        }
131    }
132
133    synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
134
135        MessageId msgId = new MessageId(messageId);
136
137        if (!dispatchedMessage.containsKey(msgId)) {
138            return null;
139        }
140
141        MessageAck ack = new MessageAck();
142        ack.setDestination(consumerInfo.getDestination());
143        ack.setConsumerId(consumerInfo.getConsumerId());
144
145        if (ackMode == CLIENT_ACK) {
146            if (transactionId == null) {
147                ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
148            } else {
149                ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
150            }
151            int count = 0;
152            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
153
154                @SuppressWarnings("rawtypes")
155                Map.Entry entry = (Entry)iter.next();
156                MessageId id = (MessageId)entry.getKey();
157                MessageDispatch msg = (MessageDispatch)entry.getValue();
158
159                if (transactionId != null) {
160                    if (!unconsumedMessage.contains(msg)) {
161                        unconsumedMessage.add(msg);
162                        count++;
163                    }
164                } else {
165                    iter.remove();
166                    count++;
167                }
168
169                if (id.equals(msgId)) {
170                    ack.setLastMessageId(id);
171                    break;
172                }
173            }
174            ack.setMessageCount(count);
175            if (transactionId != null) {
176                ack.setTransactionId(transactionId);
177            }
178
179        } else if (ackMode == INDIVIDUAL_ACK) {
180            ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
181            ack.setMessageID(msgId);
182            if (transactionId != null) {
183                unconsumedMessage.add(dispatchedMessage.get(msgId));
184                ack.setTransactionId(transactionId);
185            }
186            dispatchedMessage.remove(msgId);
187        }
188        return ack;
189    }
190
191    public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
192
193        MessageId msgId = new MessageId(messageId);
194
195        if (!dispatchedMessage.containsKey(msgId)) {
196            return null;
197        }
198
199        MessageAck ack = new MessageAck();
200        ack.setDestination(consumerInfo.getDestination());
201        ack.setConsumerId(consumerInfo.getConsumerId());
202        ack.setAckType(MessageAck.POSION_ACK_TYPE);
203        ack.setMessageID(msgId);
204        if (transactionId != null) {
205            unconsumedMessage.add(dispatchedMessage.get(msgId));
206            ack.setTransactionId(transactionId);
207        }
208        dispatchedMessage.remove(msgId);
209
210        return ack;
211    }
212
213    public String getAckMode() {
214        return ackMode;
215    }
216
217    public void setAckMode(String ackMode) {
218        this.ackMode = ackMode;
219    }
220
221    public String getSubscriptionId() {
222        return subscriptionId;
223    }
224
225    public void setDestination(ActiveMQDestination destination) {
226        this.destination = destination;
227    }
228
229    public ActiveMQDestination getDestination() {
230        return destination;
231    }
232
233    public ConsumerInfo getConsumerInfo() {
234        return consumerInfo;
235    }
236}