001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import java.io.DataOutputStream;
020    import java.io.IOException;
021    import java.net.DatagramPacket;
022    import java.net.DatagramSocket;
023    import java.net.InetAddress;
024    import java.net.InetSocketAddress;
025    import java.net.SocketAddress;
026    import java.net.URI;
027    import java.net.URISyntaxException;
028    import java.net.UnknownHostException;
029    
030    import org.apache.activemq.broker.BrokerPluginSupport;
031    import org.apache.activemq.broker.ConnectionContext;
032    import org.apache.activemq.broker.ConsumerBrokerExchange;
033    import org.apache.activemq.broker.ProducerBrokerExchange;
034    import org.apache.activemq.broker.region.Subscription;
035    import org.apache.activemq.command.ActiveMQDestination;
036    import org.apache.activemq.command.BrokerId;
037    import org.apache.activemq.command.ConnectionInfo;
038    import org.apache.activemq.command.ConsumerInfo;
039    import org.apache.activemq.command.DataStructure;
040    import org.apache.activemq.command.DestinationInfo;
041    import org.apache.activemq.command.JournalTrace;
042    import org.apache.activemq.command.Message;
043    import org.apache.activemq.command.MessageAck;
044    import org.apache.activemq.command.MessageDispatch;
045    import org.apache.activemq.command.MessageDispatchNotification;
046    import org.apache.activemq.command.MessagePull;
047    import org.apache.activemq.command.ProducerInfo;
048    import org.apache.activemq.command.RemoveSubscriptionInfo;
049    import org.apache.activemq.command.Response;
050    import org.apache.activemq.command.SessionInfo;
051    import org.apache.activemq.command.TransactionId;
052    import org.apache.activemq.command.TransactionInfo;
053    import org.apache.activemq.openwire.OpenWireFormatFactory;
054    import org.apache.activemq.util.ByteArrayOutputStream;
055    import org.apache.activemq.util.ByteSequence;
056    import org.apache.activemq.wireformat.WireFormat;
057    import org.apache.activemq.wireformat.WireFormatFactory;
058    import org.slf4j.Logger;
059    import org.slf4j.LoggerFactory;
060    
061    /**
062     * A Broker interceptor which allows you to trace all operations to a UDP
063     * socket.
064     * 
065     * @org.apache.xbean.XBean element="udpTraceBrokerPlugin"
066     * 
067     */
068    public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
069    
070        private static final Logger LOG = LoggerFactory.getLogger(UDPTraceBrokerPlugin.class);
071        protected WireFormat wireFormat;
072        protected WireFormatFactory wireFormatFactory;
073        protected int maxTraceDatagramSize = 1024 * 4;
074        protected URI destination;
075        protected DatagramSocket socket;
076    
077        protected BrokerId brokerId;
078        protected SocketAddress address;
079        protected boolean broadcast;
080    
081        public UDPTraceBrokerPlugin() {
082            try {
083                destination = new URI("udp://127.0.0.1:61616");
084            } catch (URISyntaxException wontHappen) {
085            }
086        }
087    
088        public void start() throws Exception {
089            super.start();
090            if (getWireFormat() == null) {
091                throw new IllegalArgumentException("Wireformat must be specifed.");
092            }
093            if (address == null) {
094                address = createSocketAddress(destination);
095            }
096            socket = createSocket();
097    
098            brokerId = super.getBrokerId();
099            trace(new JournalTrace("START"));
100        }
101    
102        protected DatagramSocket createSocket() throws IOException {
103            DatagramSocket s = new DatagramSocket();
104            s.setSendBufferSize(maxTraceDatagramSize);
105            s.setBroadcast(broadcast);
106            return s;
107        }
108    
109        public void stop() throws Exception {
110            trace(new JournalTrace("STOP"));
111            socket.close();
112            super.stop();
113        }
114    
115        private void trace(DataStructure command) {
116            try {
117    
118                ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize);
119                DataOutputStream out = new DataOutputStream(baos);
120                wireFormat.marshal(brokerId, out);
121                wireFormat.marshal(command, out);
122                out.close();
123                ByteSequence sequence = baos.toByteSequence();
124                DatagramPacket datagram = new DatagramPacket(sequence.getData(), sequence.getOffset(), sequence.getLength(), address);
125                socket.send(datagram);
126    
127            } catch (Throwable e) {
128                LOG.debug("Failed to trace: " + command, e);
129            }
130        }
131    
132        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
133            trace(messageSend);
134            super.send(producerExchange, messageSend);
135        }
136    
137        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
138            trace(ack);
139            super.acknowledge(consumerExchange, ack);
140        }
141    
142        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
143            trace(info);
144            super.addConnection(context, info);
145        }
146    
147        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
148            trace(info);
149            return super.addConsumer(context, info);
150        }
151    
152        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
153            trace(info);
154            super.addDestinationInfo(context, info);
155        }
156    
157        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
158            trace(info);
159            super.addProducer(context, info);
160        }
161    
162        public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
163            trace(info);
164            super.addSession(context, info);
165        }
166    
167        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
168            trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN));
169            super.beginTransaction(context, xid);
170        }
171    
172        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
173            trace(new TransactionInfo(context.getConnectionId(), xid, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE));
174            super.commitTransaction(context, xid, onePhase);
175        }
176    
177        public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
178            trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET));
179            super.forgetTransaction(context, xid);
180        }
181    
182        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
183            trace(pull);
184            return super.messagePull(context, pull);
185        }
186    
187        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
188            trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE));
189            return super.prepareTransaction(context, xid);
190        }
191    
192        public void postProcessDispatch(MessageDispatch messageDispatch) {
193            trace(messageDispatch);
194            super.postProcessDispatch(messageDispatch);
195        }
196    
197        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
198            trace(messageDispatchNotification);
199            super.processDispatchNotification(messageDispatchNotification);
200        }
201    
202        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
203            trace(info.createRemoveCommand());
204            super.removeConnection(context, info, error);
205        }
206    
207        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
208            trace(info.createRemoveCommand());
209            super.removeConsumer(context, info);
210        }
211    
212        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
213            super.removeDestination(context, destination, timeout);
214        }
215    
216        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
217            trace(info);
218            super.removeDestinationInfo(context, info);
219        }
220    
221        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
222            trace(info.createRemoveCommand());
223            super.removeProducer(context, info);
224        }
225    
226        public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
227            trace(info.createRemoveCommand());
228            super.removeSession(context, info);
229        }
230    
231        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
232            trace(info);
233            super.removeSubscription(context, info);
234        }
235    
236        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
237            trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK));
238            super.rollbackTransaction(context, xid);
239        }
240    
241        public WireFormat getWireFormat() {
242            if (wireFormat == null) {
243                wireFormat = createWireFormat();
244            }
245            return wireFormat;
246        }
247    
248        protected WireFormat createWireFormat() {
249            return getWireFormatFactory().createWireFormat();
250        }
251    
252        public void setWireFormat(WireFormat wireFormat) {
253            this.wireFormat = wireFormat;
254        }
255    
256        public WireFormatFactory getWireFormatFactory() {
257            if (wireFormatFactory == null) {
258                wireFormatFactory = createWireFormatFactory();
259            }
260            return wireFormatFactory;
261        }
262    
263        protected OpenWireFormatFactory createWireFormatFactory() {
264            OpenWireFormatFactory wf = new OpenWireFormatFactory();
265            wf.setCacheEnabled(false);
266            wf.setVersion(1);
267            wf.setTightEncodingEnabled(true);
268            wf.setSizePrefixDisabled(true);
269            return wf;
270        }
271    
272        public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
273            this.wireFormatFactory = wireFormatFactory;
274        }
275    
276        protected SocketAddress createSocketAddress(URI location) throws UnknownHostException {
277            InetAddress a = InetAddress.getByName(location.getHost());
278            int port = location.getPort();
279            return new InetSocketAddress(a, port);
280        }
281    
282        public URI getDestination() {
283            return destination;
284        }
285    
286        public void setDestination(URI destination) {
287            this.destination = destination;
288        }
289    
290        public int getMaxTraceDatagramSize() {
291            return maxTraceDatagramSize;
292        }
293    
294        public void setMaxTraceDatagramSize(int maxTraceDatagramSize) {
295            this.maxTraceDatagramSize = maxTraceDatagramSize;
296        }
297    
298        public boolean isBroadcast() {
299            return broadcast;
300        }
301    
302        public void setBroadcast(boolean broadcast) {
303            this.broadcast = broadcast;
304        }
305    
306        public SocketAddress getAddress() {
307            return address;
308        }
309    
310        public void setAddress(SocketAddress address) {
311            this.address = address;
312        }
313    
314    }