001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import org.apache.activemq.broker.region.Destination;
020import org.apache.activemq.broker.region.Region;
021import org.apache.activemq.command.Message;
022import org.apache.activemq.command.MessageId;
023import org.apache.activemq.state.ProducerState;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import java.io.IOException;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicLong;
030
031/**
032 * Holds internal state in the broker for a MessageProducer
033 */
034public class ProducerBrokerExchange {
035
036    private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class);
037    private ConnectionContext connectionContext;
038    private Destination regionDestination;
039    private Region region;
040    private ProducerState producerState;
041    private boolean mutable = true;
042    private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
043    private boolean auditProducerSequenceIds;
044    private boolean isNetworkProducer;
045    private BrokerService brokerService;
046    private FlowControlInfo flowControlInfo = new FlowControlInfo();
047
048    public ProducerBrokerExchange() {
049    }
050
051    public ProducerBrokerExchange copy() {
052        ProducerBrokerExchange rc = new ProducerBrokerExchange();
053        rc.connectionContext = connectionContext.copy();
054        rc.regionDestination = regionDestination;
055        rc.region = region;
056        rc.producerState = producerState;
057        rc.mutable = mutable;
058        rc.flowControlInfo = flowControlInfo;
059        return rc;
060    }
061
062
063    /**
064     * @return the connectionContext
065     */
066    public ConnectionContext getConnectionContext() {
067        return this.connectionContext;
068    }
069
070    /**
071     * @param connectionContext the connectionContext to set
072     */
073    public void setConnectionContext(ConnectionContext connectionContext) {
074        this.connectionContext = connectionContext;
075    }
076
077    /**
078     * @return the mutable
079     */
080    public boolean isMutable() {
081        return this.mutable;
082    }
083
084    /**
085     * @param mutable the mutable to set
086     */
087    public void setMutable(boolean mutable) {
088        this.mutable = mutable;
089    }
090
091    /**
092     * @return the regionDestination
093     */
094    public Destination getRegionDestination() {
095        return this.regionDestination;
096    }
097
098    /**
099     * @param regionDestination the regionDestination to set
100     */
101    public void setRegionDestination(Destination regionDestination) {
102        this.regionDestination = regionDestination;
103    }
104
105    /**
106     * @return the region
107     */
108    public Region getRegion() {
109        return this.region;
110    }
111
112    /**
113     * @param region the region to set
114     */
115    public void setRegion(Region region) {
116        this.region = region;
117    }
118
119    /**
120     * @return the producerState
121     */
122    public ProducerState getProducerState() {
123        return this.producerState;
124    }
125
126    /**
127     * @param producerState the producerState to set
128     */
129    public void setProducerState(ProducerState producerState) {
130        this.producerState = producerState;
131    }
132
133    /**
134     * Enforce duplicate suppression using info from persistence adapter
135     *
136     * @return false if message should be ignored as a duplicate
137     */
138    public boolean canDispatch(Message messageSend) {
139        boolean canDispatch = true;
140        if (auditProducerSequenceIds && messageSend.isPersistent()) {
141            final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
142            if (isNetworkProducer) {
143                //  messages are multiplexed on this producer so we need to query the persistenceAdapter
144                long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
145                if (producerSequenceId <= lastStoredForMessageProducer) {
146                    canDispatch = false;
147                    LOG.warn("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
148                            (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastStoredForMessageProducer
149                    });
150                }
151            } else if (producerSequenceId <= lastSendSequenceNumber.get()) {
152                canDispatch = false;
153                if (messageSend.isInTransaction()) {
154                    LOG.warn("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{
155                            (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber
156                    });
157                } else {
158                    LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{
159                            (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber
160                    });
161
162                }
163            } else {
164                // track current so we can suppress duplicates later in the stream
165                lastSendSequenceNumber.set(producerSequenceId);
166            }
167        }
168        return canDispatch;
169    }
170
171    private long getStoredSequenceIdForMessage(MessageId messageId) {
172        try {
173            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
174        } catch (IOException ignored) {
175            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
176        }
177        return -1;
178    }
179
180    public void setLastStoredSequenceId(long l) {
181        auditProducerSequenceIds = true;
182        if (connectionContext.isNetworkConnection()) {
183            brokerService = connectionContext.getBroker().getBrokerService();
184            isNetworkProducer = true;
185        }
186        lastSendSequenceNumber.set(l);
187        LOG.debug("last stored sequence id set: {}", l);
188    }
189
190    public void incrementSend() {
191        flowControlInfo.incrementSend();
192    }
193
194    public void blockingOnFlowControl(boolean blockingOnFlowControl) {
195        flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
196    }
197
198    public void incrementTimeBlocked(Destination destination, long timeBlocked) {
199        flowControlInfo.incrementTimeBlocked(timeBlocked);
200    }
201
202
203    public boolean isBlockedForFlowControl() {
204        return flowControlInfo.isBlockingOnFlowControl();
205    }
206
207    public void resetFlowControl() {
208        flowControlInfo.reset();
209    }
210
211    public long getTotalTimeBlocked() {
212        return flowControlInfo.getTotalTimeBlocked();
213    }
214
215    public int getPercentageBlocked() {
216        double value = flowControlInfo.getTotalSends() == 0 ? 0 : flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
217        return (int) value * 100;
218    }
219
220
221    public static class FlowControlInfo {
222        private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
223        private AtomicLong totalSends = new AtomicLong();
224        private AtomicLong sendsBlocked = new AtomicLong();
225        private AtomicLong totalTimeBlocked = new AtomicLong();
226
227
228        public boolean isBlockingOnFlowControl() {
229            return blockingOnFlowControl.get();
230        }
231
232        public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
233            this.blockingOnFlowControl.set(blockingOnFlowControl);
234            if (blockingOnFlowControl) {
235                incrementSendBlocked();
236            }
237        }
238
239
240        public long getTotalSends() {
241            return totalSends.get();
242        }
243
244        public void incrementSend() {
245            this.totalSends.incrementAndGet();
246        }
247
248        public long getSendsBlocked() {
249            return sendsBlocked.get();
250        }
251
252        public void incrementSendBlocked() {
253            this.sendsBlocked.incrementAndGet();
254        }
255
256        public long getTotalTimeBlocked() {
257            return totalTimeBlocked.get();
258        }
259
260        public void incrementTimeBlocked(long time) {
261            this.totalTimeBlocked.addAndGet(time);
262        }
263
264        public void reset() {
265            blockingOnFlowControl.set(false);
266            totalSends.set(0);
267            sendsBlocked.set(0);
268            totalTimeBlocked.set(0);
269
270        }
271    }
272}