001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.util.Set;
021    
022    import javax.jms.InvalidSelectorException;
023    import javax.management.ObjectName;
024    
025    import org.apache.activemq.broker.BrokerService;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.region.Subscription;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ActiveMQQueue;
030    import org.apache.activemq.command.ActiveMQTopic;
031    import org.apache.activemq.command.ConsumerInfo;
032    import org.apache.activemq.filter.DestinationFilter;
033    import org.apache.activemq.util.IOExceptionSupport;
034    
035    /**
036     *
037     */
038    public class SubscriptionView implements SubscriptionViewMBean {
039    
040        protected final Subscription subscription;
041        protected final String clientId;
042        protected final String userName;
043    
044        /**
045         * Constructor
046         *
047         * @param subs
048         */
049        public SubscriptionView(String clientId, String userName, Subscription subs) {
050            this.clientId = clientId;
051            this.subscription = subs;
052            this.userName = userName;
053        }
054    
055        /**
056         * @return the clientId
057         */
058        @Override
059        public String getClientId() {
060            return clientId;
061        }
062    
063        /**
064         * @returns the ObjectName of the Connection that created this subscription
065         */
066        @Override
067        public ObjectName getConnection() {
068            ObjectName result = null;
069    
070            if (clientId != null && subscription != null) {
071                ConnectionContext ctx = subscription.getContext();
072                if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) {
073                    BrokerService service = ctx.getBroker().getBrokerService();
074                    ManagementContext managementCtx = service.getManagementContext();
075                    if (managementCtx != null) {
076    
077                        try {
078                            ObjectName query = createConnectionQuery(managementCtx, service.getBrokerName());
079                            Set<ObjectName> names = managementCtx.queryNames(query, null);
080                            if (names.size() == 1) {
081                                result = names.iterator().next();
082                            }
083                        } catch (Exception e) {
084                        }
085                    }
086                }
087            }
088            return result;
089        }
090    
091        private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException {
092            try {
093                return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId);
094            } catch (Throwable e) {
095                throw IOExceptionSupport.create(e);
096            }
097        }
098    
099        /**
100         * @return the id of the Connection the Subscription is on
101         */
102        @Override
103        public String getConnectionId() {
104            ConsumerInfo info = getConsumerInfo();
105            if (info != null) {
106                return info.getConsumerId().getConnectionId();
107            }
108            return "NOTSET";
109        }
110    
111        /**
112         * @return the id of the Session the subscription is on
113         */
114        @Override
115        public long getSessionId() {
116            ConsumerInfo info = getConsumerInfo();
117            if (info != null) {
118                return info.getConsumerId().getSessionId();
119            }
120            return 0;
121        }
122    
123        /**
124         * @return the id of the Subscription
125         */
126        @Override
127        public long getSubcriptionId() {
128            ConsumerInfo info = getConsumerInfo();
129            if (info != null) {
130                return info.getConsumerId().getValue();
131            }
132            return 0;
133        }
134    
135        /**
136         * @return the destination name
137         */
138        @Override
139        public String getDestinationName() {
140            ConsumerInfo info = getConsumerInfo();
141            if (info != null) {
142                ActiveMQDestination dest = info.getDestination();
143                return dest.getPhysicalName();
144            }
145            return "NOTSET";
146        }
147    
148        @Override
149        public String getSelector() {
150            if (subscription != null) {
151                return subscription.getSelector();
152            }
153            return null;
154        }
155    
156        @Override
157        public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
158            if (subscription != null) {
159                subscription.setSelector(selector);
160            } else {
161                throw new UnsupportedOperationException("No subscription object");
162            }
163        }
164    
165        /**
166         * @return true if the destination is a Queue
167         */
168        @Override
169        public boolean isDestinationQueue() {
170            ConsumerInfo info = getConsumerInfo();
171            if (info != null) {
172                ActiveMQDestination dest = info.getDestination();
173                return dest.isQueue();
174            }
175            return false;
176        }
177    
178        /**
179         * @return true of the destination is a Topic
180         */
181        @Override
182        public boolean isDestinationTopic() {
183            ConsumerInfo info = getConsumerInfo();
184            if (info != null) {
185                ActiveMQDestination dest = info.getDestination();
186                return dest.isTopic();
187            }
188            return false;
189        }
190    
191        /**
192         * @return true if the destination is temporary
193         */
194        @Override
195        public boolean isDestinationTemporary() {
196            ConsumerInfo info = getConsumerInfo();
197            if (info != null) {
198                ActiveMQDestination dest = info.getDestination();
199                return dest.isTemporary();
200            }
201            return false;
202        }
203    
204        /**
205         * @return true if the subscriber is active
206         */
207        @Override
208        public boolean isActive() {
209            return true;
210        }
211    
212        @Override
213        public boolean isNetwork() {
214            ConsumerInfo info = getConsumerInfo();
215            if (info != null) {
216                return info.isNetworkSubscription();
217            }
218            return false;
219        }
220    
221        /**
222         * The subscription should release as may references as it can to help the
223         * garbage collector reclaim memory.
224         */
225        public void gc() {
226            if (subscription != null) {
227                subscription.gc();
228            }
229        }
230    
231        /**
232         * @return whether or not the subscriber is retroactive or not
233         */
234        @Override
235        public boolean isRetroactive() {
236            ConsumerInfo info = getConsumerInfo();
237            return info != null ? info.isRetroactive() : false;
238        }
239    
240        /**
241         * @return whether or not the subscriber is an exclusive consumer
242         */
243        @Override
244        public boolean isExclusive() {
245            ConsumerInfo info = getConsumerInfo();
246            return info != null ? info.isExclusive() : false;
247        }
248    
249        /**
250         * @return whether or not the subscriber is durable (persistent)
251         */
252        @Override
253        public boolean isDurable() {
254            ConsumerInfo info = getConsumerInfo();
255            return info != null ? info.isDurable() : false;
256        }
257    
258        /**
259         * @return whether or not the subscriber ignores local messages
260         */
261        @Override
262        public boolean isNoLocal() {
263            ConsumerInfo info = getConsumerInfo();
264            return info != null ? info.isNoLocal() : false;
265        }
266    
267        /**
268         * @return the maximum number of pending messages allowed in addition to the
269         *         prefetch size. If enabled to a non-zero value then this will
270         *         perform eviction of messages for slow consumers on non-durable
271         *         topics.
272         */
273        @Override
274        public int getMaximumPendingMessageLimit() {
275            ConsumerInfo info = getConsumerInfo();
276            return info != null ? info.getMaximumPendingMessageLimit() : 0;
277        }
278    
279        /**
280         * @return the consumer priority
281         */
282        @Override
283        public byte getPriority() {
284            ConsumerInfo info = getConsumerInfo();
285            return info != null ? info.getPriority() : 0;
286        }
287    
288        /**
289         * @return the name of the consumer which is only used for durable
290         *         consumers.
291         */
292        @Override
293        public String getSubcriptionName() {
294            ConsumerInfo info = getConsumerInfo();
295            return info != null ? info.getSubscriptionName() : null;
296        }
297    
298        /**
299         * @return number of messages pending delivery
300         */
301        @Override
302        public int getPendingQueueSize() {
303            return subscription != null ? subscription.getPendingQueueSize() : 0;
304        }
305    
306        /**
307         * @return number of messages dispatched
308         */
309        @Override
310        public int getDispatchedQueueSize() {
311            return subscription != null ? subscription.getDispatchedQueueSize() : 0;
312        }
313    
314        @Override
315        public int getMessageCountAwaitingAcknowledge() {
316            return getDispatchedQueueSize();
317        }
318    
319        /**
320         * @return number of messages that matched the subscription
321         */
322        @Override
323        public long getDispatchedCounter() {
324            return subscription != null ? subscription.getDispatchedCounter() : 0;
325        }
326    
327        /**
328         * @return number of messages that matched the subscription
329         */
330        @Override
331        public long getEnqueueCounter() {
332            return subscription != null ? subscription.getEnqueueCounter() : 0;
333        }
334    
335        /**
336         * @return number of messages queued by the client
337         */
338        @Override
339        public long getDequeueCounter() {
340            return subscription != null ? subscription.getDequeueCounter() : 0;
341        }
342    
343        protected ConsumerInfo getConsumerInfo() {
344            return subscription != null ? subscription.getConsumerInfo() : null;
345        }
346    
347        /**
348         * @return pretty print
349         */
350        @Override
351        public String toString() {
352            return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
353        }
354    
355        /**
356         */
357        @Override
358        public int getPrefetchSize() {
359            return subscription != null ? subscription.getPrefetchSize() : 0;
360        }
361    
362        @Override
363        public boolean isMatchingQueue(String queueName) {
364            if (isDestinationQueue()) {
365                return matchesDestination(new ActiveMQQueue(queueName));
366            }
367            return false;
368        }
369    
370        @Override
371        public boolean isMatchingTopic(String topicName) {
372            if (isDestinationTopic()) {
373                return matchesDestination(new ActiveMQTopic(topicName));
374            }
375            return false;
376        }
377    
378        /**
379         * Return true if this subscription matches the given destination
380         *
381         * @param destination the destination to compare against
382         * @return true if this subscription matches the given destination
383         */
384        public boolean matchesDestination(ActiveMQDestination destination) {
385            ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination();
386            DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination);
387            return filter.matches(destination);
388        }
389    
390        @Override
391        public boolean isSlowConsumer() {
392            return subscription.isSlowConsumer();
393        }
394    
395        @Override
396        public String getUserName() {
397            return userName;
398        }
399    }