001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.net.URI;
020    import java.util.Map;
021    import java.util.Set;
022    import java.util.concurrent.ThreadPoolExecutor;
023    import java.util.concurrent.atomic.AtomicReference;
024    import org.apache.activemq.broker.region.Destination;
025    import org.apache.activemq.broker.region.MessageReference;
026    import org.apache.activemq.broker.region.Subscription;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.BrokerId;
029    import org.apache.activemq.command.BrokerInfo;
030    import org.apache.activemq.command.ConnectionInfo;
031    import org.apache.activemq.command.ConsumerControl;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.DestinationInfo;
034    import org.apache.activemq.command.Message;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageDispatch;
037    import org.apache.activemq.command.MessageDispatchNotification;
038    import org.apache.activemq.command.MessagePull;
039    import org.apache.activemq.command.ProducerInfo;
040    import org.apache.activemq.command.RemoveSubscriptionInfo;
041    import org.apache.activemq.command.Response;
042    import org.apache.activemq.command.SessionInfo;
043    import org.apache.activemq.command.TransactionId;
044    import org.apache.activemq.store.kahadb.plist.PListStore;
045    import org.apache.activemq.thread.Scheduler;
046    import org.apache.activemq.usage.Usage;
047    
048    /**
049     * Like a BrokerFilter but it allows you to switch the getNext().broker. This
050     * has more overhead than a BrokerFilter since access to the getNext().broker
051     * has to synchronized since it is mutable
052     * 
053     * 
054     */
055    public class MutableBrokerFilter implements Broker {
056    
057        protected AtomicReference<Broker> next = new AtomicReference<Broker>();
058    
059        public MutableBrokerFilter(Broker next) {
060            this.next.set(next);
061        }
062    
063        public Broker getAdaptor(Class type) {
064            if (type.isInstance(this)) {
065                return this;
066            }
067            return next.get().getAdaptor(type);
068        }
069    
070        public Broker getNext() {
071            return next.get();
072        }
073    
074        public void setNext(Broker next) {
075            this.next.set(next);
076        }
077    
078        public Map<ActiveMQDestination, Destination> getDestinationMap() {
079            return getNext().getDestinationMap();
080        }
081    
082        public Set getDestinations(ActiveMQDestination destination) {
083            return getNext().getDestinations(destination);
084        }
085    
086        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
087            getNext().acknowledge(consumerExchange, ack);
088        }
089    
090        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
091            getNext().addConnection(context, info);
092        }
093    
094        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
095            return getNext().addConsumer(context, info);
096        }
097    
098        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
099            getNext().addProducer(context, info);
100        }
101    
102        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
103            getNext().commitTransaction(context, xid, onePhase);
104        }
105    
106        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
107            getNext().removeSubscription(context, info);
108        }
109    
110        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
111            return getNext().getPreparedTransactions(context);
112        }
113    
114        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
115            return getNext().prepareTransaction(context, xid);
116        }
117    
118        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
119            getNext().removeConnection(context, info, error);
120        }
121    
122        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
123            getNext().removeConsumer(context, info);
124        }
125    
126        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
127            getNext().removeProducer(context, info);
128        }
129    
130        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
131            getNext().rollbackTransaction(context, xid);
132        }
133    
134        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
135            getNext().send(producerExchange, messageSend);
136        }
137    
138        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
139            getNext().beginTransaction(context, xid);
140        }
141    
142        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
143            getNext().forgetTransaction(context, transactionId);
144        }
145    
146        public Connection[] getClients() throws Exception {
147            return getNext().getClients();
148        }
149    
150        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
151            return getNext().addDestination(context, destination,createIfTemporary);
152        }
153    
154        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
155            getNext().removeDestination(context, destination, timeout);
156        }
157    
158        public ActiveMQDestination[] getDestinations() throws Exception {
159            return getNext().getDestinations();
160        }
161    
162        public void start() throws Exception {
163            getNext().start();
164        }
165    
166        public void stop() throws Exception {
167            getNext().stop();
168        }
169    
170        public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
171            getNext().addSession(context, info);
172        }
173    
174        public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
175            getNext().removeSession(context, info);
176        }
177    
178        public BrokerId getBrokerId() {
179            return getNext().getBrokerId();
180        }
181    
182        public String getBrokerName() {
183            return getNext().getBrokerName();
184        }
185    
186        public void gc() {
187            getNext().gc();
188        }
189    
190        public void addBroker(Connection connection, BrokerInfo info) {
191            getNext().addBroker(connection, info);
192        }
193    
194        public void removeBroker(Connection connection, BrokerInfo info) {
195            getNext().removeBroker(connection, info);
196        }
197    
198        public BrokerInfo[] getPeerBrokerInfos() {
199            return getNext().getPeerBrokerInfos();
200        }
201    
202        public void preProcessDispatch(MessageDispatch messageDispatch) {
203            getNext().preProcessDispatch(messageDispatch);
204        }
205    
206        public void postProcessDispatch(MessageDispatch messageDispatch) {
207            getNext().postProcessDispatch(messageDispatch);
208        }
209    
210        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
211            getNext().processDispatchNotification(messageDispatchNotification);
212        }
213    
214        public boolean isStopped() {
215            return getNext().isStopped();
216        }
217    
218        public Set<ActiveMQDestination> getDurableDestinations() {
219            return getNext().getDurableDestinations();
220        }
221    
222        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
223            getNext().addDestinationInfo(context, info);
224    
225        }
226    
227        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
228            getNext().removeDestinationInfo(context, info);
229    
230        }
231    
232        public boolean isFaultTolerantConfiguration() {
233            return getNext().isFaultTolerantConfiguration();
234        }
235    
236        public ConnectionContext getAdminConnectionContext() {
237            return getNext().getAdminConnectionContext();
238        }
239    
240        public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
241            getNext().setAdminConnectionContext(adminConnectionContext);
242        }
243    
244        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
245            return getNext().messagePull(context, pull);
246        }
247    
248        public PListStore getTempDataStore() {
249            return getNext().getTempDataStore();
250        }
251    
252        public URI getVmConnectorURI() {
253            return getNext().getVmConnectorURI();
254        }
255    
256        public void brokerServiceStarted() {
257            getNext().brokerServiceStarted();
258        }
259    
260        public BrokerService getBrokerService() {
261            return getNext().getBrokerService();
262        }
263    
264        public boolean isExpired(MessageReference messageReference) {
265            return getNext().isExpired(messageReference);
266        }
267    
268        public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
269            getNext().messageExpired(context, message, subscription);
270        }
271    
272        public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
273                                          Subscription subscription) {
274            getNext().sendToDeadLetterQueue(context, messageReference, subscription);
275        }
276    
277        public Broker getRoot() {
278            return getNext().getRoot();
279        }
280        
281        public long getBrokerSequenceId() {
282            return getNext().getBrokerSequenceId();
283        }
284        
285        public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
286            getNext().fastProducer(context, producerInfo, destination);
287        }
288    
289        public void isFull(ConnectionContext context,Destination destination, Usage usage) {
290            getNext().isFull(context,destination, usage);
291        }
292    
293        public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
294            getNext().messageConsumed(context, messageReference);
295        }
296    
297        public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
298            getNext().messageDelivered(context, messageReference);
299        }
300    
301        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
302            getNext().messageDiscarded(context, sub, messageReference);
303        }
304    
305        public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
306            getNext().slowConsumer(context, dest,subs);
307        }
308        
309        public void nowMasterBroker() {   
310           getNext().nowMasterBroker();
311        }
312    
313        public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
314                ConsumerControl control) {
315            getNext().processConsumerControl(consumerExchange, control);
316        }
317    
318        public Scheduler getScheduler() {
319           return getNext().getScheduler();
320        }
321    
322        public ThreadPoolExecutor getExecutor() {
323           return getNext().getExecutor();
324        }
325    
326        public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
327            getNext().networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
328        }
329    
330        public void networkBridgeStopped(BrokerInfo brokerInfo) {
331            getNext().networkBridgeStopped(brokerInfo);
332        }
333    }