001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import org.apache.activemq.broker.region.Destination;
020    import org.apache.activemq.broker.region.Region;
021    import org.apache.activemq.command.Message;
022    import org.apache.activemq.command.MessageId;
023    import org.apache.activemq.state.ProducerState;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    
027    import java.io.IOException;
028    import java.util.concurrent.atomic.AtomicLong;
029    
030    /**
031     * Holds internal state in the broker for a MessageProducer
032     * 
033     * 
034     */
035    public class ProducerBrokerExchange {
036    
037        private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class);
038        private ConnectionContext connectionContext;
039        private Destination regionDestination;
040        private Region region;
041        private ProducerState producerState;
042        private boolean mutable = true;
043        private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
044        private boolean auditProducerSequenceIds;
045        private boolean isNetworkProducer;
046        private BrokerService brokerService;
047    
048        public ProducerBrokerExchange() {
049        }
050    
051        public ProducerBrokerExchange copy() {
052            ProducerBrokerExchange rc = new ProducerBrokerExchange();
053            rc.connectionContext = connectionContext.copy();
054            rc.regionDestination = regionDestination;
055            rc.region = region;
056            rc.producerState = producerState;
057            rc.mutable = mutable;
058            return rc;
059        }
060    
061        
062        /**
063         * @return the connectionContext
064         */
065        public ConnectionContext getConnectionContext() {
066            return this.connectionContext;
067        }
068    
069        /**
070         * @param connectionContext the connectionContext to set
071         */
072        public void setConnectionContext(ConnectionContext connectionContext) {
073            this.connectionContext = connectionContext;
074        }
075    
076        /**
077         * @return the mutable
078         */
079        public boolean isMutable() {
080            return this.mutable;
081        }
082    
083        /**
084         * @param mutable the mutable to set
085         */
086        public void setMutable(boolean mutable) {
087            this.mutable = mutable;
088        }
089    
090        /**
091         * @return the regionDestination
092         */
093        public Destination getRegionDestination() {
094            return this.regionDestination;
095        }
096    
097        /**
098         * @param regionDestination the regionDestination to set
099         */
100        public void setRegionDestination(Destination regionDestination) {
101            this.regionDestination = regionDestination;
102        }
103    
104        /**
105         * @return the region
106         */
107        public Region getRegion() {
108            return this.region;
109        }
110    
111        /**
112         * @param region the region to set
113         */
114        public void setRegion(Region region) {
115            this.region = region;
116        }
117    
118        /**
119         * @return the producerState
120         */
121        public ProducerState getProducerState() {
122            return this.producerState;
123        }
124    
125        /**
126         * @param producerState the producerState to set
127         */
128        public void setProducerState(ProducerState producerState) {
129            this.producerState = producerState;
130        }
131    
132        /**
133         * Enforce duplicate suppression using info from persistence adapter
134         * @param messageSend
135         * @return false if message should be ignored as a duplicate
136         */
137        public boolean canDispatch(Message messageSend) {
138            boolean canDispatch = true;
139            if (auditProducerSequenceIds && messageSend.isPersistent()) {
140                final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
141                if (isNetworkProducer) {
142                    //  messages are multiplexed on this producer so we need to query the persistenceAdapter
143                    long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
144                    if (producerSequenceId <= lastStoredForMessageProducer) {
145                        canDispatch = false;
146                        if (LOG.isDebugEnabled()) {
147                            LOG.debug("suppressing duplicate message send  [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] from network producer with producerSequenceId ["
148                                    + producerSequenceId + "] less than last stored: "  + lastStoredForMessageProducer);
149                        }
150                    }
151                } else if (producerSequenceId <= lastSendSequenceNumber.get()) {
152                    canDispatch = false;
153                    if (LOG.isDebugEnabled()) {
154                        LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] with producerSequenceId ["
155                                + producerSequenceId + "] less than last stored: "  + lastSendSequenceNumber);
156                    }
157                } else {
158                    // track current so we can suppress duplicates later in the stream
159                    lastSendSequenceNumber.set(producerSequenceId);
160                }
161            }
162            return canDispatch;
163        }
164    
165        private long getStoredSequenceIdForMessage(MessageId messageId) {
166            try {
167                return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
168           } catch (IOException ignored) {
169                LOG.debug("Failed to determine last producer sequence id for: " +messageId, ignored);
170            }
171            return -1;
172        }
173    
174        public void setLastStoredSequenceId(long l) {
175            auditProducerSequenceIds = true;
176            if (connectionContext.isNetworkConnection()) {
177                brokerService = connectionContext.getBroker().getBrokerService();
178                isNetworkProducer = true;
179            }
180            lastSendSequenceNumber.set(l);
181            LOG.debug("last stored sequence id set: " + l);
182        }
183    }