001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import java.io.DataOutputStream;
020import java.io.IOException;
021import java.net.DatagramPacket;
022import java.net.DatagramSocket;
023import java.net.InetAddress;
024import java.net.InetSocketAddress;
025import java.net.SocketAddress;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029
030import org.apache.activemq.broker.BrokerPluginSupport;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.broker.ConsumerBrokerExchange;
033import org.apache.activemq.broker.ProducerBrokerExchange;
034import org.apache.activemq.broker.region.Subscription;
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.BrokerId;
037import org.apache.activemq.command.ConnectionInfo;
038import org.apache.activemq.command.ConsumerInfo;
039import org.apache.activemq.command.DataStructure;
040import org.apache.activemq.command.DestinationInfo;
041import org.apache.activemq.command.JournalTrace;
042import org.apache.activemq.command.Message;
043import org.apache.activemq.command.MessageAck;
044import org.apache.activemq.command.MessageDispatch;
045import org.apache.activemq.command.MessageDispatchNotification;
046import org.apache.activemq.command.MessagePull;
047import org.apache.activemq.command.ProducerInfo;
048import org.apache.activemq.command.RemoveSubscriptionInfo;
049import org.apache.activemq.command.Response;
050import org.apache.activemq.command.SessionInfo;
051import org.apache.activemq.command.TransactionId;
052import org.apache.activemq.command.TransactionInfo;
053import org.apache.activemq.openwire.OpenWireFormatFactory;
054import org.apache.activemq.util.ByteArrayOutputStream;
055import org.apache.activemq.util.ByteSequence;
056import org.apache.activemq.wireformat.WireFormat;
057import org.apache.activemq.wireformat.WireFormatFactory;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * A Broker interceptor which allows you to trace all operations to a UDP
063 * socket.
064 * 
065 * @org.apache.xbean.XBean element="udpTraceBrokerPlugin"
066 * 
067 */
068public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
069
070    private static final Logger LOG = LoggerFactory.getLogger(UDPTraceBrokerPlugin.class);
071    protected WireFormat wireFormat;
072    protected WireFormatFactory wireFormatFactory;
073    protected int maxTraceDatagramSize = 1024 * 4;
074    protected URI destination;
075    protected DatagramSocket socket;
076
077    protected BrokerId brokerId;
078    protected SocketAddress address;
079    protected boolean broadcast;
080
081    public UDPTraceBrokerPlugin() {
082        try {
083            destination = new URI("udp://127.0.0.1:61616");
084        } catch (URISyntaxException wontHappen) {
085        }
086    }
087
088    public void start() throws Exception {
089        super.start();
090        if (getWireFormat() == null) {
091            throw new IllegalArgumentException("Wireformat must be specifed.");
092        }
093        if (address == null) {
094            address = createSocketAddress(destination);
095        }
096        socket = createSocket();
097
098        brokerId = super.getBrokerId();
099        trace(new JournalTrace("START"));
100    }
101
102    protected DatagramSocket createSocket() throws IOException {
103        DatagramSocket s = new DatagramSocket();
104        s.setSendBufferSize(maxTraceDatagramSize);
105        s.setBroadcast(broadcast);
106        return s;
107    }
108
109    public void stop() throws Exception {
110        trace(new JournalTrace("STOP"));
111        socket.close();
112        super.stop();
113    }
114
115    private void trace(DataStructure command) {
116        try {
117
118            ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize);
119            DataOutputStream out = new DataOutputStream(baos);
120            wireFormat.marshal(brokerId, out);
121            wireFormat.marshal(command, out);
122            out.close();
123            ByteSequence sequence = baos.toByteSequence();
124            DatagramPacket datagram = new DatagramPacket(sequence.getData(), sequence.getOffset(), sequence.getLength(), address);
125            socket.send(datagram);
126
127        } catch (Throwable e) {
128            LOG.debug("Failed to trace: {}", command, e);
129        }
130    }
131
132    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
133        trace(messageSend);
134        super.send(producerExchange, messageSend);
135    }
136
137    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
138        trace(ack);
139        super.acknowledge(consumerExchange, ack);
140    }
141
142    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
143        trace(info);
144        super.addConnection(context, info);
145    }
146
147    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
148        trace(info);
149        return super.addConsumer(context, info);
150    }
151
152    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
153        trace(info);
154        super.addDestinationInfo(context, info);
155    }
156
157    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
158        trace(info);
159        super.addProducer(context, info);
160    }
161
162    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
163        trace(info);
164        super.addSession(context, info);
165    }
166
167    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
168        trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN));
169        super.beginTransaction(context, xid);
170    }
171
172    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
173        trace(new TransactionInfo(context.getConnectionId(), xid, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE));
174        super.commitTransaction(context, xid, onePhase);
175    }
176
177    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
178        trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET));
179        super.forgetTransaction(context, xid);
180    }
181
182    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
183        trace(pull);
184        return super.messagePull(context, pull);
185    }
186
187    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
188        trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE));
189        return super.prepareTransaction(context, xid);
190    }
191
192    public void postProcessDispatch(MessageDispatch messageDispatch) {
193        trace(messageDispatch);
194        super.postProcessDispatch(messageDispatch);
195    }
196
197    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
198        trace(messageDispatchNotification);
199        super.processDispatchNotification(messageDispatchNotification);
200    }
201
202    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
203        trace(info.createRemoveCommand());
204        super.removeConnection(context, info, error);
205    }
206
207    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
208        trace(info.createRemoveCommand());
209        super.removeConsumer(context, info);
210    }
211
212    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
213        super.removeDestination(context, destination, timeout);
214    }
215
216    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
217        trace(info);
218        super.removeDestinationInfo(context, info);
219    }
220
221    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
222        trace(info.createRemoveCommand());
223        super.removeProducer(context, info);
224    }
225
226    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
227        trace(info.createRemoveCommand());
228        super.removeSession(context, info);
229    }
230
231    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
232        trace(info);
233        super.removeSubscription(context, info);
234    }
235
236    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
237        trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK));
238        super.rollbackTransaction(context, xid);
239    }
240
241    public WireFormat getWireFormat() {
242        if (wireFormat == null) {
243            wireFormat = createWireFormat();
244        }
245        return wireFormat;
246    }
247
248    protected WireFormat createWireFormat() {
249        return getWireFormatFactory().createWireFormat();
250    }
251
252    public void setWireFormat(WireFormat wireFormat) {
253        this.wireFormat = wireFormat;
254    }
255
256    public WireFormatFactory getWireFormatFactory() {
257        if (wireFormatFactory == null) {
258            wireFormatFactory = createWireFormatFactory();
259        }
260        return wireFormatFactory;
261    }
262
263    protected OpenWireFormatFactory createWireFormatFactory() {
264        OpenWireFormatFactory wf = new OpenWireFormatFactory();
265        wf.setCacheEnabled(false);
266        wf.setVersion(1);
267        wf.setTightEncodingEnabled(true);
268        wf.setSizePrefixDisabled(true);
269        return wf;
270    }
271
272    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
273        this.wireFormatFactory = wireFormatFactory;
274    }
275
276    protected SocketAddress createSocketAddress(URI location) throws UnknownHostException {
277        InetAddress a = InetAddress.getByName(location.getHost());
278        int port = location.getPort();
279        return new InetSocketAddress(a, port);
280    }
281
282    public URI getDestination() {
283        return destination;
284    }
285
286    public void setDestination(URI destination) {
287        this.destination = destination;
288    }
289
290    public int getMaxTraceDatagramSize() {
291        return maxTraceDatagramSize;
292    }
293
294    public void setMaxTraceDatagramSize(int maxTraceDatagramSize) {
295        this.maxTraceDatagramSize = maxTraceDatagramSize;
296    }
297
298    public boolean isBroadcast() {
299        return broadcast;
300    }
301
302    public void setBroadcast(boolean broadcast) {
303        this.broadcast = broadcast;
304    }
305
306    public SocketAddress getAddress() {
307        return address;
308    }
309
310    public void setAddress(SocketAddress address) {
311        this.address = address;
312    }
313
314}