001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import org.apache.activemq.command.ActiveMQDestination;
020import org.apache.activemq.command.Message;
021import org.apache.activemq.command.ProducerInfo;
022
023/**
024 * This broker filter handles composite destinations. If a broker operation is
025 * invoked using a composite destination, this filter repeats the operation
026 * using each destination of the composite. HRC: I think this filter is
027 * dangerous to use to with the consumer operations. Multiple Subscription
028 * objects will be associated with a single JMS consumer each having a different
029 * idea of what the current pre-fetch dispatch size is. If this is used, then
030 * the client has to expect many more messages to be dispatched than the
031 * pre-fetch setting allows.
032 * 
033 * 
034 */
035public class CompositeDestinationBroker extends BrokerFilter {
036
037    public CompositeDestinationBroker(Broker next) {
038        super(next);
039    }
040
041    /**
042     * A producer may register to send to multiple destinations via a composite
043     * destination.
044     */
045    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
046        // The destination may be null.
047        ActiveMQDestination destination = info.getDestination();
048        if (destination != null && destination.isComposite()) {
049            ActiveMQDestination[] destinations = destination.getCompositeDestinations();
050            for (int i = 0; i < destinations.length; i++) {
051                ProducerInfo copy = info.copy();
052                copy.setDestination(destinations[i]);
053                next.addProducer(context, copy);
054            }
055        } else {
056            next.addProducer(context, info);
057        }
058    }
059
060    /**
061     * A producer may de-register from sending to multiple destinations via a
062     * composite destination.
063     */
064    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
065        // The destination may be null.
066        ActiveMQDestination destination = info.getDestination();
067        if (destination != null && destination.isComposite()) {
068            ActiveMQDestination[] destinations = destination.getCompositeDestinations();
069            for (int i = 0; i < destinations.length; i++) {
070                ProducerInfo copy = info.copy();
071                copy.setDestination(destinations[i]);
072                next.removeProducer(context, copy);
073            }
074        } else {
075            next.removeProducer(context, info);
076        }
077    }
078
079    /**
080     * 
081     */
082    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
083        ActiveMQDestination destination = message.getDestination();
084        if (destination.isComposite()) {
085            ActiveMQDestination[] destinations = destination.getCompositeDestinations();
086            for (int i = 0; i < destinations.length; i++) {
087                if (i != 0) {
088                    message = message.copy();
089                    message.setMemoryUsage(null);
090                }
091                message.setOriginalDestination(destination);
092                message.setDestination(destinations[i]);
093                next.send(producerExchange, message);
094            }
095        } else {
096            next.send(producerExchange, message);
097        }
098    }
099
100}