001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.net.URISyntaxException;
021    import java.util.ArrayList;
022    import java.util.Collections;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Map;
026    
027    import javax.jms.Connection;
028    import javax.jms.InvalidSelectorException;
029    import javax.jms.MessageProducer;
030    import javax.jms.Session;
031    import javax.management.MalformedObjectNameException;
032    import javax.management.ObjectName;
033    import javax.management.openmbean.CompositeData;
034    import javax.management.openmbean.CompositeDataSupport;
035    import javax.management.openmbean.CompositeType;
036    import javax.management.openmbean.OpenDataException;
037    import javax.management.openmbean.TabularData;
038    import javax.management.openmbean.TabularDataSupport;
039    import javax.management.openmbean.TabularType;
040    
041    import org.apache.activemq.ActiveMQConnectionFactory;
042    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
043    import org.apache.activemq.broker.region.Destination;
044    import org.apache.activemq.broker.region.Subscription;
045    import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
046    import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
047    import org.apache.activemq.command.ActiveMQDestination;
048    import org.apache.activemq.command.ActiveMQMessage;
049    import org.apache.activemq.command.ActiveMQTextMessage;
050    import org.apache.activemq.command.Message;
051    import org.apache.activemq.filter.BooleanExpression;
052    import org.apache.activemq.filter.MessageEvaluationContext;
053    import org.apache.activemq.selector.SelectorParser;
054    import org.apache.activemq.util.URISupport;
055    import org.slf4j.Logger;
056    import org.slf4j.LoggerFactory;
057    
058    public class DestinationView implements DestinationViewMBean {
059        private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class);
060        protected final Destination destination;
061        protected final ManagedRegionBroker broker;
062    
063        public DestinationView(ManagedRegionBroker broker, Destination destination) {
064            this.broker = broker;
065            this.destination = destination;
066        }
067    
068        public void gc() {
069            destination.gc();
070        }
071    
072        @Override
073        public String getName() {
074            return destination.getName();
075        }
076    
077        @Override
078        public void resetStatistics() {
079            destination.getDestinationStatistics().reset();
080        }
081    
082        @Override
083        public long getEnqueueCount() {
084            return destination.getDestinationStatistics().getEnqueues().getCount();
085        }
086    
087        @Override
088        public long getDequeueCount() {
089            return destination.getDestinationStatistics().getDequeues().getCount();
090        }
091    
092        @Override
093        public long getDispatchCount() {
094            return destination.getDestinationStatistics().getDispatched().getCount();
095        }
096    
097        @Override
098        public long getInFlightCount() {
099            return destination.getDestinationStatistics().getInflight().getCount();
100        }
101    
102        @Override
103        public long getExpiredCount() {
104            return destination.getDestinationStatistics().getExpired().getCount();
105        }
106    
107        @Override
108        public long getConsumerCount() {
109            return destination.getDestinationStatistics().getConsumers().getCount();
110        }
111    
112        @Override
113        public long getQueueSize() {
114            return destination.getDestinationStatistics().getMessages().getCount();
115        }
116    
117        public long getMessagesCached() {
118            return destination.getDestinationStatistics().getMessagesCached().getCount();
119        }
120    
121        @Override
122        public int getMemoryPercentUsage() {
123            return destination.getMemoryUsage().getPercentUsage();
124        }
125    
126        @Override
127        public long getMemoryUsageByteCount() {
128            return destination.getMemoryUsage().getUsage();
129        }
130    
131        @Override
132        public long getMemoryLimit() {
133            return destination.getMemoryUsage().getLimit();
134        }
135    
136        @Override
137        public void setMemoryLimit(long limit) {
138            destination.getMemoryUsage().setLimit(limit);
139        }
140    
141        @Override
142        public double getAverageEnqueueTime() {
143            return destination.getDestinationStatistics().getProcessTime().getAverageTime();
144        }
145    
146        @Override
147        public long getMaxEnqueueTime() {
148            return destination.getDestinationStatistics().getProcessTime().getMaxTime();
149        }
150    
151        @Override
152        public long getMinEnqueueTime() {
153            return destination.getDestinationStatistics().getProcessTime().getMinTime();
154        }
155    
156        @Override
157        public boolean isPrioritizedMessages() {
158            return destination.isPrioritizedMessages();
159        }
160    
161        @Override
162        public CompositeData[] browse() throws OpenDataException {
163            try {
164                return browse(null);
165            } catch (InvalidSelectorException e) {
166                // should not happen.
167                throw new RuntimeException(e);
168            }
169        }
170    
171        @Override
172        public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
173            Message[] messages = destination.browse();
174            ArrayList<CompositeData> c = new ArrayList<CompositeData>();
175    
176            MessageEvaluationContext ctx = new MessageEvaluationContext();
177            ctx.setDestination(destination.getActiveMQDestination());
178            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
179    
180            for (int i = 0; i < messages.length; i++) {
181                try {
182    
183                    if (selectorExpression == null) {
184                        c.add(OpenTypeSupport.convert(messages[i]));
185                    } else {
186                        ctx.setMessageReference(messages[i]);
187                        if (selectorExpression.matches(ctx)) {
188                            c.add(OpenTypeSupport.convert(messages[i]));
189                        }
190                    }
191    
192                } catch (Throwable e) {
193                    // TODO DELETE ME
194                    System.out.println(e);
195                    e.printStackTrace();
196                    // TODO DELETE ME
197                    LOG.warn("exception browsing destination", e);
198                }
199            }
200    
201            CompositeData rc[] = new CompositeData[c.size()];
202            c.toArray(rc);
203            return rc;
204        }
205    
206        /**
207         * Browses the current destination returning a list of messages
208         */
209        @Override
210        public List<Object> browseMessages() throws InvalidSelectorException {
211            return browseMessages(null);
212        }
213    
214        /**
215         * Browses the current destination with the given selector returning a list
216         * of messages
217         */
218        @Override
219        public List<Object> browseMessages(String selector) throws InvalidSelectorException {
220            Message[] messages = destination.browse();
221            ArrayList<Object> answer = new ArrayList<Object>();
222    
223            MessageEvaluationContext ctx = new MessageEvaluationContext();
224            ctx.setDestination(destination.getActiveMQDestination());
225            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
226    
227            for (int i = 0; i < messages.length; i++) {
228                try {
229                    Message message = messages[i];
230                    message.setReadOnlyBody(true);
231                    if (selectorExpression == null) {
232                        answer.add(message);
233                    } else {
234                        ctx.setMessageReference(message);
235                        if (selectorExpression.matches(ctx)) {
236                            answer.add(message);
237                        }
238                    }
239    
240                } catch (Throwable e) {
241                    LOG.warn("exception browsing destination", e);
242                }
243            }
244            return answer;
245        }
246    
247        @Override
248        public TabularData browseAsTable() throws OpenDataException {
249            try {
250                return browseAsTable(null);
251            } catch (InvalidSelectorException e) {
252                throw new RuntimeException(e);
253            }
254        }
255    
256        @Override
257        public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
258            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
259            Message[] messages = destination.browse();
260            CompositeType ct = factory.getCompositeType();
261            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" });
262            TabularDataSupport rc = new TabularDataSupport(tt);
263    
264            MessageEvaluationContext ctx = new MessageEvaluationContext();
265            ctx.setDestination(destination.getActiveMQDestination());
266            BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector);
267    
268            for (int i = 0; i < messages.length; i++) {
269                try {
270                    if (selectorExpression == null) {
271                        rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
272                    } else {
273                        ctx.setMessageReference(messages[i]);
274                        if (selectorExpression.matches(ctx)) {
275                            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
276                        }
277                    }
278                } catch (Throwable e) {
279                    LOG.warn("exception browsing destination", e);
280                }
281            }
282    
283            return rc;
284        }
285    
286        @Override
287        public String sendTextMessage(String body) throws Exception {
288            return sendTextMessage(Collections.EMPTY_MAP, body);
289        }
290    
291        @Override
292        public String sendTextMessage(Map headers, String body) throws Exception {
293            return sendTextMessage(headers, body, null, null);
294        }
295    
296        @Override
297        public String sendTextMessage(String body, String user, String password) throws Exception {
298            return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
299        }
300    
301        @Override
302        public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception {
303    
304            String brokerUrl = "vm://" + broker.getBrokerName();
305            ActiveMQDestination dest = destination.getActiveMQDestination();
306    
307            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl);
308            Connection connection = null;
309            try {
310    
311                connection = cf.createConnection(userName, password);
312                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
313                MessageProducer producer = session.createProducer(dest);
314                ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body);
315    
316                for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
317                    Map.Entry entry = (Map.Entry) iter.next();
318                    msg.setObjectProperty((String) entry.getKey(), entry.getValue());
319                }
320    
321                producer.setDeliveryMode(msg.getJMSDeliveryMode());
322                producer.setPriority(msg.getPriority());
323                long ttl = msg.getExpiration() - System.currentTimeMillis();
324                producer.setTimeToLive(ttl > 0 ? ttl : 0);
325                producer.send(msg);
326    
327                return msg.getJMSMessageID();
328    
329            } finally {
330                connection.close();
331            }
332    
333        }
334    
335        @Override
336        public int getMaxAuditDepth() {
337            return destination.getMaxAuditDepth();
338        }
339    
340        @Override
341        public int getMaxProducersToAudit() {
342            return destination.getMaxProducersToAudit();
343        }
344    
345        public boolean isEnableAudit() {
346            return destination.isEnableAudit();
347        }
348    
349        public void setEnableAudit(boolean enableAudit) {
350            destination.setEnableAudit(enableAudit);
351        }
352    
353        @Override
354        public void setMaxAuditDepth(int maxAuditDepth) {
355            destination.setMaxAuditDepth(maxAuditDepth);
356        }
357    
358        @Override
359        public void setMaxProducersToAudit(int maxProducersToAudit) {
360            destination.setMaxProducersToAudit(maxProducersToAudit);
361        }
362    
363        @Override
364        public float getMemoryUsagePortion() {
365            return destination.getMemoryUsage().getUsagePortion();
366        }
367    
368        @Override
369        public long getProducerCount() {
370            return destination.getDestinationStatistics().getProducers().getCount();
371        }
372    
373        @Override
374        public boolean isProducerFlowControl() {
375            return destination.isProducerFlowControl();
376        }
377    
378        @Override
379        public void setMemoryUsagePortion(float value) {
380            destination.getMemoryUsage().setUsagePortion(value);
381        }
382    
383        @Override
384        public void setProducerFlowControl(boolean producerFlowControl) {
385            destination.setProducerFlowControl(producerFlowControl);
386        }
387    
388        @Override
389        public boolean isAlwaysRetroactive() {
390            return destination.isAlwaysRetroactive();
391        }
392    
393        @Override
394        public void setAlwaysRetroactive(boolean alwaysRetroactive) {
395            destination.setAlwaysRetroactive(alwaysRetroactive);
396        }
397    
398        /**
399         * Set's the interval at which warnings about producers being blocked by
400         * resource usage will be triggered. Values of 0 or less will disable
401         * warnings
402         *
403         * @param blockedProducerWarningInterval the interval at which warning about
404         *            blocked producers will be triggered.
405         */
406        @Override
407        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
408            destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
409        }
410    
411        /**
412         *
413         * @return the interval at which warning about blocked producers will be
414         *         triggered.
415         */
416        @Override
417        public long getBlockedProducerWarningInterval() {
418            return destination.getBlockedProducerWarningInterval();
419        }
420    
421        @Override
422        public int getMaxPageSize() {
423            return destination.getMaxPageSize();
424        }
425    
426        @Override
427        public void setMaxPageSize(int pageSize) {
428            destination.setMaxPageSize(pageSize);
429        }
430    
431        @Override
432        public boolean isUseCache() {
433            return destination.isUseCache();
434        }
435    
436        @Override
437        public void setUseCache(boolean value) {
438            destination.setUseCache(value);
439        }
440    
441        @Override
442        public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
443            List<Subscription> subscriptions = destination.getConsumers();
444            ObjectName[] answer = new ObjectName[subscriptions.size()];
445            ObjectName brokerObjectName = broker.getBrokerService().getBrokerObjectName();
446            int index = 0;
447            for (Subscription subscription : subscriptions) {
448                String connectionClientId = subscription.getContext().getClientId();
449                answer[index++] = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, subscription.getConsumerInfo());
450            }
451            return answer;
452        }
453    
454        @Override
455        public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
456            ObjectName result = null;
457            SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
458            if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) {
459                result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy);
460            }
461            return result;
462        }
463    
464        @Override
465        public String getOptions() {
466            Map<String, String> options = destination.getActiveMQDestination().getOptions();
467            String optionsString = "";
468            try {
469                if (options != null) {
470                    optionsString = URISupport.createQueryString(options);
471                }
472            } catch (URISyntaxException ignored) {}
473            return optionsString;
474        }
475    
476    }