001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import org.apache.activemq.command.ActiveMQDestination;
020    import org.apache.activemq.command.Message;
021    import org.apache.activemq.command.ProducerInfo;
022    
023    /**
024     * This broker filter handles composite destinations. If a broker operation is
025     * invoked using a composite destination, this filter repeats the operation
026     * using each destination of the composite. HRC: I think this filter is
027     * dangerous to use to with the consumer operations. Multiple Subscription
028     * objects will be associated with a single JMS consumer each having a different
029     * idea of what the current pre-fetch dispatch size is. If this is used, then
030     * the client has to expect many more messages to be dispatched than the
031     * pre-fetch setting allows.
032     * 
033     * 
034     */
035    public class CompositeDestinationBroker extends BrokerFilter {
036    
037        public CompositeDestinationBroker(Broker next) {
038            super(next);
039        }
040    
041        /**
042         * A producer may register to send to multiple destinations via a composite
043         * destination.
044         */
045        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
046            // The destination may be null.
047            ActiveMQDestination destination = info.getDestination();
048            if (destination != null && destination.isComposite()) {
049                ActiveMQDestination[] destinations = destination.getCompositeDestinations();
050                for (int i = 0; i < destinations.length; i++) {
051                    ProducerInfo copy = info.copy();
052                    copy.setDestination(destinations[i]);
053                    next.addProducer(context, copy);
054                }
055            } else {
056                next.addProducer(context, info);
057            }
058        }
059    
060        /**
061         * A producer may de-register from sending to multiple destinations via a
062         * composite destination.
063         */
064        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
065            // The destination may be null.
066            ActiveMQDestination destination = info.getDestination();
067            if (destination != null && destination.isComposite()) {
068                ActiveMQDestination[] destinations = destination.getCompositeDestinations();
069                for (int i = 0; i < destinations.length; i++) {
070                    ProducerInfo copy = info.copy();
071                    copy.setDestination(destinations[i]);
072                    next.removeProducer(context, copy);
073                }
074            } else {
075                next.removeProducer(context, info);
076            }
077        }
078    
079        /**
080         * 
081         */
082        public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
083            ActiveMQDestination destination = message.getDestination();
084            if (destination.isComposite()) {
085                ActiveMQDestination[] destinations = destination.getCompositeDestinations();
086                for (int i = 0; i < destinations.length; i++) {
087                    if (i != 0) {
088                        message = message.copy();
089                        message.setMemoryUsage(null);
090                    }
091                    message.setOriginalDestination(destination);
092                    message.setDestination(destinations[i]);
093                    next.send(producerExchange, message);
094                }
095            } else {
096                next.send(producerExchange, message);
097            }
098        }
099    
100    }