001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.view;
018
019import java.io.IOException;
020import java.io.PrintWriter;
021import java.util.Collection;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.Iterator;
025import java.util.Map;
026import java.util.Set;
027
028import javax.management.ObjectName;
029
030import org.apache.activemq.broker.Broker;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.broker.ProducerBrokerExchange;
033import org.apache.activemq.broker.jmx.BrokerViewMBean;
034import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
035import org.apache.activemq.broker.region.Subscription;
036import org.apache.activemq.command.ActiveMQDestination;
037import org.apache.activemq.command.ConsumerInfo;
038import org.apache.activemq.command.Message;
039import org.apache.activemq.command.ProducerId;
040import org.apache.activemq.command.ProducerInfo;
041import org.apache.activemq.filter.DestinationMapNode;
042
043/**
044 *
045 */
046public class ConnectionDotFileInterceptor extends DotFileInterceptorSupport {
047
048    protected static final String ID_SEPARATOR = "_";
049
050    private final boolean redrawOnRemove;
051    private boolean clearProducerCacheAfterRender;
052    private final String domain = "org.apache.activemq";
053    private BrokerViewMBean brokerView;
054
055    // until we have some MBeans for producers, lets do it all ourselves
056    private final Map<ProducerId, ProducerInfo> producers = new HashMap<ProducerId, ProducerInfo>();
057    private final Map<ProducerId, Set<ActiveMQDestination>> producerDestinations = new HashMap<ProducerId, Set<ActiveMQDestination>>();
058    private final Object lock = new Object();
059
060    public ConnectionDotFileInterceptor(Broker next, String file, boolean redrawOnRemove) throws IOException {
061        super(next, file);
062        this.redrawOnRemove = redrawOnRemove;
063
064    }
065
066    @Override
067    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
068        Subscription answer = super.addConsumer(context, info);
069        generateFile();
070        return answer;
071    }
072
073    @Override
074    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
075        super.addProducer(context, info);
076        ProducerId producerId = info.getProducerId();
077        synchronized (lock) {
078            producers.put(producerId, info);
079        }
080        generateFile();
081    }
082
083    @Override
084    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
085        super.removeConsumer(context, info);
086        if (redrawOnRemove) {
087            generateFile();
088        }
089    }
090
091    @Override
092    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
093        super.removeProducer(context, info);
094        ProducerId producerId = info.getProducerId();
095        if (redrawOnRemove) {
096            synchronized (lock) {
097                producerDestinations.remove(producerId);
098                producers.remove(producerId);
099            }
100            generateFile();
101        }
102    }
103
104    @Override
105    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
106        super.send(producerExchange, messageSend);
107        ProducerId producerId = messageSend.getProducerId();
108        ActiveMQDestination destination = messageSend.getDestination();
109        synchronized (lock) {
110            Set<ActiveMQDestination> destinations = producerDestinations.get(producerId);
111            if (destinations == null) {
112                destinations = new HashSet<ActiveMQDestination>();
113            }
114            producerDestinations.put(producerId, destinations);
115            destinations.add(destination);
116        }
117    }
118
119    @Override
120    protected void generateFile(PrintWriter writer) throws Exception {
121
122        writer.println("digraph \"ActiveMQ Connections\" {");
123        writer.println();
124        writer.println("label=\"ActiveMQ Broker: " + getBrokerView().getBrokerId() + "\"];");
125        writer.println();
126        writer.println("node [style = \"rounded,filled\", fillcolor = yellow, fontname=\"Helvetica-Oblique\"];");
127        writer.println();
128
129        Map<String, String> clients = new HashMap<String, String>();
130        Map<String, String> queues = new HashMap<String, String>();
131        Map<String, String> topics = new HashMap<String, String>();
132
133        printSubscribers(writer, clients, queues, "queue_", getBrokerView().getQueueSubscribers());
134        writer.println();
135
136        printSubscribers(writer, clients, topics, "topic_", getBrokerView().getTopicSubscribers());
137        writer.println();
138
139        printProducers(writer, clients, queues, topics);
140        writer.println();
141
142        writeLabels(writer, "green", "Client: ", clients);
143        writer.println();
144
145        writeLabels(writer, "red", "Queue: ", queues);
146        writeLabels(writer, "blue", "Topic: ", topics);
147        writer.println("}");
148
149        if (clearProducerCacheAfterRender) {
150            producerDestinations.clear();
151        }
152    }
153
154    protected void printProducers(PrintWriter writer, Map<String, String> clients, Map<String, String> queues, Map<String, String> topics) {
155        synchronized (lock) {
156            for (Iterator iter = producerDestinations.entrySet().iterator(); iter.hasNext();) {
157                Map.Entry entry = (Map.Entry)iter.next();
158                ProducerId producerId = (ProducerId)entry.getKey();
159                Set destinationSet = (Set)entry.getValue();
160                printProducers(writer, clients, queues, topics, producerId, destinationSet);
161            }
162        }
163    }
164
165    protected void printProducers(PrintWriter writer, Map<String, String> clients, Map<String, String> queues, Map<String, String> topics, ProducerId producerId, Set destinationSet) {
166        for (Iterator iter = destinationSet.iterator(); iter.hasNext();) {
167            ActiveMQDestination destination = (ActiveMQDestination)iter.next();
168
169            // TODO use clientId one day
170            String clientId = producerId.getConnectionId();
171            String safeClientId = asID(clientId);
172            clients.put(safeClientId, clientId);
173
174            String physicalName = destination.getPhysicalName();
175            String safeDestinationId = asID(physicalName);
176            if (destination.isTopic()) {
177                safeDestinationId = "topic_" + safeDestinationId;
178                topics.put(safeDestinationId, physicalName);
179            } else {
180                safeDestinationId = "queue_" + safeDestinationId;
181                queues.put(safeDestinationId, physicalName);
182            }
183
184            String safeProducerId = asID(producerId.toString());
185
186            // lets write out the links
187
188            writer.print(safeClientId);
189            writer.print(" -> ");
190            writer.print(safeProducerId);
191            writer.println(";");
192
193            writer.print(safeProducerId);
194            writer.print(" -> ");
195            writer.print(safeDestinationId);
196            writer.println(";");
197
198            // now lets write out the label
199            writer.print(safeProducerId);
200            writer.print(" [label = \"");
201            String label = "Producer: " + producerId.getSessionId() + "-" + producerId.getValue();
202            writer.print(label);
203            writer.println("\"];");
204
205        }
206    }
207
208    protected void printSubscribers(PrintWriter writer, Map<String, String> clients, Map<String, String> destinations, String type, ObjectName[] subscribers) {
209        for (int i = 0; i < subscribers.length; i++) {
210            ObjectName name = subscribers[i];
211            SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
212
213            String clientId = subscriber.getClientId();
214            String safeClientId = asID(clientId);
215            clients.put(safeClientId, clientId);
216
217            String destination = subscriber.getDestinationName();
218            String safeDestinationId = type + asID(destination);
219            destinations.put(safeDestinationId, destination);
220
221            String selector = subscriber.getSelector();
222
223            // lets write out the links
224            String subscriberId = safeClientId + "_" + subscriber.getSessionId() + "_" + subscriber.getSubscriptionId();
225
226            writer.print(subscriberId);
227            writer.print(" -> ");
228            writer.print(safeClientId);
229            writer.println(";");
230
231            writer.print(safeDestinationId);
232            writer.print(" -> ");
233            writer.print(subscriberId);
234            writer.println(";");
235
236            // now lets write out the label
237            writer.print(subscriberId);
238            writer.print(" [label = \"");
239            String label = "Subscription: " + subscriber.getSessionId() + "-" + subscriber.getSubscriptionId();
240            if (selector != null && selector.length() > 0) {
241                label = label + "\\nSelector: " + selector;
242            }
243            writer.print(label);
244            writer.println("\"];");
245        }
246    }
247
248    protected void writeLabels(PrintWriter writer, String color, String prefix, Map<String, String> map) {
249        for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
250            Map.Entry entry = (Map.Entry)iter.next();
251            String id = (String)entry.getKey();
252            String label = (String)entry.getValue();
253
254            writer.print(id);
255            writer.print(" [ fillcolor = ");
256            writer.print(color);
257            writer.print(", label = \"");
258            writer.print(prefix);
259            writer.print(label);
260            writer.println("\"];");
261        }
262    }
263
264    /**
265     * Lets strip out any non supported characters
266     */
267    protected String asID(String name) {
268        StringBuffer buffer = new StringBuffer();
269        int size = name.length();
270        for (int i = 0; i < size; i++) {
271            char ch = name.charAt(i);
272            if (Character.isLetterOrDigit(ch) || ch == '_') {
273                buffer.append(ch);
274            } else {
275                buffer.append('_');
276            }
277        }
278        return buffer.toString();
279    }
280
281    protected void printNodes(PrintWriter writer, DestinationMapNode node, String prefix) {
282        String path = getPath(node);
283        writer.print("  ");
284        writer.print(prefix);
285        writer.print(ID_SEPARATOR);
286        writer.print(path);
287        String label = path;
288        if (prefix.equals("topic")) {
289            label = "Topics";
290        } else if (prefix.equals("queue")) {
291            label = "Queues";
292        }
293        writer.print("[ label = \"");
294        writer.print(label);
295        writer.println("\" ];");
296
297        Collection children = node.getChildren();
298        for (Iterator iter = children.iterator(); iter.hasNext();) {
299            DestinationMapNode child = (DestinationMapNode)iter.next();
300            printNodes(writer, child, prefix + ID_SEPARATOR + path);
301        }
302    }
303
304    protected void printNodeLinks(PrintWriter writer, DestinationMapNode node, String prefix) {
305        String path = getPath(node);
306        Collection children = node.getChildren();
307        for (Iterator iter = children.iterator(); iter.hasNext();) {
308            DestinationMapNode child = (DestinationMapNode)iter.next();
309
310            writer.print("  ");
311            writer.print(prefix);
312            writer.print(ID_SEPARATOR);
313            writer.print(path);
314            writer.print(" -> ");
315            writer.print(prefix);
316            writer.print(ID_SEPARATOR);
317            writer.print(path);
318            writer.print(ID_SEPARATOR);
319            writer.print(getPath(child));
320            writer.println(";");
321
322            printNodeLinks(writer, child, prefix + ID_SEPARATOR + path);
323        }
324    }
325
326    protected String getPath(DestinationMapNode node) {
327        String path = node.getPath();
328        if (path.equals("*")) {
329            return "root";
330        }
331        return path;
332    }
333
334    BrokerViewMBean getBrokerView() throws Exception {
335        if (this.brokerView == null) {
336            ObjectName brokerName = getBrokerService().getBrokerObjectName();
337            this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
338                    BrokerViewMBean.class, true);
339        }
340        return this.brokerView;
341    }
342}