001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import org.apache.activemq.broker.region.Destination;
022    import org.apache.activemq.broker.region.Subscription;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.BrokerInfo;
025    import org.apache.activemq.command.ConnectionInfo;
026    import org.apache.activemq.command.ConsumerInfo;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.command.MessageAck;
029    import org.apache.activemq.command.ProducerInfo;
030    import org.apache.activemq.command.RemoveSubscriptionInfo;
031    import org.apache.activemq.command.SessionInfo;
032    import org.apache.activemq.command.TransactionId;
033    
034    /**
035     * Used to add listeners for Broker actions
036     * 
037     * 
038     */
039    public class BrokerBroadcaster extends BrokerFilter {
040        protected volatile Broker[] listeners = new Broker[0];
041    
042        public BrokerBroadcaster(Broker next) {
043            super(next);
044        }
045    
046        @Override
047        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
048            next.acknowledge(consumerExchange, ack);
049            Broker brokers[] = getListeners();
050            for (int i = 0; i < brokers.length; i++) {
051                brokers[i].acknowledge(consumerExchange, ack);
052            }
053        }
054    
055        @Override
056        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
057            next.addConnection(context, info);
058            Broker brokers[] = getListeners();
059            for (int i = 0; i < brokers.length; i++) {
060                brokers[i].addConnection(context, info);
061            }
062        }
063    
064        @Override
065        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
066            Subscription answer = next.addConsumer(context, info);
067            Broker brokers[] = getListeners();
068            for (int i = 0; i < brokers.length; i++) {
069                brokers[i].addConsumer(context, info);
070            }
071            return answer;
072        }
073    
074        @Override
075        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
076            next.addProducer(context, info);
077            Broker brokers[] = getListeners();
078            for (int i = 0; i < brokers.length; i++) {
079                brokers[i].addProducer(context, info);
080            }
081        }
082    
083        @Override
084        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
085            next.commitTransaction(context, xid, onePhase);
086            Broker brokers[] = getListeners();
087            for (int i = 0; i < brokers.length; i++) {
088                brokers[i].commitTransaction(context, xid, onePhase);
089            }
090        }
091    
092        @Override
093        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
094            next.removeSubscription(context, info);
095            Broker brokers[] = getListeners();
096            for (int i = 0; i < brokers.length; i++) {
097                brokers[i].removeSubscription(context, info);
098            }
099        }
100    
101        @Override
102        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
103            int result = next.prepareTransaction(context, xid);
104            Broker brokers[] = getListeners();
105            for (int i = 0; i < brokers.length; i++) {
106                // TODO decide what to do with return values
107                brokers[i].prepareTransaction(context, xid);
108            }
109            return result;
110        }
111    
112        @Override
113        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
114            next.removeConnection(context, info, error);
115            Broker brokers[] = getListeners();
116            for (int i = 0; i < brokers.length; i++) {
117                brokers[i].removeConnection(context, info, error);
118            }
119        }
120    
121        @Override
122        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
123            next.removeConsumer(context, info);
124            Broker brokers[] = getListeners();
125            for (int i = 0; i < brokers.length; i++) {
126                brokers[i].removeConsumer(context, info);
127            }
128        }
129    
130        @Override
131        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
132            next.removeProducer(context, info);
133            Broker brokers[] = getListeners();
134            for (int i = 0; i < brokers.length; i++) {
135                brokers[i].removeProducer(context, info);
136            }
137        }
138    
139        @Override
140        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
141            next.rollbackTransaction(context, xid);
142            Broker brokers[] = getListeners();
143            for (int i = 0; i < brokers.length; i++) {
144                brokers[i].rollbackTransaction(context, xid);
145            }
146        }
147    
148        @Override
149        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
150            next.send(producerExchange, messageSend);
151            Broker brokers[] = getListeners();
152            for (int i = 0; i < brokers.length; i++) {
153                brokers[i].send(producerExchange, messageSend);
154            }
155        }
156    
157        @Override
158        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
159            next.beginTransaction(context, xid);
160            Broker brokers[] = getListeners();
161            for (int i = 0; i < brokers.length; i++) {
162                brokers[i].beginTransaction(context, xid);
163            }
164        }
165    
166        @Override
167        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
168            next.forgetTransaction(context, transactionId);
169            Broker brokers[] = getListeners();
170            for (int i = 0; i < brokers.length; i++) {
171                brokers[i].forgetTransaction(context, transactionId);
172            }
173        }
174    
175        @Override
176        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
177            Destination result = next.addDestination(context, destination,createIfTemporary);
178            Broker brokers[] = getListeners();
179            for (int i = 0; i < brokers.length; i++) {
180                brokers[i].addDestination(context, destination,createIfTemporary);
181            }
182            return result;
183        }
184    
185        @Override
186        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
187            next.removeDestination(context, destination, timeout);
188            Broker brokers[] = getListeners();
189            for (int i = 0; i < brokers.length; i++) {
190                brokers[i].removeDestination(context, destination, timeout);
191            }
192        }
193    
194        @Override
195        public void start() throws Exception {
196            next.start();
197            Broker brokers[] = getListeners();
198            for (int i = 0; i < brokers.length; i++) {
199                brokers[i].start();
200            }
201        }
202    
203        @Override
204        public void stop() throws Exception {
205            next.stop();
206            Broker brokers[] = getListeners();
207            for (int i = 0; i < brokers.length; i++) {
208                brokers[i].stop();
209            }
210        }
211    
212        @Override
213        public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
214            next.addSession(context, info);
215            Broker brokers[] = getListeners();
216            for (int i = 0; i < brokers.length; i++) {
217                brokers[i].addSession(context, info);
218            }
219        }
220    
221        @Override
222        public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
223            next.removeSession(context, info);
224            Broker brokers[] = getListeners();
225            for (int i = 0; i < brokers.length; i++) {
226                brokers[i].removeSession(context, info);
227            }
228        }
229    
230        @Override
231        public void gc() {
232            next.gc();
233            Broker brokers[] = getListeners();
234            for (int i = 0; i < brokers.length; i++) {
235                brokers[i].gc();
236            }
237        }
238    
239        @Override
240        public void addBroker(Connection connection, BrokerInfo info) {
241            next.addBroker(connection, info);
242            Broker brokers[] = getListeners();
243            for (int i = 0; i < brokers.length; i++) {
244                brokers[i].addBroker(connection, info);
245            }
246        }
247    
248        protected Broker[] getListeners() {
249            return listeners;
250        }
251    
252        public synchronized void addListener(Broker broker) {
253            List<Broker> tmp = getListenersAsList();
254            tmp.add(broker);
255            listeners = tmp.toArray(new Broker[tmp.size()]);
256        }
257    
258        public synchronized void removeListener(Broker broker) {
259            List<Broker> tmp = getListenersAsList();
260            tmp.remove(broker);
261            listeners = tmp.toArray(new Broker[tmp.size()]);
262        }
263    
264        protected List<Broker> getListenersAsList() {
265            List<Broker> tmp = new ArrayList<Broker>();
266            Broker brokers[] = getListeners();
267            for (int i = 0; i < brokers.length; i++) {
268                tmp.add(brokers[i]);
269            }
270            return tmp;
271        }
272    }