001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
019    import javax.jms.Connection;
020    import javax.jms.ConnectionFactory;
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import javax.jms.Message;
024    import javax.jms.MessageConsumer;
025    import javax.jms.MessageListener;
026    import javax.jms.Session;
027    import javax.jms.Topic;
028    
029    /**
030     * 
031     */
032    public class MemConsumer extends MemMessageIdList implements MessageListener {
033    
034        static long ctr;
035    
036        protected Connection connection;
037        protected MessageConsumer consumer;
038        protected long counter;
039        protected boolean isParent;
040        protected boolean inOrder = true;
041    
042    
043        public MemConsumer() {
044            super();
045        }
046    
047        public MemConsumer(ConnectionFactory fac, Destination dest, String consumerName) throws JMSException {
048            connection = fac.createConnection();
049            Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
050            if (dest instanceof Topic && consumerName != null && consumerName.length() > 0) {
051                consumer = s.createDurableSubscriber((Topic) dest, consumerName);
052            } else {
053                consumer = s.createConsumer(dest);
054            }
055            consumer.setMessageListener(this);
056        }
057    
058        public MemConsumer(ConnectionFactory fac, Destination dest) throws JMSException {
059            this(fac, dest, null);
060        }
061    
062        public void start() throws JMSException {
063            connection.start();
064        }
065    
066        public void stop() throws JMSException {
067            connection.stop();
068        }
069    
070        public void shutDown() throws JMSException {
071            connection.close();
072        }
073    
074    
075        public Message receive() throws JMSException {
076            return consumer.receive();
077        }
078    
079        public Message receive(long wait) throws JMSException {
080            return consumer.receive(wait);
081        }
082    
083        public void onMessage(Message msg) {
084            super.onMessage(msg);
085    
086            if (isParent) {
087                try {
088                    long ctr = msg.getLongProperty("counter");
089                    if (counter != ctr) {
090                        inOrder = false;
091                    }
092                    counter++;
093    
094                } catch (Exception e) {
095                    e.printStackTrace();
096                }
097            }
098        }
099    
100    
101        public boolean isInOrder() {
102            return inOrder;
103        }
104    
105    
106        public void setAsParent(boolean isParent) {
107            this.isParent = isParent;
108        }
109    
110        public boolean isParent() {
111            return this.isParent;
112        }
113    
114    
115    }