001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020
021 import org.apache.activemq.command.ActiveMQDestination;
022 import org.apache.activemq.command.ConsumerId;
023 import org.apache.activemq.command.ConsumerInfo;
024 import org.apache.activemq.filter.DestinationFilter;
025 import org.apache.activemq.transport.Transport;
026 import org.slf4j.Logger;
027 import org.slf4j.LoggerFactory;
028
029 /**
030 * Consolidates subscriptions
031 */
032 public class DurableConduitBridge extends ConduitBridge {
033 private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
034
035 /**
036 * Constructor
037 *
038 * @param configuration
039 *
040 * @param localBroker
041 * @param remoteBroker
042 */
043 public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
044 Transport remoteBroker) {
045 super(configuration, localBroker, remoteBroker);
046 }
047
048 /**
049 * Subscriptions for these destinations are always created
050 *
051 */
052 protected void setupStaticDestinations() {
053 super.setupStaticDestinations();
054 ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
055 if (dests != null) {
056 for (ActiveMQDestination dest : dests) {
057 if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
058 DemandSubscription sub = createDemandSubscription(dest);
059 if (dest.isTopic()) {
060 sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
061 }
062 try {
063 addSubscription(sub);
064 } catch (IOException e) {
065 LOG.error("Failed to add static destination " + dest, e);
066 }
067 if (LOG.isTraceEnabled()) {
068 LOG.trace("Forwarding messages for durable destination: " + dest);
069 }
070 }
071 }
072 }
073 }
074
075 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
076 if (addToAlreadyInterestedConsumers(info)) {
077 return null; // don't want this subscription added
078 }
079 //add our original id to ourselves
080 info.addNetworkConsumerId(info.getConsumerId());
081
082 if (info.isDurable()) {
083 // set the subscriber name to something reproducible
084 info.setSubscriptionName(getSubscriberName(info.getDestination()));
085 // and override the consumerId with something unique so that it won't
086 // be removed if the durable subscriber (at the other end) goes away
087 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
088 consumerIdGenerator.getNextSequenceId()));
089 }
090 info.setSelector(null);
091 return doCreateDemandSubscription(info);
092 }
093
094 protected String getSubscriberName(ActiveMQDestination dest) {
095 String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
096 return subscriberName;
097 }
098
099 protected boolean doesConsumerExist(ActiveMQDestination dest) {
100 DestinationFilter filter = DestinationFilter.parseFilter(dest);
101 for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
102 if (filter.matches(ds.getLocalInfo().getDestination())) {
103 return true;
104 }
105 }
106 return false;
107 }
108 }