001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.tool;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    
022    import javax.jms.JMSException;
023    import javax.jms.Message;
024    import javax.jms.MessageListener;
025    
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    /**
030     * A simple container of messages for performing testing and rendezvous style
031     * code. You can use this class a {@link MessageListener} and then make
032     * assertions about how many messages it has received allowing a certain maximum
033     * amount of time to ensure that the test does not hang forever.
034     * <p/>
035     * Also you can chain these instances together with the
036     * {@link #setParent(MessageListener)} method so that you can aggregate the
037     * total number of messages consumed across a number of consumers.
038     *
039     * 
040     */
041    public class MemMessageIdList implements MessageListener {
042    
043        protected static final Logger LOG = LoggerFactory.getLogger(MemMessageIdList.class);
044    
045        private List<String> messageIds = new ArrayList<String>();
046        private Object semaphore;
047        private boolean verbose;
048        private MessageListener parent;
049        private long maximumDuration = 15000L;
050    
051        public MemMessageIdList() {
052            this(new Object());
053        }
054    
055        public MemMessageIdList(Object semaphore) {
056            this.semaphore = semaphore;
057        }
058    
059        public boolean equals(Object that) {
060            if (that instanceof MemMessageIdList) {
061                MemMessageIdList thatListMem = (MemMessageIdList) that;
062                return getMessageIds().equals(thatListMem.getMessageIds());
063            }
064            return false;
065        }
066    
067        public int hashCode() {
068            synchronized (semaphore) {
069                return messageIds.hashCode() + 1;
070            }
071        }
072    
073        public String toString() {
074            synchronized (semaphore) {
075                return messageIds.toString();
076            }
077        }
078    
079        /**
080         * @return all the messages on the list so far, clearing the buffer
081         */
082        public List<String> flushMessages() {
083            synchronized (semaphore) {
084                List<String> answer = new ArrayList<String>(messageIds);
085                messageIds.clear();
086                return answer;
087            }
088        }
089    
090        public synchronized List<String> getMessageIds() {
091            synchronized (semaphore) {
092                return new ArrayList<String>(messageIds);
093            }
094        }
095    
096        public void onMessage(Message message) {
097            String id = null;
098            try {
099                id = message.getJMSMessageID();
100                synchronized (semaphore) {
101                    messageIds.add(id);
102                    semaphore.notifyAll();
103                }
104                if (verbose) {
105                    LOG.info("Received message: " + message);
106                }
107            } catch (JMSException e) {
108                e.printStackTrace();
109            }
110            if (parent != null) {
111                parent.onMessage(message);
112            }
113        }
114    
115        public int getMessageCount() {
116            synchronized (semaphore) {
117                return messageIds.size();
118            }
119        }
120    
121        public void waitForMessagesToArrive(int messageCount) {
122            LOG.info("Waiting for " + messageCount + " message(s) to arrive");
123    
124            long start = System.currentTimeMillis();
125    
126            for (int i = 0; i < messageCount; i++) {
127                try {
128                    if (hasReceivedMessages(messageCount)) {
129                        break;
130                    }
131                    long duration = System.currentTimeMillis() - start;
132                    if (duration >= maximumDuration) {
133                        break;
134                    }
135                    synchronized (semaphore) {
136                        semaphore.wait(maximumDuration - duration);
137                    }
138                } catch (InterruptedException e) {
139                    LOG.info("Caught: " + e);
140                }
141            }
142            long end = System.currentTimeMillis() - start;
143    
144            LOG.info("End of wait for " + end + " millis and received: " + getMessageCount() + " messages");
145        }
146    
147    
148        public boolean hasReceivedMessage() {
149            return getMessageCount() == 0;
150        }
151    
152        public boolean hasReceivedMessages(int messageCount) {
153            return getMessageCount() >= messageCount;
154        }
155    
156        public boolean isVerbose() {
157            return verbose;
158        }
159    
160        public void setVerbose(boolean verbose) {
161            this.verbose = verbose;
162        }
163    
164        public MessageListener getParent() {
165            return parent;
166        }
167    
168        /**
169         * Allows a parent listener to be specified such as to aggregate messages
170         * consumed across consumers
171         */
172        public void setParent(MessageListener parent) {
173            this.parent = parent;
174        }
175    
176    }