001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.util.Set;
020 import java.util.concurrent.CopyOnWriteArraySet;
021 import java.util.concurrent.atomic.AtomicBoolean;
022 import java.util.concurrent.atomic.AtomicInteger;
023
024 import org.apache.activemq.command.ConsumerId;
025 import org.apache.activemq.command.ConsumerInfo;
026 import org.apache.activemq.command.NetworkBridgeFilter;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * Represents a network bridge interface
032 */
033 public class DemandSubscription {
034 private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class);
035
036 private final ConsumerInfo remoteInfo;
037 private final ConsumerInfo localInfo;
038 private final Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
039 private final AtomicInteger dispatched = new AtomicInteger(0);
040 private final AtomicBoolean activeWaiter = new AtomicBoolean();
041
042 private NetworkBridgeFilter networkBridgeFilter;
043
044 DemandSubscription(ConsumerInfo info) {
045 remoteInfo = info;
046 localInfo = info.copy();
047 localInfo.setNetworkSubscription(true);
048 remoteSubsIds.add(info.getConsumerId());
049 }
050
051 /**
052 * Increment the consumers associated with this subscription
053 *
054 * @param id
055 * @return true if added
056 */
057 public boolean add(ConsumerId id) {
058 return remoteSubsIds.add(id);
059 }
060
061 /**
062 * Increment the consumers associated with this subscription
063 *
064 * @param id
065 * @return true if removed
066 */
067 public boolean remove(ConsumerId id) {
068 return remoteSubsIds.remove(id);
069 }
070
071 /**
072 * @return true if there are no interested consumers
073 */
074 public boolean isEmpty() {
075 return remoteSubsIds.isEmpty();
076 }
077
078 public int size() {
079 return remoteSubsIds.size();
080 }
081 /**
082 * @return Returns the localInfo.
083 */
084 public ConsumerInfo getLocalInfo() {
085 return localInfo;
086 }
087
088 /**
089 * @return Returns the remoteInfo.
090 */
091 public ConsumerInfo getRemoteInfo() {
092 return remoteInfo;
093 }
094
095 public void waitForCompletion() {
096 if (dispatched.get() > 0) {
097 if (LOG.isDebugEnabled()) {
098 LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get());
099 }
100 activeWaiter.set(true);
101 if (dispatched.get() > 0) {
102 synchronized (activeWaiter) {
103 try {
104 activeWaiter.wait();
105 } catch (InterruptedException ignored) {
106 }
107 }
108 if (this.dispatched.get() > 0) {
109 LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, " +
110 "expect potentially " + this.dispatched.get() + " duplicate deliveried");
111 }
112 }
113 }
114 }
115
116 public void decrementOutstandingResponses() {
117 if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
118 synchronized (activeWaiter) {
119 activeWaiter.notifyAll();
120 }
121 }
122 }
123
124 public boolean incrementOutstandingResponses() {
125 dispatched.incrementAndGet();
126 if (activeWaiter.get()) {
127 decrementOutstandingResponses();
128 return false;
129 }
130 return true;
131 }
132
133 public NetworkBridgeFilter getNetworkBridgeFilter() {
134 return networkBridgeFilter;
135 }
136
137 public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) {
138 this.networkBridgeFilter = networkBridgeFilter;
139 }
140 }