001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Collections;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    import javax.jms.Connection;
026    import javax.jms.InvalidSelectorException;
027    import javax.jms.MessageProducer;
028    import javax.jms.Session;
029    import javax.management.MalformedObjectNameException;
030    import javax.management.ObjectName;
031    import javax.management.openmbean.CompositeData;
032    import javax.management.openmbean.CompositeDataSupport;
033    import javax.management.openmbean.CompositeType;
034    import javax.management.openmbean.OpenDataException;
035    import javax.management.openmbean.TabularData;
036    import javax.management.openmbean.TabularDataSupport;
037    import javax.management.openmbean.TabularType;
038    import org.apache.activemq.ActiveMQConnectionFactory;
039    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
040    import org.apache.activemq.broker.region.Destination;
041    import org.apache.activemq.broker.region.Subscription;
042    import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
043    import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
044    import org.apache.activemq.command.ActiveMQDestination;
045    import org.apache.activemq.command.ActiveMQMessage;
046    import org.apache.activemq.command.ActiveMQTextMessage;
047    import org.apache.activemq.command.Message;
048    import org.apache.activemq.filter.BooleanExpression;
049    import org.apache.activemq.filter.MessageEvaluationContext;
050    import org.apache.activemq.selector.SelectorParser;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    public class DestinationView implements DestinationViewMBean {
055        private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class);
056        protected final Destination destination;
057        protected final ManagedRegionBroker broker;
058    
059        public DestinationView(ManagedRegionBroker broker, Destination destination) {
060            this.broker = broker;
061            this.destination = destination;
062        }
063    
064        public void gc() {
065            destination.gc();
066        }
067    
068        public String getName() {
069            return destination.getName();
070        }
071    
072        public void resetStatistics() {
073            destination.getDestinationStatistics().reset();
074        }
075    
076        public long getEnqueueCount() {
077            return destination.getDestinationStatistics().getEnqueues().getCount();
078        }
079    
080        public long getDequeueCount() {
081            return destination.getDestinationStatistics().getDequeues().getCount();
082        }
083    
084        public long getDispatchCount() {
085            return destination.getDestinationStatistics().getDispatched().getCount();
086        }
087    
088        public long getInFlightCount() {
089            return destination.getDestinationStatistics().getInflight().getCount();
090        }
091    
092        public long getExpiredCount() {
093            return destination.getDestinationStatistics().getExpired().getCount();
094        }
095    
096        public long getConsumerCount() {
097            return destination.getDestinationStatistics().getConsumers().getCount();
098        }
099    
100        public long getQueueSize() {
101            return destination.getDestinationStatistics().getMessages().getCount();
102        }
103    
104        public long getMessagesCached() {
105            return destination.getDestinationStatistics().getMessagesCached().getCount();
106        }
107    
108        public int getMemoryPercentUsage() {
109            return destination.getMemoryUsage().getPercentUsage();
110        }
111    
112        public long getMemoryLimit() {
113            return destination.getMemoryUsage().getLimit();
114        }
115    
116        public void setMemoryLimit(long limit) {
117            destination.getMemoryUsage().setLimit(limit);
118        }
119    
120        public double getAverageEnqueueTime() {
121            return destination.getDestinationStatistics().getProcessTime().getAverageTime();
122        }
123    
124        public long getMaxEnqueueTime() {
125            return destination.getDestinationStatistics().getProcessTime().getMaxTime();
126        }
127    
128        public long getMinEnqueueTime() {
129            return destination.getDestinationStatistics().getProcessTime().getMinTime();
130        }
131    
132        public boolean isPrioritizedMessages() {
133            return destination.isPrioritizedMessages();
134        }
135    
136        public CompositeData[] browse() throws OpenDataException {
137            try {
138                return browse(null);
139            } catch (InvalidSelectorException e) {
140                // should not happen.
141                throw new RuntimeException(e);
142            }
143        }
144    
145        public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
146            Message[] messages = destination.browse();
147            ArrayList<CompositeData> c = new ArrayList<CompositeData>();
148    
149            MessageEvaluationContext ctx = new MessageEvaluationContext();
150            ctx.setDestination(destination.getActiveMQDestination());
151            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
152    
153            for (int i = 0; i < messages.length; i++) {
154                try {
155    
156                    if (selectorExpression == null) {
157                        c.add(OpenTypeSupport.convert(messages[i]));
158                    } else {
159                        ctx.setMessageReference(messages[i]);
160                        if (selectorExpression.matches(ctx)) {
161                            c.add(OpenTypeSupport.convert(messages[i]));
162                        }
163                    }
164    
165                } catch (Throwable e) {
166                    // TODO DELETE ME
167                    System.out.println(e);
168                    e.printStackTrace();
169                    // TODO DELETE ME
170                    LOG.warn("exception browsing destination", e);
171                }
172            }
173    
174            CompositeData rc[] = new CompositeData[c.size()];
175            c.toArray(rc);
176            return rc;
177        }
178    
179        /**
180         * Browses the current destination returning a list of messages
181         */
182        public List<Object> browseMessages() throws InvalidSelectorException {
183            return browseMessages(null);
184        }
185    
186        /**
187         * Browses the current destination with the given selector returning a list
188         * of messages
189         */
190        public List<Object> browseMessages(String selector) throws InvalidSelectorException {
191            Message[] messages = destination.browse();
192            ArrayList<Object> answer = new ArrayList<Object>();
193    
194            MessageEvaluationContext ctx = new MessageEvaluationContext();
195            ctx.setDestination(destination.getActiveMQDestination());
196            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
197    
198            for (int i = 0; i < messages.length; i++) {
199                try {
200                    Message message = messages[i];
201                    if (selectorExpression == null) {
202                        answer.add(message);
203                    } else {
204                        ctx.setMessageReference(message);
205                        if (selectorExpression.matches(ctx)) {
206                            answer.add(message);
207                        }
208                    }
209    
210                } catch (Throwable e) {
211                    LOG.warn("exception browsing destination", e);
212                }
213            }
214            return answer;
215        }
216    
217        public TabularData browseAsTable() throws OpenDataException {
218            try {
219                return browseAsTable(null);
220            } catch (InvalidSelectorException e) {
221                throw new RuntimeException(e);
222            }
223        }
224    
225        public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
226            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
227            Message[] messages = destination.browse();
228            CompositeType ct = factory.getCompositeType();
229            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
230            TabularDataSupport rc = new TabularDataSupport(tt);
231    
232            MessageEvaluationContext ctx = new MessageEvaluationContext();
233            ctx.setDestination(destination.getActiveMQDestination());
234            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
235    
236            for (int i = 0; i < messages.length; i++) {
237                try {
238                    if (selectorExpression == null) {
239                        rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
240                    } else {
241                        ctx.setMessageReference(messages[i]);
242                        if (selectorExpression.matches(ctx)) {
243                            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
244                        }
245                    }
246                } catch (Throwable e) {
247                    LOG.warn("exception browsing destination", e);
248                }
249            }
250    
251            return rc;
252        }
253    
254        public String sendTextMessage(String body) throws Exception {
255            return sendTextMessage(Collections.EMPTY_MAP, body);
256        }
257    
258        public String sendTextMessage(Map headers, String body) throws Exception {
259            return sendTextMessage(headers, body, null, null);
260        }
261    
262        public String sendTextMessage(String body, String user, String password) throws Exception {
263            return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
264        }
265    
266        public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception {
267    
268            String brokerUrl = "vm://" + broker.getBrokerName();
269            ActiveMQDestination dest = destination.getActiveMQDestination();
270    
271            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
272            Connection connection = null;
273            try {
274    
275                connection = cf.createConnection(userName, password);
276                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
277                MessageProducer producer = session.createProducer(dest);
278                ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
279    
280                for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
281                    Map.Entry entry = (Map.Entry) iter.next();
282                    msg.setObjectProperty((String) entry.getKey(), entry.getValue());
283                }
284    
285                producer.setDeliveryMode(msg.getJMSDeliveryMode());
286                producer.setPriority(msg.getPriority());
287                long ttl = msg.getExpiration() - System.currentTimeMillis();
288                producer.setTimeToLive(ttl > 0 ? ttl : 0);
289                producer.send(msg);
290    
291                return msg.getJMSMessageID();
292    
293            } finally {
294                connection.close();
295            }
296    
297        }
298    
299        public int getMaxAuditDepth() {
300            return destination.getMaxAuditDepth();
301        }
302    
303        public int getMaxProducersToAudit() {
304            return destination.getMaxProducersToAudit();
305        }
306    
307        public boolean isEnableAudit() {
308            return destination.isEnableAudit();
309        }
310    
311        public void setEnableAudit(boolean enableAudit) {
312            destination.setEnableAudit(enableAudit);
313        }
314    
315        public void setMaxAuditDepth(int maxAuditDepth) {
316            destination.setMaxAuditDepth(maxAuditDepth);
317        }
318    
319        public void setMaxProducersToAudit(int maxProducersToAudit) {
320            destination.setMaxProducersToAudit(maxProducersToAudit);
321        }
322    
323        public float getMemoryUsagePortion() {
324            return destination.getMemoryUsage().getUsagePortion();
325        }
326    
327        public long getProducerCount() {
328            return destination.getDestinationStatistics().getProducers().getCount();
329        }
330    
331        public boolean isProducerFlowControl() {
332            return destination.isProducerFlowControl();
333        }
334    
335        public void setMemoryUsagePortion(float value) {
336            destination.getMemoryUsage().setUsagePortion(value);
337        }
338    
339        public void setProducerFlowControl(boolean producerFlowControl) {
340            destination.setProducerFlowControl(producerFlowControl);
341        }
342    
343        public boolean isAlwaysRetroactive() {
344            return destination.isAlwaysRetroactive();
345        }
346    
347        public void setAlwaysRetroactive(boolean alwaysRetroactive) {
348            destination.setAlwaysRetroactive(alwaysRetroactive);
349        }
350    
351        /**
352         * Set's the interval at which warnings about producers being blocked by
353         * resource usage will be triggered. Values of 0 or less will disable
354         * warnings
355         *
356         * @param blockedProducerWarningInterval the interval at which warning about
357         *            blocked producers will be triggered.
358         */
359        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
360            destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
361        }
362    
363        /**
364         *
365         * @return the interval at which warning about blocked producers will be
366         *         triggered.
367         */
368        public long getBlockedProducerWarningInterval() {
369            return destination.getBlockedProducerWarningInterval();
370        }
371    
372        public int getMaxPageSize() {
373            return destination.getMaxPageSize();
374        }
375    
376        public void setMaxPageSize(int pageSize) {
377            destination.setMaxPageSize(pageSize);
378        }
379    
380        public boolean isUseCache() {
381            return destination.isUseCache();
382        }
383    
384        public void setUseCache(boolean value) {
385            destination.setUseCache(value);
386        }
387    
388        public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
389            List<Subscription> subscriptions = destination.getConsumers();
390            ObjectName[] answer = new ObjectName[subscriptions.size()];
391            ObjectName objectName = broker.getBrokerService().getBrokerObjectName();
392            int index = 0;
393            for (Subscription subscription : subscriptions) {
394                String connectionClientId = subscription.getContext().getClientId();
395                String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription.getConsumerInfo(), connectionClientId, objectName);
396                answer[index++] = new ObjectName(objectNameStr);
397            }
398            return answer;
399        }
400    
401        public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
402            ObjectName result = null;
403            SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
404            if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
405                result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
406            }
407            return result;
408        }
409    
410    }