001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.net.URISyntaxException;
021    import java.util.ArrayList;
022    import java.util.Collections;
023    import java.util.HashMap;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.Map;
027    
028    import javax.jms.Connection;
029    import javax.jms.InvalidSelectorException;
030    import javax.jms.MessageProducer;
031    import javax.jms.Session;
032    import javax.management.MalformedObjectNameException;
033    import javax.management.ObjectName;
034    import javax.management.openmbean.CompositeData;
035    import javax.management.openmbean.CompositeDataSupport;
036    import javax.management.openmbean.CompositeType;
037    import javax.management.openmbean.OpenDataException;
038    import javax.management.openmbean.TabularData;
039    import javax.management.openmbean.TabularDataSupport;
040    import javax.management.openmbean.TabularType;
041    import org.apache.activemq.ActiveMQConnectionFactory;
042    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
043    import org.apache.activemq.broker.region.Destination;
044    import org.apache.activemq.broker.region.Subscription;
045    import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
046    import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
047    import org.apache.activemq.command.ActiveMQDestination;
048    import org.apache.activemq.command.ActiveMQMessage;
049    import org.apache.activemq.command.ActiveMQTextMessage;
050    import org.apache.activemq.command.Message;
051    import org.apache.activemq.filter.BooleanExpression;
052    import org.apache.activemq.filter.MessageEvaluationContext;
053    import org.apache.activemq.selector.SelectorParser;
054    import org.apache.activemq.util.URISupport;
055    import org.slf4j.Logger;
056    import org.slf4j.LoggerFactory;
057    
058    public class DestinationView implements DestinationViewMBean {
059        private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class);
060        protected final Destination destination;
061        protected final ManagedRegionBroker broker;
062    
063        public DestinationView(ManagedRegionBroker broker, Destination destination) {
064            this.broker = broker;
065            this.destination = destination;
066        }
067    
068        public void gc() {
069            destination.gc();
070        }
071    
072        @Override
073        public String getName() {
074            return destination.getName();
075        }
076    
077        @Override
078        public void resetStatistics() {
079            destination.getDestinationStatistics().reset();
080        }
081    
082        @Override
083        public long getEnqueueCount() {
084            return destination.getDestinationStatistics().getEnqueues().getCount();
085        }
086    
087        @Override
088        public long getDequeueCount() {
089            return destination.getDestinationStatistics().getDequeues().getCount();
090        }
091    
092        @Override
093        public long getDispatchCount() {
094            return destination.getDestinationStatistics().getDispatched().getCount();
095        }
096    
097        @Override
098        public long getInFlightCount() {
099            return destination.getDestinationStatistics().getInflight().getCount();
100        }
101    
102        @Override
103        public long getExpiredCount() {
104            return destination.getDestinationStatistics().getExpired().getCount();
105        }
106    
107        @Override
108        public long getConsumerCount() {
109            return destination.getDestinationStatistics().getConsumers().getCount();
110        }
111    
112        @Override
113        public long getQueueSize() {
114            return destination.getDestinationStatistics().getMessages().getCount();
115        }
116    
117        public long getMessagesCached() {
118            return destination.getDestinationStatistics().getMessagesCached().getCount();
119        }
120    
121        @Override
122        public int getMemoryPercentUsage() {
123            return destination.getMemoryUsage().getPercentUsage();
124        }
125    
126        @Override
127        public long getMemoryUsageByteCount() {
128            return destination.getMemoryUsage().getUsage();
129        }
130    
131        @Override
132        public long getMemoryLimit() {
133            return destination.getMemoryUsage().getLimit();
134        }
135    
136        @Override
137        public void setMemoryLimit(long limit) {
138            destination.getMemoryUsage().setLimit(limit);
139        }
140    
141        @Override
142        public double getAverageEnqueueTime() {
143            return destination.getDestinationStatistics().getProcessTime().getAverageTime();
144        }
145    
146        @Override
147        public long getMaxEnqueueTime() {
148            return destination.getDestinationStatistics().getProcessTime().getMaxTime();
149        }
150    
151        @Override
152        public long getMinEnqueueTime() {
153            return destination.getDestinationStatistics().getProcessTime().getMinTime();
154        }
155    
156        /**
157         * @return the average size of a message (bytes)
158         */
159        public double getAverageMessageSize() {
160            return destination.getDestinationStatistics().getMessageSize().getAverageSize();
161        }
162    
163        /**
164         * @return the max size of a message (bytes)
165         */
166        public long getMaxMessageSize() {
167            return destination.getDestinationStatistics().getMessageSize().getMaxSize();
168        }
169    
170        /**
171         * @return the min size of a message (bytes)
172         */
173        public long getMinMessageSize() {
174            return destination.getDestinationStatistics().getMessageSize().getMinSize();
175        }
176    
177    
178        @Override
179        public boolean isPrioritizedMessages() {
180            return destination.isPrioritizedMessages();
181        }
182    
183        @Override
184        public CompositeData[] browse() throws OpenDataException {
185            try {
186                return browse(null);
187            } catch (InvalidSelectorException e) {
188                // should not happen.
189                throw new RuntimeException(e);
190            }
191        }
192    
193        @Override
194        public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
195            Message[] messages = destination.browse();
196            ArrayList<CompositeData> c = new ArrayList<CompositeData>();
197    
198            MessageEvaluationContext ctx = new MessageEvaluationContext();
199            ctx.setDestination(destination.getActiveMQDestination());
200            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
201    
202            for (int i = 0; i < messages.length; i++) {
203                try {
204    
205                    if (selectorExpression == null) {
206                        c.add(OpenTypeSupport.convert(messages[i]));
207                    } else {
208                        ctx.setMessageReference(messages[i]);
209                        if (selectorExpression.matches(ctx)) {
210                            c.add(OpenTypeSupport.convert(messages[i]));
211                        }
212                    }
213    
214                } catch (Throwable e) {
215                    // TODO DELETE ME
216                    System.out.println(e);
217                    e.printStackTrace();
218                    // TODO DELETE ME
219                    LOG.warn("exception browsing destination", e);
220                }
221            }
222    
223            CompositeData rc[] = new CompositeData[c.size()];
224            c.toArray(rc);
225            return rc;
226        }
227    
228        /**
229         * Browses the current destination returning a list of messages
230         */
231        @Override
232        public List<Object> browseMessages() throws InvalidSelectorException {
233            return browseMessages(null);
234        }
235    
236        /**
237         * Browses the current destination with the given selector returning a list
238         * of messages
239         */
240        @Override
241        public List<Object> browseMessages(String selector) throws InvalidSelectorException {
242            Message[] messages = destination.browse();
243            ArrayList<Object> answer = new ArrayList<Object>();
244    
245            MessageEvaluationContext ctx = new MessageEvaluationContext();
246            ctx.setDestination(destination.getActiveMQDestination());
247            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
248    
249            for (int i = 0; i < messages.length; i++) {
250                try {
251                    Message message = messages[i];
252                    message.setReadOnlyBody(true);
253                    if (selectorExpression == null) {
254                        answer.add(message);
255                    } else {
256                        ctx.setMessageReference(message);
257                        if (selectorExpression.matches(ctx)) {
258                            answer.add(message);
259                        }
260                    }
261    
262                } catch (Throwable e) {
263                    LOG.warn("exception browsing destination", e);
264                }
265            }
266            return answer;
267        }
268    
269        @Override
270        public TabularData browseAsTable() throws OpenDataException {
271            try {
272                return browseAsTable(null);
273            } catch (InvalidSelectorException e) {
274                throw new RuntimeException(e);
275            }
276        }
277    
278        @Override
279        public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
280            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
281            Message[] messages = destination.browse();
282            CompositeType ct = factory.getCompositeType();
283            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
284            TabularDataSupport rc = new TabularDataSupport(tt);
285    
286            MessageEvaluationContext ctx = new MessageEvaluationContext();
287            ctx.setDestination(destination.getActiveMQDestination());
288            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
289    
290            for (int i = 0; i < messages.length; i++) {
291                try {
292                    if (selectorExpression == null) {
293                        rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
294                    } else {
295                        ctx.setMessageReference(messages[i]);
296                        if (selectorExpression.matches(ctx)) {
297                            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
298                        }
299                    }
300                } catch (Throwable e) {
301                    LOG.warn("exception browsing destination", e);
302                }
303            }
304    
305            return rc;
306        }
307    
308        @Override
309        public String sendTextMessageWithProperties(String properties) throws Exception {
310            String[] kvs = properties.split(",");
311            Map<String, String> props = new HashMap<String, String>();
312            for (String kv : kvs) {
313                String[] it = kv.split("=");
314                if (it.length == 2) {
315                    props.put(it[0],it[1]);
316                }
317            }
318            return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password"));
319        }
320    
321        @Override
322        public String sendTextMessage(String body) throws Exception {
323            return sendTextMessage(Collections.EMPTY_MAP, body);
324        }
325    
326        @Override
327        public String sendTextMessage(Map headers, String body) throws Exception {
328            return sendTextMessage(headers, body, null, null);
329        }
330    
331        @Override
332        public String sendTextMessage(String body, String user, String password) throws Exception {
333            return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
334        }
335    
336        @Override
337        public String sendTextMessage(Map<String, String> headers, String body, String userName, String password) throws Exception {
338    
339            String brokerUrl = "vm://" + broker.getBrokerName();
340            ActiveMQDestination dest = destination.getActiveMQDestination();
341    
342            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
343            Connection connection = null;
344            try {
345    
346                connection = cf.createConnection(userName, password);
347                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
348                MessageProducer producer = session.createProducer(dest);
349                ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
350    
351                for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
352                    Map.Entry entry = (Map.Entry) iter.next();
353                    msg.setObjectProperty((String) entry.getKey(), entry.getValue());
354                }
355    
356                producer.setDeliveryMode(msg.getJMSDeliveryMode());
357                producer.setPriority(msg.getPriority());
358                long ttl = 0;
359                if (msg.getExpiration() != 0) {
360                    ttl = msg.getExpiration() - System.currentTimeMillis();
361                } else {
362                    String timeToLive = headers.get("timeToLive");
363                    if (timeToLive != null) {
364                        ttl = Integer.valueOf(timeToLive);
365                    }
366                }
367                producer.setTimeToLive(ttl > 0 ? ttl : 0);
368                producer.send(msg);
369    
370                return msg.getJMSMessageID();
371    
372            } finally {
373                connection.close();
374            }
375    
376        }
377    
378        @Override
379        public int getMaxAuditDepth() {
380            return destination.getMaxAuditDepth();
381        }
382    
383        @Override
384        public int getMaxProducersToAudit() {
385            return destination.getMaxProducersToAudit();
386        }
387    
388        public boolean isEnableAudit() {
389            return destination.isEnableAudit();
390        }
391    
392        public void setEnableAudit(boolean enableAudit) {
393            destination.setEnableAudit(enableAudit);
394        }
395    
396        @Override
397        public void setMaxAuditDepth(int maxAuditDepth) {
398            destination.setMaxAuditDepth(maxAuditDepth);
399        }
400    
401        @Override
402        public void setMaxProducersToAudit(int maxProducersToAudit) {
403            destination.setMaxProducersToAudit(maxProducersToAudit);
404        }
405    
406        @Override
407        public float getMemoryUsagePortion() {
408            return destination.getMemoryUsage().getUsagePortion();
409        }
410    
411        @Override
412        public long getProducerCount() {
413            return destination.getDestinationStatistics().getProducers().getCount();
414        }
415    
416        @Override
417        public boolean isProducerFlowControl() {
418            return destination.isProducerFlowControl();
419        }
420    
421        @Override
422        public void setMemoryUsagePortion(float value) {
423            destination.getMemoryUsage().setUsagePortion(value);
424        }
425    
426        @Override
427        public void setProducerFlowControl(boolean producerFlowControl) {
428            destination.setProducerFlowControl(producerFlowControl);
429        }
430    
431        @Override
432        public boolean isAlwaysRetroactive() {
433            return destination.isAlwaysRetroactive();
434        }
435    
436        @Override
437        public void setAlwaysRetroactive(boolean alwaysRetroactive) {
438            destination.setAlwaysRetroactive(alwaysRetroactive);
439        }
440    
441        /**
442         * Set's the interval at which warnings about producers being blocked by
443         * resource usage will be triggered. Values of 0 or less will disable
444         * warnings
445         *
446         * @param blockedProducerWarningInterval the interval at which warning about
447         *            blocked producers will be triggered.
448         */
449        @Override
450        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
451            destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
452        }
453    
454        /**
455         *
456         * @return the interval at which warning about blocked producers will be
457         *         triggered.
458         */
459        @Override
460        public long getBlockedProducerWarningInterval() {
461            return destination.getBlockedProducerWarningInterval();
462        }
463    
464        @Override
465        public int getMaxPageSize() {
466            return destination.getMaxPageSize();
467        }
468    
469        @Override
470        public void setMaxPageSize(int pageSize) {
471            destination.setMaxPageSize(pageSize);
472        }
473    
474        @Override
475        public boolean isUseCache() {
476            return destination.isUseCache();
477        }
478    
479        @Override
480        public void setUseCache(boolean value) {
481            destination.setUseCache(value);
482        }
483    
484        @Override
485        public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
486            List<Subscription> subscriptions = destination.getConsumers();
487            ObjectName[] answer = new ObjectName[subscriptions.size()];
488            ObjectName brokerObjectName = broker.getBrokerService().getBrokerObjectName();
489            int index = 0;
490            for (Subscription subscription : subscriptions) {
491                String connectionClientId = subscription.getContext().getClientId();
492                answer[index++] = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, subscription.getConsumerInfo());
493            }
494            return answer;
495        }
496    
497        @Override
498        public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
499            ObjectName result = null;
500            SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
501            if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
502                result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
503            }
504            return result;
505        }
506    
507        @Override
508        public String getOptions() {
509            Map<String, String> options = destination.getActiveMQDestination().getOptions();
510            String optionsString = "";
511            try {
512                if (options != null) {
513                    optionsString = URISupport.createQueryString(options);
514                }
515            } catch (URISyntaxException ignored) {}
516            return optionsString;
517        }
518    
519        @Override
520        public boolean isDLQ() {
521            return destination.isDLQ();
522        }
523    
524        @Override
525        public long getBlockedSends() {
526            return destination.getDestinationStatistics().getBlockedSends().getCount();
527        }
528    
529        @Override
530        public double getAverageBlockedTime() {
531            return destination.getDestinationStatistics().getBlockedTime().getAverageTime();
532        }
533    
534        @Override
535        public long getTotalBlockedTime() {
536            return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
537        }
538    
539    }