001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.util.ArrayList;
020import java.util.List;
021import org.apache.activemq.broker.region.Destination;
022import org.apache.activemq.broker.region.Subscription;
023import org.apache.activemq.command.ActiveMQDestination;
024import org.apache.activemq.command.BrokerInfo;
025import org.apache.activemq.command.ConnectionInfo;
026import org.apache.activemq.command.ConsumerInfo;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.command.MessageAck;
029import org.apache.activemq.command.ProducerInfo;
030import org.apache.activemq.command.RemoveSubscriptionInfo;
031import org.apache.activemq.command.SessionInfo;
032import org.apache.activemq.command.TransactionId;
033
034/**
035 * Used to add listeners for Broker actions
036 * 
037 * 
038 */
039public class BrokerBroadcaster extends BrokerFilter {
040    protected volatile Broker[] listeners = new Broker[0];
041
042    public BrokerBroadcaster(Broker next) {
043        super(next);
044    }
045
046    @Override
047    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
048        next.acknowledge(consumerExchange, ack);
049        Broker brokers[] = getListeners();
050        for (int i = 0; i < brokers.length; i++) {
051            brokers[i].acknowledge(consumerExchange, ack);
052        }
053    }
054
055    @Override
056    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
057        next.addConnection(context, info);
058        Broker brokers[] = getListeners();
059        for (int i = 0; i < brokers.length; i++) {
060            brokers[i].addConnection(context, info);
061        }
062    }
063
064    @Override
065    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
066        Subscription answer = next.addConsumer(context, info);
067        Broker brokers[] = getListeners();
068        for (int i = 0; i < brokers.length; i++) {
069            brokers[i].addConsumer(context, info);
070        }
071        return answer;
072    }
073
074    @Override
075    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
076        next.addProducer(context, info);
077        Broker brokers[] = getListeners();
078        for (int i = 0; i < brokers.length; i++) {
079            brokers[i].addProducer(context, info);
080        }
081    }
082
083    @Override
084    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
085        next.commitTransaction(context, xid, onePhase);
086        Broker brokers[] = getListeners();
087        for (int i = 0; i < brokers.length; i++) {
088            brokers[i].commitTransaction(context, xid, onePhase);
089        }
090    }
091
092    @Override
093    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
094        next.removeSubscription(context, info);
095        Broker brokers[] = getListeners();
096        for (int i = 0; i < brokers.length; i++) {
097            brokers[i].removeSubscription(context, info);
098        }
099    }
100
101    @Override
102    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
103        int result = next.prepareTransaction(context, xid);
104        Broker brokers[] = getListeners();
105        for (int i = 0; i < brokers.length; i++) {
106            // TODO decide what to do with return values
107            brokers[i].prepareTransaction(context, xid);
108        }
109        return result;
110    }
111
112    @Override
113    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
114        next.removeConnection(context, info, error);
115        Broker brokers[] = getListeners();
116        for (int i = 0; i < brokers.length; i++) {
117            brokers[i].removeConnection(context, info, error);
118        }
119    }
120
121    @Override
122    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
123        next.removeConsumer(context, info);
124        Broker brokers[] = getListeners();
125        for (int i = 0; i < brokers.length; i++) {
126            brokers[i].removeConsumer(context, info);
127        }
128    }
129
130    @Override
131    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
132        next.removeProducer(context, info);
133        Broker brokers[] = getListeners();
134        for (int i = 0; i < brokers.length; i++) {
135            brokers[i].removeProducer(context, info);
136        }
137    }
138
139    @Override
140    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
141        next.rollbackTransaction(context, xid);
142        Broker brokers[] = getListeners();
143        for (int i = 0; i < brokers.length; i++) {
144            brokers[i].rollbackTransaction(context, xid);
145        }
146    }
147
148    @Override
149    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
150        next.send(producerExchange, messageSend);
151        Broker brokers[] = getListeners();
152        for (int i = 0; i < brokers.length; i++) {
153            brokers[i].send(producerExchange, messageSend);
154        }
155    }
156
157    @Override
158    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
159        next.beginTransaction(context, xid);
160        Broker brokers[] = getListeners();
161        for (int i = 0; i < brokers.length; i++) {
162            brokers[i].beginTransaction(context, xid);
163        }
164    }
165
166    @Override
167    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
168        next.forgetTransaction(context, transactionId);
169        Broker brokers[] = getListeners();
170        for (int i = 0; i < brokers.length; i++) {
171            brokers[i].forgetTransaction(context, transactionId);
172        }
173    }
174
175    @Override
176    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
177        Destination result = next.addDestination(context, destination,createIfTemporary);
178        Broker brokers[] = getListeners();
179        for (int i = 0; i < brokers.length; i++) {
180            brokers[i].addDestination(context, destination,createIfTemporary);
181        }
182        return result;
183    }
184
185    @Override
186    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
187        next.removeDestination(context, destination, timeout);
188        Broker brokers[] = getListeners();
189        for (int i = 0; i < brokers.length; i++) {
190            brokers[i].removeDestination(context, destination, timeout);
191        }
192    }
193
194    @Override
195    public void start() throws Exception {
196        next.start();
197        Broker brokers[] = getListeners();
198        for (int i = 0; i < brokers.length; i++) {
199            brokers[i].start();
200        }
201    }
202
203    @Override
204    public void stop() throws Exception {
205        next.stop();
206        Broker brokers[] = getListeners();
207        for (int i = 0; i < brokers.length; i++) {
208            brokers[i].stop();
209        }
210    }
211
212    @Override
213    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
214        next.addSession(context, info);
215        Broker brokers[] = getListeners();
216        for (int i = 0; i < brokers.length; i++) {
217            brokers[i].addSession(context, info);
218        }
219    }
220
221    @Override
222    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
223        next.removeSession(context, info);
224        Broker brokers[] = getListeners();
225        for (int i = 0; i < brokers.length; i++) {
226            brokers[i].removeSession(context, info);
227        }
228    }
229
230    @Override
231    public void gc() {
232        next.gc();
233        Broker brokers[] = getListeners();
234        for (int i = 0; i < brokers.length; i++) {
235            brokers[i].gc();
236        }
237    }
238
239    @Override
240    public void addBroker(Connection connection, BrokerInfo info) {
241        next.addBroker(connection, info);
242        Broker brokers[] = getListeners();
243        for (int i = 0; i < brokers.length; i++) {
244            brokers[i].addBroker(connection, info);
245        }
246    }
247
248    protected Broker[] getListeners() {
249        return listeners;
250    }
251
252    public synchronized void addListener(Broker broker) {
253        List<Broker> tmp = getListenersAsList();
254        tmp.add(broker);
255        listeners = tmp.toArray(new Broker[tmp.size()]);
256    }
257
258    public synchronized void removeListener(Broker broker) {
259        List<Broker> tmp = getListenersAsList();
260        tmp.remove(broker);
261        listeners = tmp.toArray(new Broker[tmp.size()]);
262    }
263
264    protected List<Broker> getListenersAsList() {
265        List<Broker> tmp = new ArrayList<Broker>();
266        Broker brokers[] = getListeners();
267        for (int i = 0; i < brokers.length; i++) {
268            tmp.add(brokers[i]);
269        }
270        return tmp;
271    }
272}