001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.IOException;
020import java.util.concurrent.atomic.AtomicBoolean;
021import java.util.concurrent.atomic.AtomicLong;
022
023import org.apache.activemq.broker.region.Destination;
024import org.apache.activemq.broker.region.Region;
025import org.apache.activemq.command.Message;
026import org.apache.activemq.command.MessageId;
027import org.apache.activemq.state.ProducerState;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * Holds internal state in the broker for a MessageProducer
033 */
034public class ProducerBrokerExchange {
035
036    private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class);
037    private ConnectionContext connectionContext;
038    private Destination regionDestination;
039    private Region region;
040    private ProducerState producerState;
041    private boolean mutable = true;
042    private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
043    private boolean auditProducerSequenceIds;
044    private boolean isNetworkProducer;
045    private BrokerService brokerService;
046    private final FlowControlInfo flowControlInfo = new FlowControlInfo();
047
048    public ProducerBrokerExchange() {
049    }
050
051    public ProducerBrokerExchange copy() {
052        ProducerBrokerExchange rc = new ProducerBrokerExchange();
053        rc.connectionContext = connectionContext.copy();
054        rc.regionDestination = regionDestination;
055        rc.region = region;
056        rc.producerState = producerState;
057        rc.mutable = mutable;
058        return rc;
059    }
060
061
062    /**
063     * @return the connectionContext
064     */
065    public ConnectionContext getConnectionContext() {
066        return this.connectionContext;
067    }
068
069    /**
070     * @param connectionContext the connectionContext to set
071     */
072    public void setConnectionContext(ConnectionContext connectionContext) {
073        this.connectionContext = connectionContext;
074    }
075
076    /**
077     * @return the mutable
078     */
079    public boolean isMutable() {
080        return this.mutable;
081    }
082
083    /**
084     * @param mutable the mutable to set
085     */
086    public void setMutable(boolean mutable) {
087        this.mutable = mutable;
088    }
089
090    /**
091     * @return the regionDestination
092     */
093    public Destination getRegionDestination() {
094        return this.regionDestination;
095    }
096
097    /**
098     * @param regionDestination the regionDestination to set
099     */
100    public void setRegionDestination(Destination regionDestination) {
101        this.regionDestination = regionDestination;
102    }
103
104    /**
105     * @return the region
106     */
107    public Region getRegion() {
108        return this.region;
109    }
110
111    /**
112     * @param region the region to set
113     */
114    public void setRegion(Region region) {
115        this.region = region;
116    }
117
118    /**
119     * @return the producerState
120     */
121    public ProducerState getProducerState() {
122        return this.producerState;
123    }
124
125    /**
126     * @param producerState the producerState to set
127     */
128    public void setProducerState(ProducerState producerState) {
129        this.producerState = producerState;
130    }
131
132    /**
133     * Enforce duplicate suppression using info from persistence adapter
134     *
135     * @return false if message should be ignored as a duplicate
136     */
137    public boolean canDispatch(Message messageSend) {
138        boolean canDispatch = true;
139        if (auditProducerSequenceIds && messageSend.isPersistent()) {
140            final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
141            if (isNetworkProducer) {
142                //  messages are multiplexed on this producer so we need to query the persistenceAdapter
143                long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
144                if (producerSequenceId <= lastStoredForMessageProducer) {
145                    canDispatch = false;
146                    LOG.warn("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
147                            (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastStoredForMessageProducer
148                    });
149                }
150            } else if (producerSequenceId <= lastSendSequenceNumber.get()) {
151                canDispatch = false;
152                if (messageSend.isInTransaction()) {
153                    LOG.warn("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{
154                            (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber
155                    });
156                } else {
157                    LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{
158                            (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber
159                    });
160
161                }
162            } else {
163                // track current so we can suppress duplicates later in the stream
164                lastSendSequenceNumber.set(producerSequenceId);
165            }
166        }
167        return canDispatch;
168    }
169
170    private long getStoredSequenceIdForMessage(MessageId messageId) {
171        try {
172            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
173        } catch (IOException ignored) {
174            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
175        }
176        return -1;
177    }
178
179    public void setLastStoredSequenceId(long l) {
180        auditProducerSequenceIds = true;
181        if (connectionContext.isNetworkConnection()) {
182            brokerService = connectionContext.getBroker().getBrokerService();
183            isNetworkProducer = true;
184        }
185        lastSendSequenceNumber.set(l);
186        LOG.debug("last stored sequence id set: {}", l);
187    }
188
189    public void incrementSend() {
190        flowControlInfo.incrementSend();
191    }
192
193    public void blockingOnFlowControl(boolean blockingOnFlowControl) {
194        flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
195    }
196
197    public void incrementTimeBlocked(Destination destination, long timeBlocked) {
198        flowControlInfo.incrementTimeBlocked(timeBlocked);
199    }
200
201
202    public boolean isBlockedForFlowControl() {
203        return flowControlInfo.isBlockingOnFlowControl();
204    }
205
206    public void resetFlowControl() {
207        flowControlInfo.reset();
208    }
209
210    public long getTotalTimeBlocked() {
211        return flowControlInfo.getTotalTimeBlocked();
212    }
213
214    public int getPercentageBlocked() {
215        double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
216        return (int) value * 100;
217    }
218
219
220    public static class FlowControlInfo {
221        private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
222        private AtomicLong totalSends = new AtomicLong();
223        private AtomicLong sendsBlocked = new AtomicLong();
224        private AtomicLong totalTimeBlocked = new AtomicLong();
225
226
227        public boolean isBlockingOnFlowControl() {
228            return blockingOnFlowControl.get();
229        }
230
231        public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
232            this.blockingOnFlowControl.set(blockingOnFlowControl);
233            if (blockingOnFlowControl) {
234                incrementSendBlocked();
235            }
236        }
237
238
239        public long getTotalSends() {
240            return totalSends.get();
241        }
242
243        public void incrementSend() {
244            this.totalSends.incrementAndGet();
245        }
246
247        public long getSendsBlocked() {
248            return sendsBlocked.get();
249        }
250
251        public void incrementSendBlocked() {
252            this.sendsBlocked.incrementAndGet();
253        }
254
255        public long getTotalTimeBlocked() {
256            return totalTimeBlocked.get();
257        }
258
259        public void incrementTimeBlocked(long time) {
260            this.totalTimeBlocked.addAndGet(time);
261        }
262
263        public void reset() {
264            blockingOnFlowControl.set(false);
265            totalSends.set(0);
266            sendsBlocked.set(0);
267            totalTimeBlocked.set(0);
268
269        }
270    }
271}