001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import java.util.Set;
020    import javax.annotation.PostConstruct;
021    import org.apache.activemq.broker.BrokerPluginSupport;
022    import org.apache.activemq.broker.Connection;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.ConsumerBrokerExchange;
025    import org.apache.activemq.broker.ProducerBrokerExchange;
026    import org.apache.activemq.broker.region.Destination;
027    import org.apache.activemq.broker.region.MessageReference;
028    import org.apache.activemq.broker.region.Subscription;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.BrokerInfo;
031    import org.apache.activemq.command.ConnectionInfo;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.DestinationInfo;
034    import org.apache.activemq.command.Message;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageDispatch;
037    import org.apache.activemq.command.MessageDispatchNotification;
038    import org.apache.activemq.command.MessagePull;
039    import org.apache.activemq.command.ProducerInfo;
040    import org.apache.activemq.command.RemoveSubscriptionInfo;
041    import org.apache.activemq.command.Response;
042    import org.apache.activemq.command.SessionInfo;
043    import org.apache.activemq.command.TransactionId;
044    import org.apache.activemq.usage.Usage;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * A simple Broker intercepter which allows you to enable/disable logging.
050     *
051     * @org.apache.xbean.XBean
052     */
053    
054    public class LoggingBrokerPlugin extends BrokerPluginSupport {
055    
056        private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class);
057    
058        private boolean logAll = false;
059        private boolean logMessageEvents = false;
060        private boolean logConnectionEvents = true;
061        private boolean logSessionEvents = true;
062        private boolean logTransactionEvents = false;
063        private boolean logConsumerEvents = false;
064        private boolean logProducerEvents = false;
065        private boolean logInternalEvents = false;
066    
067        /**
068         * @throws Exception
069         * @org.apache.xbean.InitMethod
070         */
071        @PostConstruct
072        public void afterPropertiesSet() throws Exception {
073            LOG.info("Created LoggingBrokerPlugin: " + this.toString());
074        }
075    
076        public boolean isLogAll() {
077            return logAll;
078        }
079    
080        /**
081         * Logger all Events that go through the Plugin
082         */
083        public void setLogAll(boolean logAll) {
084            this.logAll = logAll;
085        }
086    
087        public boolean isLogMessageEvents() {
088            return logMessageEvents;
089        }
090    
091        /**
092         * Logger Events that are related to message processing
093         */
094        public void setLogMessageEvents(boolean logMessageEvents) {
095            this.logMessageEvents = logMessageEvents;
096        }
097    
098        public boolean isLogConnectionEvents() {
099            return logConnectionEvents;
100        }
101    
102        /**
103         * Logger Events that are related to connections
104         */
105        public void setLogConnectionEvents(boolean logConnectionEvents) {
106            this.logConnectionEvents = logConnectionEvents;
107        }
108    
109        public boolean isLogSessionEvents() {
110            return logSessionEvents;
111        }
112    
113        /**
114         * Logger Events that are related to sessions
115         */
116        public void setLogSessionEvents(boolean logSessionEvents) {
117            this.logSessionEvents = logSessionEvents;
118        }
119    
120        public boolean isLogTransactionEvents() {
121            return logTransactionEvents;
122        }
123    
124        /**
125         * Logger Events that are related to transaction processing
126         */
127        public void setLogTransactionEvents(boolean logTransactionEvents) {
128            this.logTransactionEvents = logTransactionEvents;
129        }
130    
131        public boolean isLogConsumerEvents() {
132            return logConsumerEvents;
133        }
134    
135        /**
136         * Logger Events that are related to Consumers
137         */
138        public void setLogConsumerEvents(boolean logConsumerEvents) {
139            this.logConsumerEvents = logConsumerEvents;
140        }
141    
142        public boolean isLogProducerEvents() {
143            return logProducerEvents;
144        }
145    
146        /**
147         * Logger Events that are related to Producers
148         */
149        public void setLogProducerEvents(boolean logProducerEvents) {
150            this.logProducerEvents = logProducerEvents;
151        }
152    
153        public boolean isLogInternalEvents() {
154            return logInternalEvents;
155        }
156    
157        /**
158         * Logger Events that are normally internal to the broker
159         */
160        public void setLogInternalEvents(boolean logInternalEvents) {
161            this.logInternalEvents = logInternalEvents;
162        }
163    
164        @Override
165        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
166            if (isLogAll() || isLogConsumerEvents()) {
167                LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId()
168                        + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
169                if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
170                    LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId()
171                            + ", Last Message Id: " + ack.getLastMessageId());
172                }
173            }
174            super.acknowledge(consumerExchange, ack);
175        }
176    
177        @Override
178        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
179            if (isLogAll() || isLogConsumerEvents()) {
180                LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName());
181            }
182            return super.messagePull(context, pull);
183        }
184    
185        @Override
186        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
187            if (isLogAll() || isLogConnectionEvents()) {
188                LOG.info("Adding Connection : " + info);
189            }
190            super.addConnection(context, info);
191        }
192    
193        @Override
194        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
195            if (isLogAll() || isLogConsumerEvents()) {
196                LOG.info("Adding Consumer : " + info);
197            }
198            return super.addConsumer(context, info);
199        }
200    
201        @Override
202        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
203            if (isLogAll() || isLogProducerEvents()) {
204                LOG.info("Adding Producer :" + info);
205            }
206            super.addProducer(context, info);
207        }
208    
209        @Override
210        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
211            if (isLogAll() || isLogTransactionEvents()) {
212                LOG.info("Commiting transaction : " + xid.getTransactionKey());
213            }
214            super.commitTransaction(context, xid, onePhase);
215        }
216    
217        @Override
218        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
219            if (isLogAll() || isLogConsumerEvents()) {
220                LOG.info("Removing subscription : " + info);
221            }
222            super.removeSubscription(context, info);
223        }
224    
225        @Override
226        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
227    
228            TransactionId[] result = super.getPreparedTransactions(context);
229            if ((isLogAll() || isLogTransactionEvents()) && result != null) {
230                StringBuffer tids = new StringBuffer();
231                for (TransactionId tid : result) {
232                    if (tids.length() > 0) {
233                        tids.append(", ");
234                    }
235                    tids.append(tid.getTransactionKey());
236                }
237                LOG.info("Prepared transactions : " + tids);
238            }
239            return result;
240        }
241    
242        @Override
243        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
244            if (isLogAll() || isLogTransactionEvents()) {
245                LOG.info("Preparing transaction : " + xid.getTransactionKey());
246            }
247            return super.prepareTransaction(context, xid);
248        }
249    
250        @Override
251        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
252            if (isLogAll() || isLogConnectionEvents()) {
253                LOG.info("Removing Connection : " + info);
254            }
255            super.removeConnection(context, info, error);
256        }
257    
258        @Override
259        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
260            if (isLogAll() || isLogConsumerEvents()) {
261                LOG.info("Removing Consumer : " + info);
262            }
263            super.removeConsumer(context, info);
264        }
265    
266        @Override
267        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
268            if (isLogAll() || isLogProducerEvents()) {
269                LOG.info("Removing Producer : " + info);
270            }
271            super.removeProducer(context, info);
272        }
273    
274        @Override
275        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
276            if (isLogAll() || isLogTransactionEvents()) {
277                LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
278            }
279            super.rollbackTransaction(context, xid);
280        }
281    
282        @Override
283        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
284            if (isLogAll() || isLogProducerEvents()) {
285                LOG.info("Sending message : " + messageSend.copy());
286            }
287            super.send(producerExchange, messageSend);
288        }
289    
290        @Override
291        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
292            if (isLogAll() || isLogTransactionEvents()) {
293                LOG.info("Beginning transaction : " + xid.getTransactionKey());
294            }
295            super.beginTransaction(context, xid);
296        }
297    
298        @Override
299        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
300            if (isLogAll() || isLogTransactionEvents()) {
301                LOG.info("Forgetting transaction : " + transactionId.getTransactionKey());
302            }
303            super.forgetTransaction(context, transactionId);
304        }
305    
306        @Override
307        public Connection[] getClients() throws Exception {
308            Connection[] result = super.getClients();
309    
310            if (isLogAll() || isLogInternalEvents()) {
311                if (result == null) {
312                    LOG.info("Get Clients returned empty list.");
313                } else {
314                    StringBuffer cids = new StringBuffer();
315                    for (Connection c : result) {
316                        cids.append(cids.length() > 0 ? ", " : "");
317                        cids.append(c.getConnectionId());
318                    }
319                    LOG.info("Connected clients : " + cids);
320                }
321            }
322            return super.getClients();
323        }
324    
325        @Override
326        public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
327                ActiveMQDestination destination, boolean create) throws Exception {
328            if (isLogAll() || isLogInternalEvents()) {
329                LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":"
330                        + destination.getPhysicalName());
331            }
332            return super.addDestination(context, destination, create);
333        }
334    
335        @Override
336        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
337                throws Exception {
338            if (isLogAll() || isLogInternalEvents()) {
339                LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":"
340                        + destination.getPhysicalName());
341            }
342            super.removeDestination(context, destination, timeout);
343        }
344    
345        @Override
346        public ActiveMQDestination[] getDestinations() throws Exception {
347            ActiveMQDestination[] result = super.getDestinations();
348            if (isLogAll() || isLogInternalEvents()) {
349                if (result == null) {
350                    LOG.info("Get Destinations returned empty list.");
351                } else {
352                    StringBuffer destinations = new StringBuffer();
353                    for (ActiveMQDestination dest : result) {
354                        destinations.append(destinations.length() > 0 ? ", " : "");
355                        destinations.append(dest.getPhysicalName());
356                    }
357                    LOG.info("Get Destinations : " + destinations);
358                }
359            }
360            return result;
361        }
362    
363        @Override
364        public void start() throws Exception {
365            if (isLogAll() || isLogInternalEvents()) {
366                LOG.info("Starting " + getBrokerName());
367            }
368            super.start();
369        }
370    
371        @Override
372        public void stop() throws Exception {
373            if (isLogAll() || isLogInternalEvents()) {
374                LOG.info("Stopping " + getBrokerName());
375            }
376            super.stop();
377        }
378    
379        @Override
380        public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
381            if (isLogAll() || isLogSessionEvents()) {
382                LOG.info("Adding Session : " + info);
383            }
384            super.addSession(context, info);
385        }
386    
387        @Override
388        public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
389            if (isLogAll() || isLogSessionEvents()) {
390                LOG.info("Removing Session : " + info);
391            }
392            super.removeSession(context, info);
393        }
394    
395        @Override
396        public void addBroker(Connection connection, BrokerInfo info) {
397            if (isLogAll() || isLogInternalEvents()) {
398                LOG.info("Adding Broker " + info.getBrokerName());
399            }
400            super.addBroker(connection, info);
401        }
402    
403        @Override
404        public void removeBroker(Connection connection, BrokerInfo info) {
405            if (isLogAll() || isLogInternalEvents()) {
406                LOG.info("Removing Broker " + info.getBrokerName());
407            }
408            super.removeBroker(connection, info);
409        }
410    
411        @Override
412        public BrokerInfo[] getPeerBrokerInfos() {
413            BrokerInfo[] result = super.getPeerBrokerInfos();
414            if (isLogAll() || isLogInternalEvents()) {
415                if (result == null) {
416                    LOG.info("Get Peer Broker Infos returned empty list.");
417                } else {
418                    StringBuffer peers = new StringBuffer();
419                    for (BrokerInfo bi : result) {
420                        peers.append(peers.length() > 0 ? ", " : "");
421                        peers.append(bi.getBrokerName());
422                    }
423                    LOG.info("Get Peer Broker Infos : " + peers);
424                }
425            }
426            return result;
427        }
428    
429        @Override
430        public void preProcessDispatch(MessageDispatch messageDispatch) {
431            if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
432                LOG.info("preProcessDispatch :" + messageDispatch);
433            }
434            super.preProcessDispatch(messageDispatch);
435        }
436    
437        @Override
438        public void postProcessDispatch(MessageDispatch messageDispatch) {
439            if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
440                LOG.info("postProcessDispatch :" + messageDispatch);
441            }
442            super.postProcessDispatch(messageDispatch);
443        }
444    
445        @Override
446        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
447            if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
448                LOG.info("ProcessDispatchNotification :" + messageDispatchNotification);
449            }
450            super.processDispatchNotification(messageDispatchNotification);
451        }
452    
453        @Override
454        public Set<ActiveMQDestination> getDurableDestinations() {
455            Set<ActiveMQDestination> result = super.getDurableDestinations();
456            if (isLogAll() || isLogInternalEvents()) {
457                if (result == null) {
458                    LOG.info("Get Durable Destinations returned empty list.");
459                } else {
460                    StringBuffer destinations = new StringBuffer();
461                    for (ActiveMQDestination dest : result) {
462                        destinations.append(destinations.length() > 0 ? ", " : "");
463                        destinations.append(dest.getPhysicalName());
464                    }
465                    LOG.info("Get Durable Destinations : " + destinations);
466                }
467            }
468            return result;
469        }
470    
471        @Override
472        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
473            if (isLogAll() || isLogInternalEvents()) {
474                LOG.info("Adding destination info : " + info);
475            }
476            super.addDestinationInfo(context, info);
477        }
478    
479        @Override
480        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
481            if (isLogAll() || isLogInternalEvents()) {
482                LOG.info("Removing destination info : " + info);
483            }
484            super.removeDestinationInfo(context, info);
485        }
486    
487        @Override
488        public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
489            if (isLogAll() || isLogInternalEvents()) {
490                String msg = "Unable to display message.";
491    
492                msg = message.getMessage().toString();
493    
494                LOG.info("Message has expired : " + msg);
495            }
496            super.messageExpired(context, message, subscription);
497        }
498    
499        @Override
500        public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
501                                          Subscription subscription) {
502            if (isLogAll() || isLogInternalEvents()) {
503                String msg = "Unable to display message.";
504    
505                msg = messageReference.getMessage().toString();
506    
507                LOG.info("Sending to DLQ : " + msg);
508            }
509            super.sendToDeadLetterQueue(context, messageReference, subscription);
510        }
511    
512        @Override
513        public void fastProducer(ConnectionContext context, ProducerInfo producerInfo,ActiveMQDestination destination) {
514            if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
515                LOG.info("Fast Producer : " + producerInfo);
516            }
517            super.fastProducer(context, producerInfo, destination);
518        }
519    
520        @Override
521        public void isFull(ConnectionContext context, Destination destination, Usage usage) {
522            if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
523                LOG.info("Destination is full : " + destination.getName());
524            }
525            super.isFull(context, destination, usage);
526        }
527    
528        @Override
529        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
530            if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
531                String msg = "Unable to display message.";
532    
533                msg = messageReference.getMessage().toString();
534    
535                LOG.info("Message consumed : " + msg);
536            }
537            super.messageConsumed(context, messageReference);
538        }
539    
540        @Override
541        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
542            if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
543                String msg = "Unable to display message.";
544    
545                msg = messageReference.getMessage().toString();
546    
547                LOG.info("Message delivered : " + msg);
548            }
549            super.messageDelivered(context, messageReference);
550        }
551    
552        @Override
553        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
554            if (isLogAll() || isLogInternalEvents()) {
555                String msg = "Unable to display message.";
556    
557                msg = messageReference.getMessage().toString();
558    
559                LOG.info("Message discarded : " + msg);
560            }
561            super.messageDiscarded(context, sub, messageReference);
562        }
563    
564        @Override
565        public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
566            if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
567                LOG.info("Detected slow consumer on " + destination.getName());
568                StringBuffer buf = new StringBuffer("Connection(");
569                buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
570                buf.append(") Session(");
571                buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
572                buf.append(")");
573                LOG.info(buf.toString());
574            }
575            super.slowConsumer(context, destination, subs);
576        }
577    
578        @Override
579        public void nowMasterBroker() {
580            if (isLogAll() || isLogInternalEvents()) {
581                LOG.info("Is now the master broker : " + getBrokerName());
582            }
583            super.nowMasterBroker();
584        }
585    
586        @Override
587        public String toString() {
588            StringBuffer buf = new StringBuffer();
589            buf.append("LoggingBrokerPlugin(");
590            buf.append("logAll=");
591            buf.append(isLogAll());
592            buf.append(", logConnectionEvents=");
593            buf.append(isLogConnectionEvents());
594            buf.append(", logSessionEvents=");
595            buf.append(isLogSessionEvents());
596            buf.append(", logConsumerEvents=");
597            buf.append(isLogConsumerEvents());
598            buf.append(", logProducerEvents=");
599            buf.append(isLogProducerEvents());
600            buf.append(", logMessageEvents=");
601            buf.append(isLogMessageEvents());
602            buf.append(", logTransactionEvents=");
603            buf.append(isLogTransactionEvents());
604            buf.append(", logInternalEvents=");
605            buf.append(isLogInternalEvents());
606            buf.append(")");
607            return buf.toString();
608        }
609    }