001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.net.URI;
020    import java.util.Map;
021    import java.util.Set;
022    import java.util.concurrent.ThreadPoolExecutor;
023    import org.apache.activemq.broker.region.Destination;
024    import org.apache.activemq.broker.region.MessageReference;
025    import org.apache.activemq.broker.region.Subscription;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.BrokerId;
028    import org.apache.activemq.command.BrokerInfo;
029    import org.apache.activemq.command.ConnectionInfo;
030    import org.apache.activemq.command.ConsumerControl;
031    import org.apache.activemq.command.ConsumerInfo;
032    import org.apache.activemq.command.DestinationInfo;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageAck;
035    import org.apache.activemq.command.MessageDispatch;
036    import org.apache.activemq.command.MessageDispatchNotification;
037    import org.apache.activemq.command.MessagePull;
038    import org.apache.activemq.command.ProducerInfo;
039    import org.apache.activemq.command.RemoveSubscriptionInfo;
040    import org.apache.activemq.command.Response;
041    import org.apache.activemq.command.SessionInfo;
042    import org.apache.activemq.command.TransactionId;
043    import org.apache.activemq.store.kahadb.plist.PListStore;
044    import org.apache.activemq.thread.Scheduler;
045    import org.apache.activemq.usage.Usage;
046    
047    /**
048     * Allows you to intercept broker operation so that features such as security
049     * can be implemented as a pluggable filter.
050     * 
051     * 
052     */
053    public class BrokerFilter implements Broker {
054    
055        protected final Broker next;
056    
057        public BrokerFilter(Broker next) {
058            this.next = next;
059        }
060    
061        public Broker getAdaptor(Class type) {
062            if (type.isInstance(this)) {
063                return this;
064            }
065            return next.getAdaptor(type);
066        }
067    
068        public Map<ActiveMQDestination, Destination> getDestinationMap() {
069            return next.getDestinationMap();
070        }
071    
072        public Set <Destination>getDestinations(ActiveMQDestination destination) {
073            return next.getDestinations(destination);
074        }
075    
076        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
077            next.acknowledge(consumerExchange, ack);
078        }
079    
080        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
081            return next.messagePull(context, pull);
082        }
083    
084        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
085            next.addConnection(context, info);
086        }
087    
088        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
089            return next.addConsumer(context, info);
090        }
091    
092        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
093            next.addProducer(context, info);
094        }
095    
096        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
097            next.commitTransaction(context, xid, onePhase);
098        }
099    
100        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
101            next.removeSubscription(context, info);
102        }
103    
104        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
105            return next.getPreparedTransactions(context);
106        }
107    
108        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
109            return next.prepareTransaction(context, xid);
110        }
111    
112        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
113            next.removeConnection(context, info, error);
114        }
115    
116        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
117            next.removeConsumer(context, info);
118        }
119    
120        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
121            next.removeProducer(context, info);
122        }
123    
124        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
125            next.rollbackTransaction(context, xid);
126        }
127    
128        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
129            next.send(producerExchange, messageSend);
130        }
131    
132        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
133            next.beginTransaction(context, xid);
134        }
135    
136        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
137            next.forgetTransaction(context, transactionId);
138        }
139    
140        public Connection[] getClients() throws Exception {
141            return next.getClients();
142        }
143    
144        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
145            return next.addDestination(context, destination,createIfTemporary);
146        }
147    
148        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
149            next.removeDestination(context, destination, timeout);
150        }
151    
152        public ActiveMQDestination[] getDestinations() throws Exception {
153            return next.getDestinations();
154        }
155    
156        public void start() throws Exception {
157            next.start();
158        }
159    
160        public void stop() throws Exception {
161            next.stop();
162        }
163    
164        public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
165            next.addSession(context, info);
166        }
167    
168        public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
169            next.removeSession(context, info);
170        }
171    
172        public BrokerId getBrokerId() {
173            return next.getBrokerId();
174        }
175    
176        public String getBrokerName() {
177            return next.getBrokerName();
178        }
179    
180        public void gc() {
181            next.gc();
182        }
183    
184        public void addBroker(Connection connection, BrokerInfo info) {
185            next.addBroker(connection, info);
186        }
187    
188        public void removeBroker(Connection connection, BrokerInfo info) {
189            next.removeBroker(connection, info);
190        }
191    
192        public BrokerInfo[] getPeerBrokerInfos() {
193            return next.getPeerBrokerInfos();
194        }
195    
196        public void preProcessDispatch(MessageDispatch messageDispatch) {
197            next.preProcessDispatch(messageDispatch);
198        }
199    
200        public void postProcessDispatch(MessageDispatch messageDispatch) {
201            next.postProcessDispatch(messageDispatch);
202        }
203    
204        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
205            next.processDispatchNotification(messageDispatchNotification);
206        }
207    
208        public boolean isStopped() {
209            return next.isStopped();
210        }
211    
212        public Set<ActiveMQDestination> getDurableDestinations() {
213            return next.getDurableDestinations();
214        }
215    
216        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
217            next.addDestinationInfo(context, info);
218        }
219    
220        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
221            next.removeDestinationInfo(context, info);
222        }
223    
224        public boolean isFaultTolerantConfiguration() {
225            return next.isFaultTolerantConfiguration();
226        }
227    
228        public ConnectionContext getAdminConnectionContext() {
229            return next.getAdminConnectionContext();
230        }
231    
232        public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
233            next.setAdminConnectionContext(adminConnectionContext);
234        }
235    
236        public PListStore getTempDataStore() {
237            return next.getTempDataStore();
238        }
239    
240        public URI getVmConnectorURI() {
241            return next.getVmConnectorURI();
242        }
243    
244        public void brokerServiceStarted() {
245            next.brokerServiceStarted();
246        }
247    
248        public BrokerService getBrokerService() {
249            return next.getBrokerService();
250        }
251    
252        public boolean isExpired(MessageReference messageReference) {
253            return next.isExpired(messageReference);
254        }
255    
256        public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
257            next.messageExpired(context, message, subscription);
258        }
259    
260        public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
261                                          Subscription subscription) {
262            next.sendToDeadLetterQueue(context, messageReference, subscription);
263        }
264    
265        public Broker getRoot() {
266            return next.getRoot();
267        }
268    
269        public long getBrokerSequenceId() {
270            return next.getBrokerSequenceId();
271        }
272    
273       
274        public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
275            next.fastProducer(context, producerInfo, destination);
276        }
277    
278        public void isFull(ConnectionContext context,Destination destination, Usage usage) {
279            next.isFull(context,destination, usage);
280        }
281    
282        public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
283            next.messageConsumed(context, messageReference);
284        }
285    
286        public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
287            next.messageDelivered(context, messageReference);
288        }
289    
290        public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) {
291            next.messageDiscarded(context, sub, messageReference);
292        }
293    
294        public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
295            next.slowConsumer(context, destination,subs);
296        }
297        
298        public void nowMasterBroker() {   
299            next.nowMasterBroker();
300        }
301    
302        public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
303                ConsumerControl control) {
304            next.processConsumerControl(consumerExchange, control);
305        }
306    
307        public Scheduler getScheduler() {
308           return next.getScheduler();
309        }
310    
311        public ThreadPoolExecutor getExecutor() {
312           return next.getExecutor();
313        }
314    
315        public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
316            next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
317        }
318    
319        public void networkBridgeStopped(BrokerInfo brokerInfo) {
320            next.networkBridgeStopped(brokerInfo);
321        }
322    }