001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.virtual;
018    
019    import java.util.Collection;
020    import java.util.Iterator;
021    
022    import org.apache.activemq.broker.Broker;
023    import org.apache.activemq.broker.ProducerBrokerExchange;
024    import org.apache.activemq.broker.region.Destination;
025    import org.apache.activemq.broker.region.DestinationFilter;
026    import org.apache.activemq.command.ActiveMQDestination;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.filter.MessageEvaluationContext;
029    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
030    
031    /**
032     * Represents a composite {@link Destination} where send()s are replicated to
033     * each Destination instance.
034     * 
035     * 
036     */
037    public class CompositeDestinationFilter extends DestinationFilter {
038    
039        private Collection forwardDestinations;
040        private boolean forwardOnly;
041        private boolean copyMessage;
042    
043        public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage) {
044            super(next);
045            this.forwardDestinations = forwardDestinations;
046            this.forwardOnly = forwardOnly;
047            this.copyMessage = copyMessage;
048        }
049    
050        public void send(ProducerBrokerExchange context, Message message) throws Exception {
051            MessageEvaluationContext messageContext = null;
052    
053            for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {
054                ActiveMQDestination destination = null;
055                Object value = iter.next();
056    
057                if (value instanceof FilteredDestination) {
058                    FilteredDestination filteredDestination = (FilteredDestination)value;
059                    if (messageContext == null) {
060                        messageContext = new NonCachedMessageEvaluationContext();
061                        messageContext.setMessageReference(message);
062                    }
063                    messageContext.setDestination(filteredDestination.getDestination());
064                    if (filteredDestination.matches(messageContext)) {
065                        destination = filteredDestination.getDestination();
066                    }
067                } else if (value instanceof ActiveMQDestination) {
068                    destination = (ActiveMQDestination)value;
069                }
070                if (destination == null) {
071                    continue;
072                }
073    
074                Message forwarded_message;
075                if (copyMessage) {
076                    forwarded_message = message.copy();
077                    forwarded_message.setDestination(destination);
078                }
079                else {
080                    forwarded_message = message;
081                }
082    
083                // Send it back through the region broker for routing.
084                context.setMutable(true);
085                Broker regionBroker = context.getConnectionContext().getBroker().getBrokerService().getRegionBroker();
086                regionBroker.send(context, forwarded_message);
087            }
088            if (!forwardOnly) {
089                super.send(context, message);
090            }
091        }
092    }