001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.util.Set;
021    
022    import javax.jms.InvalidSelectorException;
023    import javax.management.ObjectName;
024    
025    import org.apache.activemq.broker.BrokerService;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.region.Subscription;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ActiveMQQueue;
030    import org.apache.activemq.command.ActiveMQTopic;
031    import org.apache.activemq.command.ConsumerInfo;
032    import org.apache.activemq.filter.DestinationFilter;
033    import org.apache.activemq.util.IOExceptionSupport;
034    import org.apache.activemq.util.JMXSupport;
035    
036    /**
037     *
038     */
039    public class SubscriptionView implements SubscriptionViewMBean {
040    
041        protected final Subscription subscription;
042        protected final String clientId;
043        protected final String userName;
044    
045        /**
046         * Constructor
047         *
048         * @param subs
049         */
050        public SubscriptionView(String clientId, String userName, Subscription subs) {
051            this.clientId = clientId;
052            this.subscription = subs;
053            this.userName = userName;
054        }
055    
056        /**
057         * @return the clientId
058         */
059        public String getClientId() {
060            return clientId;
061        }
062    
063        /**
064         * @returns the ObjectName of the Connection that created this subscription
065         */
066        public ObjectName getConnection() {
067            ObjectName result = null;
068    
069            if (clientId != null && subscription != null) {
070                ConnectionContext ctx = subscription.getContext();
071                if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) {
072                    BrokerService service = ctx.getBroker().getBrokerService();
073                    ManagementContext managementCtx = service.getManagementContext();
074                    if (managementCtx != null) {
075    
076                        try {
077                            ObjectName query = createConnectionQueury(managementCtx, service.getBrokerName());
078                            Set<ObjectName> names = managementCtx.queryNames(query, null);
079                            if (names.size() == 1) {
080                                result = names.iterator().next();
081                            }
082                        } catch (Exception e) {
083                        }
084                    }
085                }
086            }
087            return result;
088        }
089    
090        private ObjectName createConnectionQueury(ManagementContext ctx, String brokerName) throws IOException {
091            try {
092                return new ObjectName(ctx.getJmxDomainName() + ":" + "BrokerName="
093                                      + JMXSupport.encodeObjectNamePart(brokerName) + ","
094                                      + "Type=Connection," + "ConnectorName=*,"
095                                      + "Connection=" + JMXSupport.encodeObjectNamePart(clientId));
096            } catch (Throwable e) {
097                throw IOExceptionSupport.create(e);
098            }
099        }
100    
101        /**
102         * @return the id of the Connection the Subscription is on
103         */
104        public String getConnectionId() {
105            ConsumerInfo info = getConsumerInfo();
106            if (info != null) {
107                return info.getConsumerId().getConnectionId();
108            }
109            return "NOTSET";
110        }
111    
112        /**
113         * @return the id of the Session the subscription is on
114         */
115        public long getSessionId() {
116            ConsumerInfo info = getConsumerInfo();
117            if (info != null) {
118                return info.getConsumerId().getSessionId();
119            }
120            return 0;
121        }
122    
123        /**
124         * @return the id of the Subscription
125         */
126        public long getSubcriptionId() {
127            ConsumerInfo info = getConsumerInfo();
128            if (info != null) {
129                return info.getConsumerId().getValue();
130            }
131            return 0;
132        }
133    
134        /**
135         * @return the destination name
136         */
137        public String getDestinationName() {
138            ConsumerInfo info = getConsumerInfo();
139            if (info != null) {
140                ActiveMQDestination dest = info.getDestination();
141                return dest.getPhysicalName();
142            }
143            return "NOTSET";
144        }
145    
146        public String getSelector() {
147            if (subscription != null) {
148                return subscription.getSelector();
149            }
150            return null;
151        }
152    
153        public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
154            if (subscription != null) {
155                subscription.setSelector(selector);
156            } else {
157                throw new UnsupportedOperationException("No subscription object");
158            }
159        }
160    
161        /**
162         * @return true if the destination is a Queue
163         */
164        public boolean isDestinationQueue() {
165            ConsumerInfo info = getConsumerInfo();
166            if (info != null) {
167                ActiveMQDestination dest = info.getDestination();
168                return dest.isQueue();
169            }
170            return false;
171        }
172    
173        /**
174         * @return true of the destination is a Topic
175         */
176        public boolean isDestinationTopic() {
177            ConsumerInfo info = getConsumerInfo();
178            if (info != null) {
179                ActiveMQDestination dest = info.getDestination();
180                return dest.isTopic();
181            }
182            return false;
183        }
184    
185        /**
186         * @return true if the destination is temporary
187         */
188        public boolean isDestinationTemporary() {
189            ConsumerInfo info = getConsumerInfo();
190            if (info != null) {
191                ActiveMQDestination dest = info.getDestination();
192                return dest.isTemporary();
193            }
194            return false;
195        }
196    
197        /**
198         * @return true if the subscriber is active
199         */
200        public boolean isActive() {
201            return true;
202        }
203    
204        /**
205         * The subscription should release as may references as it can to help the
206         * garbage collector reclaim memory.
207         */
208        public void gc() {
209            if (subscription != null) {
210                subscription.gc();
211            }
212        }
213    
214        /**
215         * @return whether or not the subscriber is retroactive or not
216         */
217        public boolean isRetroactive() {
218            ConsumerInfo info = getConsumerInfo();
219            return info != null ? info.isRetroactive() : false;
220        }
221    
222        /**
223         * @return whether or not the subscriber is an exclusive consumer
224         */
225        public boolean isExclusive() {
226            ConsumerInfo info = getConsumerInfo();
227            return info != null ? info.isExclusive() : false;
228        }
229    
230        /**
231         * @return whether or not the subscriber is durable (persistent)
232         */
233        public boolean isDurable() {
234            ConsumerInfo info = getConsumerInfo();
235            return info != null ? info.isDurable() : false;
236        }
237    
238        /**
239         * @return whether or not the subscriber ignores local messages
240         */
241        public boolean isNoLocal() {
242            ConsumerInfo info = getConsumerInfo();
243            return info != null ? info.isNoLocal() : false;
244        }
245    
246        /**
247         * @return the maximum number of pending messages allowed in addition to the
248         *         prefetch size. If enabled to a non-zero value then this will
249         *         perform eviction of messages for slow consumers on non-durable
250         *         topics.
251         */
252        public int getMaximumPendingMessageLimit() {
253            ConsumerInfo info = getConsumerInfo();
254            return info != null ? info.getMaximumPendingMessageLimit() : 0;
255        }
256    
257        /**
258         * @return the consumer priority
259         */
260        public byte getPriority() {
261            ConsumerInfo info = getConsumerInfo();
262            return info != null ? info.getPriority() : 0;
263        }
264    
265        /**
266         * @return the name of the consumer which is only used for durable
267         *         consumers.
268         */
269        public String getSubcriptionName() {
270            ConsumerInfo info = getConsumerInfo();
271            return info != null ? info.getSubscriptionName() : null;
272        }
273    
274        /**
275         * @return number of messages pending delivery
276         */
277        public int getPendingQueueSize() {
278            return subscription != null ? subscription.getPendingQueueSize() : 0;
279        }
280    
281        /**
282         * @return number of messages dispatched
283         */
284        public int getDispatchedQueueSize() {
285            return subscription != null ? subscription.getDispatchedQueueSize() : 0;
286        }
287    
288        public int getMessageCountAwaitingAcknowledge() {
289            return getDispatchedQueueSize();
290        }
291    
292        /**
293         * @return number of messages that matched the subscription
294         */
295        public long getDispatchedCounter() {
296            return subscription != null ? subscription.getDispatchedCounter() : 0;
297        }
298    
299        /**
300         * @return number of messages that matched the subscription
301         */
302        public long getEnqueueCounter() {
303            return subscription != null ? subscription.getEnqueueCounter() : 0;
304        }
305    
306        /**
307         * @return number of messages queued by the client
308         */
309        public long getDequeueCounter() {
310            return subscription != null ? subscription.getDequeueCounter() : 0;
311        }
312    
313        protected ConsumerInfo getConsumerInfo() {
314            return subscription != null ? subscription.getConsumerInfo() : null;
315        }
316    
317        /**
318         * @return pretty print
319         */
320        public String toString() {
321            return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
322        }
323    
324        /**
325         */
326        public int getPrefetchSize() {
327            return subscription != null ? subscription.getPrefetchSize() : 0;
328        }
329    
330        public boolean isMatchingQueue(String queueName) {
331            if (isDestinationQueue()) {
332                return matchesDestination(new ActiveMQQueue(queueName));
333            }
334            return false;
335        }
336    
337        public boolean isMatchingTopic(String topicName) {
338            if (isDestinationTopic()) {
339                return matchesDestination(new ActiveMQTopic(topicName));
340            }
341            return false;
342        }
343    
344        /**
345         * Return true if this subscription matches the given destination
346         *
347         * @param destination the destination to compare against
348         * @return true if this subscription matches the given destination
349         */
350        public boolean matchesDestination(ActiveMQDestination destination) {
351            ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination();
352            DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination);
353            return filter.matches(destination);
354        }
355    
356        @Override
357        public boolean isSlowConsumer() {
358            return subscription.isSlowConsumer();
359        }
360    
361        @Override
362        public String getUserName() {
363            return userName;
364        }
365    }