001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.jmx;
018
019 import javax.management.openmbean.CompositeData;
020 import javax.management.openmbean.OpenDataException;
021 import javax.jms.JMSException;
022
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.broker.region.Queue;
025 import org.apache.activemq.broker.region.QueueMessageReference;
026 import org.apache.activemq.command.ActiveMQDestination;
027 import org.apache.activemq.command.Message;
028 import org.apache.activemq.util.BrokerSupport;
029
030 /**
031 * Provides a JMX Management view of a Queue.
032 */
033 public class QueueView extends DestinationView implements QueueViewMBean {
034 public QueueView(ManagedRegionBroker broker, Queue destination) {
035 super(broker, destination);
036 }
037
038 public CompositeData getMessage(String messageId) throws OpenDataException {
039 CompositeData result = null;
040 QueueMessageReference ref = ((Queue)destination).getMessage(messageId);
041
042 if (ref != null) {
043 Message rc = ref.getMessage();
044 if (rc == null) {
045 return null;
046 }
047 result = OpenTypeSupport.convert(rc);
048 }
049
050 return result;
051 }
052
053 public void purge() throws Exception {
054 ((Queue)destination).purge();
055 }
056
057 public boolean removeMessage(String messageId) throws Exception {
058 return ((Queue)destination).removeMessage(messageId);
059 }
060
061 public int removeMatchingMessages(String selector) throws Exception {
062 return ((Queue)destination).removeMatchingMessages(selector);
063 }
064
065 public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
066 return ((Queue)destination).removeMatchingMessages(selector, maximumMessages);
067 }
068
069 public boolean copyMessageTo(String messageId, String destinationName) throws Exception {
070 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
071 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
072 return ((Queue)destination).copyMessageTo(context, messageId, toDestination);
073 }
074
075 public int copyMatchingMessagesTo(String selector, String destinationName) throws Exception {
076 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
077 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
078 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination);
079 }
080
081 public int copyMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
082 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
083 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
084 return ((Queue)destination).copyMatchingMessagesTo(context, selector, toDestination, maximumMessages);
085 }
086
087 public boolean moveMessageTo(String messageId, String destinationName) throws Exception {
088 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
089 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
090 return ((Queue)destination).moveMessageTo(context, messageId, toDestination);
091 }
092
093 public int moveMatchingMessagesTo(String selector, String destinationName) throws Exception {
094 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
095 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
096 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination);
097 }
098
099 public int moveMatchingMessagesTo(String selector, String destinationName, int maximumMessages) throws Exception {
100 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
101 ActiveMQDestination toDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
102 return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
103 }
104
105 /**
106 * Moves a message back to its original destination
107 */
108 public boolean retryMessage(String messageId) throws Exception {
109 Queue queue = (Queue) destination;
110 QueueMessageReference ref = queue.getMessage(messageId);
111 Message rc = ref.getMessage();
112 if (rc != null) {
113 ActiveMQDestination originalDestination = rc.getOriginalDestination();
114 if (originalDestination != null) {
115 ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
116 return queue.moveMessageTo(context, ref, originalDestination);
117 }
118 else {
119 throw new JMSException("No original destination for message: "+ messageId);
120 }
121 }
122 else {
123 throw new JMSException("Could not find message: "+ messageId);
124 }
125 }
126
127 public int cursorSize() {
128 Queue queue = (Queue) destination;
129 if (queue.getMessages() != null){
130 return queue.getMessages().size();
131 }
132 return 0;
133 }
134
135
136 public boolean doesCursorHaveMessagesBuffered() {
137 Queue queue = (Queue) destination;
138 if (queue.getMessages() != null){
139 return queue.getMessages().hasMessagesBufferedToDeliver();
140 }
141 return false;
142
143 }
144
145
146 public boolean doesCursorHaveSpace() {
147 Queue queue = (Queue) destination;
148 if (queue.getMessages() != null){
149 return queue.getMessages().hasSpace();
150 }
151 return false;
152 }
153
154
155 public long getCursorMemoryUsage() {
156 Queue queue = (Queue) destination;
157 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){
158 return queue.getMessages().getSystemUsage().getMemoryUsage().getUsage();
159 }
160 return 0;
161 }
162
163 public int getCursorPercentUsage() {
164 Queue queue = (Queue) destination;
165 if (queue.getMessages() != null && queue.getMessages().getSystemUsage() != null){
166 return queue.getMessages().getSystemUsage().getMemoryUsage().getPercentUsage();
167 }
168 return 0;
169 }
170
171 public boolean isCursorFull() {
172 Queue queue = (Queue) destination;
173 if (queue.getMessages() != null){
174 return queue.getMessages().isFull();
175 }
176 return false;
177 }
178
179 public boolean isCacheEnabled() {
180 Queue queue = (Queue) destination;
181 if (queue.getMessages() != null){
182 return queue.getMessages().isCacheEnabled();
183 }
184 return false;
185 }
186 }