001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import javax.management.openmbean.CompositeData;
020    import javax.management.openmbean.OpenDataException;
021    import javax.jms.JMSException;
022    
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.region.Queue;
025    import org.apache.activemq.broker.region.QueueMessageReference;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.util.BrokerSupport;
029    
030    /**
031     * Provides a JMX Management view of a Queue.
032     */
033    public class QueueView extends DestinationView implements QueueViewMBean {
034        public QueueView(ManagedRegionBroker broker, Queue destination) {
035            super(broker, destination);
036        }
037    
038        public CompositeData getMessage(String messageId) throws OpenDataException {
039            CompositeData result = null;
040            QueueMessageReference ref = ((Queue)destination).getMessage(messageId);
041    
042            if (ref != null) {
043                    Message rc = ref.getMessage();
044                    if (rc == null) {
045                        return null;
046                    }
047                    result = OpenTypeSupport.convert(rc);
048            }
049    
050            return result;
051        }
052    
053        public void purge() throws Exception {
054            ((Queue)destination).purge();
055        }
056    
057        public boolean removeMessage(String messageId) throws Exception {
058            return ((Queue)destination).removeMessage(messageId);
059        }
060    
061        public int removeMatchingMessages(String selector) throws Exception {
062            return ((Queue)destination).removeMatchingMessages(selector);
063        }
064    
065        public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
066            return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
067        }
068    
069        public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
070            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
071            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
072            return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
073        }
074    
075        public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
076            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
077            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
078            return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
079        }
080    
081        public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
082            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
083            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
084            return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
085        }
086    
087        public boolean moveMessageTo(String messageId, String destinationName) throws Exception {
088            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
089            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
090            return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
091        }
092    
093        public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception {
094            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
095            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
096            return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination);
097        }
098    
099        public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
100            ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
101            ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
102            return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
103        }
104    
105        /**
106         * Moves a message back to its original destination
107         */
108        public boolean retryMessage(String messageId) throws Exception {
109            Queue queue = (Queue) destination;
110            QueueMessageReference ref = queue.getMessage(messageId);
111            Message rc = ref.getMessage();
112            if (rc != null) {
113                ActiveMQDestination originalDestination = rc.getOriginalDestination();
114                if (originalDestination != null) {
115                    ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
116                    return queue.moveMessageTo(context, ref, originalDestination);
117                }
118                else {
119                    throw new JMSException("No original destination for message: "+ messageId);
120                }
121            }
122            else {
123                throw new JMSException("Could not find message: "+ messageId);
124            }
125        }
126    
127        public int cursorSize() {
128            Queue queue = (Queue) destination;
129            if (queue.getMessages() != null){
130                return queue.getMessages().size();
131            }
132            return 0;
133        }
134    
135    
136        public boolean doesCursorHaveMessagesBuffered() {
137           Queue queue = (Queue) destination;
138           if (queue.getMessages() != null){
139               return queue.getMessages().hasMessagesBufferedToDeliver();
140           }
141           return false;
142    
143        }
144    
145    
146        public boolean doesCursorHaveSpace() {
147            Queue queue = (Queue) destination;
148            if (queue.getMessages() != null){
149                return queue.getMessages().hasSpace();
150            }
151            return false;
152        }
153    
154    
155        public long getCursorMemoryUsage() {
156            Queue queue = (Queue) destination;
157            if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
158                return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage();
159            }
160            return 0;
161        }
162    
163        public int getCursorPercentUsage() {
164            Queue queue = (Queue) destination;
165            if (queue.getMessages() != null &&  queue.getMessages().getSystemUsage() != null){
166                return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage();
167            }
168            return 0;
169        }
170    
171        public boolean isCursorFull() {
172            Queue queue = (Queue) destination;
173            if (queue.getMessages() != null){
174                return queue.getMessages().isFull();
175            }
176            return false;
177        }
178    
179        public boolean isCacheEnabled() {
180            Queue queue = (Queue) destination;
181            if (queue.getMessages() != null){
182                return queue.getMessages().isCacheEnabled();
183            }
184            return false;
185        }
186    }