001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.List;
021    import java.util.Set;
022    import org.apache.activemq.broker.Broker;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.ProducerBrokerExchange;
025    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
026    import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.command.MessageDispatchNotification;
031    import org.apache.activemq.command.ProducerInfo;
032    import org.apache.activemq.store.MessageStore;
033    import org.apache.activemq.usage.MemoryUsage;
034    import org.apache.activemq.usage.Usage;
035    
036    /**
037     *
038     *
039     */
040    public class DestinationFilter implements Destination {
041    
042        private final Destination next;
043    
044        public DestinationFilter(Destination next) {
045            this.next = next;
046        }
047    
048        public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
049            next.acknowledge(context, sub, ack, node);
050        }
051    
052        public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
053            next.addSubscription(context, sub);
054        }
055    
056        public Message[] browse() {
057            return next.browse();
058        }
059    
060        public void dispose(ConnectionContext context) throws IOException {
061            next.dispose(context);
062        }
063    
064        public boolean isDisposed() {
065            return next.isDisposed();
066        }
067    
068        public void gc() {
069            next.gc();
070        }
071    
072        public void markForGC(long timeStamp) {
073            next.markForGC(timeStamp);
074        }
075    
076        public boolean canGC() {
077            return next.canGC();
078        }
079    
080        public long getInactiveTimoutBeforeGC() {
081            return next.getInactiveTimoutBeforeGC();
082        }
083    
084        public ActiveMQDestination getActiveMQDestination() {
085            return next.getActiveMQDestination();
086        }
087    
088        public DeadLetterStrategy getDeadLetterStrategy() {
089            return next.getDeadLetterStrategy();
090        }
091    
092        public DestinationStatistics getDestinationStatistics() {
093            return next.getDestinationStatistics();
094        }
095    
096        public String getName() {
097            return next.getName();
098        }
099    
100        public MemoryUsage getMemoryUsage() {
101            return next.getMemoryUsage();
102        }
103    
104            @Override
105            public void setMemoryUsage(MemoryUsage memoryUsage) {
106                    next.setMemoryUsage(memoryUsage);
107            }
108    
109        public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
110            next.removeSubscription(context, sub, lastDeliveredSequenceId);
111        }
112    
113        public void send(ProducerBrokerExchange context, Message messageSend) throws Exception {
114            next.send(context, messageSend);
115        }
116    
117        public void start() throws Exception {
118            next.start();
119        }
120    
121        public void stop() throws Exception {
122            next.stop();
123        }
124    
125        public List<Subscription> getConsumers() {
126            return next.getConsumers();
127        }
128    
129        /**
130         * Sends a message to the given destination which may be a wildcard
131         *
132         * @param context broker context
133         * @param message message to send
134         * @param destination possibly wildcard destination to send the message to
135         * @throws Exception on error
136         */
137        protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
138            Broker broker = context.getConnectionContext().getBroker();
139            Set<Destination> destinations = broker.getDestinations(destination);
140    
141            for (Destination dest : destinations) {
142                dest.send(context, message.copy());
143            }
144        }
145    
146        public MessageStore getMessageStore() {
147            return next.getMessageStore();
148        }
149    
150        public boolean isProducerFlowControl() {
151            return next.isProducerFlowControl();
152        }
153    
154        public void setProducerFlowControl(boolean value) {
155            next.setProducerFlowControl(value);
156        }
157    
158        public boolean isAlwaysRetroactive() {
159            return next.isAlwaysRetroactive();
160        }
161    
162        public void setAlwaysRetroactive(boolean value) {
163            next.setAlwaysRetroactive(value);
164        }
165    
166        public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
167            next.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
168        }
169    
170        public long getBlockedProducerWarningInterval() {
171            return next.getBlockedProducerWarningInterval();
172        }
173    
174        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
175            next.addProducer(context, info);
176        }
177    
178        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
179            next.removeProducer(context, info);
180        }
181    
182        public int getMaxAuditDepth() {
183            return next.getMaxAuditDepth();
184        }
185    
186        public int getMaxProducersToAudit() {
187            return next.getMaxProducersToAudit();
188        }
189    
190        public boolean isEnableAudit() {
191            return next.isEnableAudit();
192        }
193    
194        public void setEnableAudit(boolean enableAudit) {
195            next.setEnableAudit(enableAudit);
196        }
197    
198        public void setMaxAuditDepth(int maxAuditDepth) {
199            next.setMaxAuditDepth(maxAuditDepth);
200        }
201    
202        public void setMaxProducersToAudit(int maxProducersToAudit) {
203            next.setMaxProducersToAudit(maxProducersToAudit);
204        }
205    
206        public boolean isActive() {
207            return next.isActive();
208        }
209    
210        public int getMaxPageSize() {
211            return next.getMaxPageSize();
212        }
213    
214        public void setMaxPageSize(int maxPageSize) {
215            next.setMaxPageSize(maxPageSize);
216        }
217    
218        public boolean isUseCache() {
219            return next.isUseCache();
220        }
221    
222        public void setUseCache(boolean useCache) {
223            next.setUseCache(useCache);
224        }
225    
226        public int getMinimumMessageSize() {
227            return next.getMinimumMessageSize();
228        }
229    
230        public void setMinimumMessageSize(int minimumMessageSize) {
231            next.setMinimumMessageSize(minimumMessageSize);
232        }
233    
234        public void wakeup() {
235            next.wakeup();
236        }
237    
238        public boolean isLazyDispatch() {
239            return next.isLazyDispatch();
240        }
241    
242        public void setLazyDispatch(boolean value) {
243            next.setLazyDispatch(value);
244        }
245    
246        public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
247            next.messageExpired(context, prefetchSubscription, node);
248        }
249    
250        public boolean iterate() {
251            return next.iterate();
252        }
253    
254        public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
255            next.fastProducer(context, producerInfo);
256        }
257    
258        public void isFull(ConnectionContext context, Usage<?> usage) {
259            next.isFull(context, usage);
260        }
261    
262        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
263            next.messageConsumed(context, messageReference);
264        }
265    
266        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
267            next.messageDelivered(context, messageReference);
268        }
269    
270        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
271            next.messageDiscarded(context, sub, messageReference);
272        }
273    
274        public void slowConsumer(ConnectionContext context, Subscription subs) {
275            next.slowConsumer(context, subs);
276        }
277    
278        public void messageExpired(ConnectionContext context, Subscription subs, MessageReference node) {
279            next.messageExpired(context, subs, node);
280        }
281    
282        public int getMaxBrowsePageSize() {
283            return next.getMaxBrowsePageSize();
284        }
285    
286        public void setMaxBrowsePageSize(int maxPageSize) {
287            next.setMaxBrowsePageSize(maxPageSize);
288        }
289    
290        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
291            next.processDispatchNotification(messageDispatchNotification);
292        }
293    
294        public int getCursorMemoryHighWaterMark() {
295            return next.getCursorMemoryHighWaterMark();
296        }
297    
298        public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
299            next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
300        }
301    
302        public boolean isPrioritizedMessages() {
303            return next.isPrioritizedMessages();
304        }
305    
306        public SlowConsumerStrategy getSlowConsumerStrategy() {
307            return next.getSlowConsumerStrategy();
308        }
309    
310        public boolean isDoOptimzeMessageStorage() {
311            return next.isDoOptimzeMessageStorage();
312        }
313    
314        public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
315            next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
316        }
317    
318        @Override
319        public void clearPendingMessages() {
320            next.clearPendingMessages();
321        }
322    
323    }