/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.broker.util;
018
019import java.util.Set;
020
021import javax.annotation.PostConstruct;
022
023import org.apache.activemq.broker.BrokerPluginSupport;
024import org.apache.activemq.broker.Connection;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.ConsumerBrokerExchange;
027import org.apache.activemq.broker.ProducerBrokerExchange;
028import org.apache.activemq.broker.region.Destination;
029import org.apache.activemq.broker.region.MessageReference;
030import org.apache.activemq.broker.region.Subscription;
031import org.apache.activemq.command.*;
032import org.apache.activemq.usage.Usage;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A simple Broker intercepter which allows you to enable/disable logging.
038 *
039 * @org.apache.xbean.XBean
040 */
041public class LoggingBrokerPlugin extends BrokerPluginSupport {
042
043    private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class);
044
045    private boolean logAll = false;
046    private boolean logConnectionEvents = true;
047    private boolean logSessionEvents = true;
048    private boolean logTransactionEvents = false;
049    private boolean logConsumerEvents = false;
050    private boolean logProducerEvents = false;
051    private boolean logInternalEvents = false;
052    private boolean perDestinationLogger = false;
053
054    /**
055     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
056     *
057     * delegates to afterPropertiesSet, done to prevent backwards incompatible signature change
058     */
059    @PostConstruct
060    private void postConstruct() {
061        try {
062            afterPropertiesSet();
063        } catch (Exception ex) {
064            throw new RuntimeException(ex);
065        }
066    }
067
068    /**
069     * @throws Exception
070     * @org.apache.xbean.InitMethod
071     */
072    public void afterPropertiesSet() throws Exception {
073        LOG.info("Created LoggingBrokerPlugin: {}", this.toString());
074    }
075
076    public boolean isLogAll() {
077        return logAll;
078    }
079
080    /**
081     * Logger all Events that go through the Plugin
082     */
083    public void setLogAll(boolean logAll) {
084        this.logAll = logAll;
085    }
086
087
088    public boolean isLogConnectionEvents() {
089        return logConnectionEvents;
090    }
091
092    /**
093     * Logger Events that are related to connections
094     */
095    public void setLogConnectionEvents(boolean logConnectionEvents) {
096        this.logConnectionEvents = logConnectionEvents;
097    }
098
099    public boolean isLogSessionEvents() {
100        return logSessionEvents;
101    }
102
103    /**
104     * Logger Events that are related to sessions
105     */
106    public void setLogSessionEvents(boolean logSessionEvents) {
107        this.logSessionEvents = logSessionEvents;
108    }
109
110    public boolean isLogTransactionEvents() {
111        return logTransactionEvents;
112    }
113
114    /**
115     * Logger Events that are related to transaction processing
116     */
117    public void setLogTransactionEvents(boolean logTransactionEvents) {
118        this.logTransactionEvents = logTransactionEvents;
119    }
120
121    public boolean isLogConsumerEvents() {
122        return logConsumerEvents;
123    }
124
125    /**
126     * Logger Events that are related to Consumers
127     */
128    public void setLogConsumerEvents(boolean logConsumerEvents) {
129        this.logConsumerEvents = logConsumerEvents;
130    }
131
132    public boolean isLogProducerEvents() {
133        return logProducerEvents;
134    }
135
136    /**
137     * Logger Events that are related to Producers
138     */
139    public void setLogProducerEvents(boolean logProducerEvents) {
140        this.logProducerEvents = logProducerEvents;
141    }
142
143    public boolean isLogInternalEvents() {
144        return logInternalEvents;
145    }
146
147    /**
148     * Logger Events that are normally internal to the broker
149     */
150    public void setLogInternalEvents(boolean logInternalEvents) {
151        this.logInternalEvents = logInternalEvents;
152    }
153
154    @Override
155    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
156        if (isLogAll() || isLogConsumerEvents()) {
157            LOG.info("Acknowledging message for client ID: {}{}", consumerExchange.getConnectionContext().getClientId(), (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
158            if (ack.getMessageCount() > 1) {
159                LOG.trace("Message count: {}, First Message Id: {}, Last Message Id: {}", new Object[]{ ack.getMessageCount(), ack.getFirstMessageId(), ack.getLastMessageId() });
160            }
161        }
162        super.acknowledge(consumerExchange, ack);
163    }
164
165    @Override
166    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
167        if (isLogAll() || isLogConsumerEvents()) {
168            LOG.info("Message Pull from: {} on {}", context.getClientId(), pull.getDestination().getPhysicalName());
169        }
170        return super.messagePull(context, pull);
171    }
172
173    @Override
174    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
175        if (isLogAll() || isLogConnectionEvents()) {
176            LOG.info("Adding Connection: {}", info);
177        }
178        super.addConnection(context, info);
179    }
180
181    @Override
182    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
183        if (isLogAll() || isLogConsumerEvents()) {
184            LOG.info("Adding Consumer: {}", info);
185        }
186        return super.addConsumer(context, info);
187    }
188
189    @Override
190    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
191        if (isLogAll() || isLogProducerEvents()) {
192            LOG.info("Adding Producer: {}", info);
193        }
194        super.addProducer(context, info);
195    }
196
197    @Override
198    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
199        if (isLogAll() || isLogTransactionEvents()) {
200            LOG.info("Committing transaction: {}", xid.getTransactionKey());
201        }
202        super.commitTransaction(context, xid, onePhase);
203    }
204
205    @Override
206    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
207        if (isLogAll() || isLogConsumerEvents()) {
208            LOG.info("Removing subscription: {}", info);
209        }
210        super.removeSubscription(context, info);
211    }
212
213    @Override
214    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
215
216        TransactionId[] result = super.getPreparedTransactions(context);
217        if ((isLogAll() || isLogTransactionEvents()) && result != null) {
218            StringBuffer tids = new StringBuffer();
219            for (TransactionId tid : result) {
220                if (tids.length() > 0) {
221                    tids.append(", ");
222                }
223                tids.append(tid.getTransactionKey());
224            }
225            LOG.info("Prepared transactions: {}", tids);
226        }
227        return result;
228    }
229
230    @Override
231    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
232        if (isLogAll() || isLogTransactionEvents()) {
233            LOG.info("Preparing transaction: {}", xid.getTransactionKey());
234        }
235        return super.prepareTransaction(context, xid);
236    }
237
238    @Override
239    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
240        if (isLogAll() || isLogConnectionEvents()) {
241            LOG.info("Removing Connection: {}", info);
242        }
243        super.removeConnection(context, info, error);
244    }
245
246    @Override
247    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
248        if (isLogAll() || isLogConsumerEvents()) {
249            LOG.info("Removing Consumer: {}", info);
250        }
251        super.removeConsumer(context, info);
252    }
253
254    @Override
255    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
256        if (isLogAll() || isLogProducerEvents()) {
257            LOG.info("Removing Producer: {}", info);
258        }
259        super.removeProducer(context, info);
260    }
261
262    @Override
263    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
264        if (isLogAll() || isLogTransactionEvents()) {
265            LOG.info("Rolling back Transaction: {}", xid.getTransactionKey());
266        }
267        super.rollbackTransaction(context, xid);
268    }
269
270    @Override
271    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
272        if (isLogAll() || isLogProducerEvents()) {
273            logSend(messageSend.copy());
274        }
275        super.send(producerExchange, messageSend);
276    }
277
278    private void logSend(Message copy) {
279        Logger perDestinationsLogger = LOG;
280        if (isPerDestinationLogger()) {
281            ActiveMQDestination destination = copy.getDestination();
282            perDestinationsLogger = LoggerFactory.getLogger(LOG.getName() +
283                    "." + destination.getDestinationTypeAsString() + "." + destination.getPhysicalName());
284        }
285        perDestinationsLogger.info("Sending message: {}", copy);
286    }
287
288    @Override
289    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
290        if (isLogAll() || isLogTransactionEvents()) {
291            LOG.info("Beginning transaction: {}", xid.getTransactionKey());
292        }
293        super.beginTransaction(context, xid);
294    }
295
296    @Override
297    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
298        if (isLogAll() || isLogTransactionEvents()) {
299            LOG.info("Forgetting transaction: {}", transactionId.getTransactionKey());
300        }
301        super.forgetTransaction(context, transactionId);
302    }
303
304    @Override
305    public Connection[] getClients() throws Exception {
306        Connection[] result = super.getClients();
307
308        if (isLogAll() || isLogInternalEvents()) {
309            if (result == null) {
310                LOG.info("Get Clients returned empty list.");
311            } else {
312                StringBuffer cids = new StringBuffer();
313                for (Connection c : result) {
314                    cids.append(cids.length() > 0 ? ", " : "");
315                    cids.append(c.getConnectionId());
316                }
317                LOG.info("Connected clients: {}", cids);
318            }
319        }
320        return super.getClients();
321    }
322
323    @Override
324    public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
325            ActiveMQDestination destination, boolean create) throws Exception {
326        if (isLogAll() || isLogInternalEvents()) {
327            LOG.info("Adding destination: {}:{}", destination.getDestinationTypeAsString(), destination.getPhysicalName());
328        }
329        return super.addDestination(context, destination, create);
330    }
331
332    @Override
333    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
334            throws Exception {
335        if (isLogAll() || isLogInternalEvents()) {
336            LOG.info("Removing destination: {}:{}", destination.getDestinationTypeAsString(), destination.getPhysicalName());
337        }
338        super.removeDestination(context, destination, timeout);
339    }
340
341    @Override
342    public ActiveMQDestination[] getDestinations() throws Exception {
343        ActiveMQDestination[] result = super.getDestinations();
344        if (isLogAll() || isLogInternalEvents()) {
345            if (result == null) {
346                LOG.info("Get Destinations returned empty list.");
347            } else {
348                StringBuffer destinations = new StringBuffer();
349                for (ActiveMQDestination dest : result) {
350                    destinations.append(destinations.length() > 0 ? ", " : "");
351                    destinations.append(dest.getPhysicalName());
352                }
353                LOG.info("Get Destinations: {}", destinations);
354            }
355        }
356        return result;
357    }
358
359    @Override
360    public void start() throws Exception {
361        if (isLogAll() || isLogInternalEvents()) {
362            LOG.info("Starting {}", getBrokerName());
363        }
364        super.start();
365    }
366
367    @Override
368    public void stop() throws Exception {
369        if (isLogAll() || isLogInternalEvents()) {
370            LOG.info("Stopping {}", getBrokerName());
371        }
372        super.stop();
373    }
374
375    @Override
376    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
377        if (isLogAll() || isLogSessionEvents()) {
378            LOG.info("Adding Session: {}", info);
379        }
380        super.addSession(context, info);
381    }
382
383    @Override
384    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
385        if (isLogAll() || isLogSessionEvents()) {
386            LOG.info("Removing Session: {}", info);
387        }
388        super.removeSession(context, info);
389    }
390
391    @Override
392    public void addBroker(Connection connection, BrokerInfo info) {
393        if (isLogAll() || isLogInternalEvents()) {
394            LOG.info("Adding Broker {}", info.getBrokerName());
395        }
396        super.addBroker(connection, info);
397    }
398
399    @Override
400    public void removeBroker(Connection connection, BrokerInfo info) {
401        if (isLogAll() || isLogInternalEvents()) {
402            LOG.info("Removing Broker {}", info.getBrokerName());
403        }
404        super.removeBroker(connection, info);
405    }
406
407    @Override
408    public BrokerInfo[] getPeerBrokerInfos() {
409        BrokerInfo[] result = super.getPeerBrokerInfos();
410        if (isLogAll() || isLogInternalEvents()) {
411            if (result == null) {
412                LOG.info("Get Peer Broker Infos returned empty list.");
413            } else {
414                StringBuffer peers = new StringBuffer();
415                for (BrokerInfo bi : result) {
416                    peers.append(peers.length() > 0 ? ", " : "");
417                    peers.append(bi.getBrokerName());
418                }
419                LOG.info("Get Peer Broker Infos: {}", peers);
420            }
421        }
422        return result;
423    }
424
425    @Override
426    public void preProcessDispatch(MessageDispatch messageDispatch) {
427        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
428            LOG.info("preProcessDispatch: {}", messageDispatch);
429        }
430        super.preProcessDispatch(messageDispatch);
431    }
432
433    @Override
434    public void postProcessDispatch(MessageDispatch messageDispatch) {
435        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
436            LOG.info("postProcessDispatch: {}", messageDispatch);
437        }
438        super.postProcessDispatch(messageDispatch);
439    }
440
441    @Override
442    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
443        if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
444            LOG.info("ProcessDispatchNotification: {}", messageDispatchNotification);
445        }
446        super.processDispatchNotification(messageDispatchNotification);
447    }
448
449    @Override
450    public Set<ActiveMQDestination> getDurableDestinations() {
451        Set<ActiveMQDestination> result = super.getDurableDestinations();
452        if (isLogAll() || isLogInternalEvents()) {
453            if (result == null) {
454                LOG.info("Get Durable Destinations returned empty list.");
455            } else {
456                StringBuffer destinations = new StringBuffer();
457                for (ActiveMQDestination dest : result) {
458                    destinations.append(destinations.length() > 0 ? ", " : "");
459                    destinations.append(dest.getPhysicalName());
460                }
461                LOG.info("Get Durable Destinations: {}", destinations);
462            }
463        }
464        return result;
465    }
466
467    @Override
468    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
469        if (isLogAll() || isLogInternalEvents()) {
470            LOG.info("Adding destination info: {}", info);
471        }
472        super.addDestinationInfo(context, info);
473    }
474
475    @Override
476    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
477        if (isLogAll() || isLogInternalEvents()) {
478            LOG.info("Removing destination info: {}", info);
479        }
480        super.removeDestinationInfo(context, info);
481    }
482
483    @Override
484    public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
485        if (isLogAll() || isLogInternalEvents()) {
486            String msg = "Unable to display message.";
487
488            msg = message.getMessage().toString();
489
490            LOG.info("Message has expired: {}", msg);
491        }
492        super.messageExpired(context, message, subscription);
493    }
494
495    @Override
496    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
497                                         Subscription subscription, Throwable poisonCause) {
498        if (isLogAll() || isLogInternalEvents()) {
499            String msg = "Unable to display message.";
500
501            msg = messageReference.getMessage().toString();
502
503            LOG.info("Sending to DLQ: {}", msg);
504        }
505        return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
506    }
507
508    @Override
509    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo,ActiveMQDestination destination) {
510        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
511            LOG.info("Fast Producer: {}", producerInfo);
512        }
513        super.fastProducer(context, producerInfo, destination);
514    }
515
516    @Override
517    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
518        if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
519            LOG.info("Destination is full: {}", destination.getName());
520        }
521        super.isFull(context, destination, usage);
522    }
523
524    @Override
525    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
526        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
527            String msg = "Unable to display message.";
528
529            msg = messageReference.getMessage().toString();
530
531            LOG.info("Message consumed: {}", msg);
532        }
533        super.messageConsumed(context, messageReference);
534    }
535
536    @Override
537    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
538        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
539            String msg = "Unable to display message.";
540
541            msg = messageReference.getMessage().toString();
542
543            LOG.info("Message delivered: {}", msg);
544        }
545        super.messageDelivered(context, messageReference);
546    }
547
548    @Override
549    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
550        if (isLogAll() || isLogInternalEvents()) {
551            String msg = "Unable to display message.";
552
553            msg = messageReference.getMessage().toString();
554
555            LOG.info("Message discarded: {}", msg);
556        }
557        super.messageDiscarded(context, sub, messageReference);
558    }
559
560    @Override
561    public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
562        if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
563            LOG.info("Detected slow consumer on {}", destination.getName());
564            StringBuffer buf = new StringBuffer("Connection(");
565            buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
566            buf.append(") Session(");
567            buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
568            buf.append(")");
569            LOG.info(buf.toString());
570        }
571        super.slowConsumer(context, destination, subs);
572    }
573
574    @Override
575    public void nowMasterBroker() {
576        if (isLogAll() || isLogInternalEvents()) {
577            LOG.info("Is now the master broker: {}", getBrokerName());
578        }
579        super.nowMasterBroker();
580    }
581
582    @Override
583    public String toString() {
584        StringBuffer buf = new StringBuffer();
585        buf.append("LoggingBrokerPlugin(");
586        buf.append("logAll=");
587        buf.append(isLogAll());
588        buf.append(", logConnectionEvents=");
589        buf.append(isLogConnectionEvents());
590        buf.append(", logSessionEvents=");
591        buf.append(isLogSessionEvents());
592        buf.append(", logConsumerEvents=");
593        buf.append(isLogConsumerEvents());
594        buf.append(", logProducerEvents=");
595        buf.append(isLogProducerEvents());
596        buf.append(", logTransactionEvents=");
597        buf.append(isLogTransactionEvents());
598        buf.append(", logInternalEvents=");
599        buf.append(isLogInternalEvents());
600        buf.append(")");
601        return buf.toString();
602    }
603
604    public void setPerDestinationLogger(boolean perDestinationLogger) {
605        this.perDestinationLogger = perDestinationLogger;
606    }
607
608    public boolean isPerDestinationLogger() {
609        return perDestinationLogger;
610    }
611}