001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.util.concurrent.atomic.AtomicLong;
020
021import org.apache.activemq.state.CommandVisitor;
022
023/**
024 * 
025 * @openwire:marshaller code="6"
026 * 
027 */
028public class ProducerInfo extends BaseCommand {
029
030    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_INFO;
031
032    protected ProducerId producerId;
033    protected ActiveMQDestination destination;
034    protected BrokerId[] brokerPath;
035    protected boolean dispatchAsync;
036    protected int windowSize;
037    protected AtomicLong sentCount = new AtomicLong();
038
039    public ProducerInfo() {
040    }
041
042    public ProducerInfo(ProducerId producerId) {
043        this.producerId = producerId;
044    }
045
046    public ProducerInfo(SessionInfo sessionInfo, long producerId) {
047        this.producerId = new ProducerId(sessionInfo.getSessionId(), producerId);
048    }
049
050    public ProducerInfo copy() {
051        ProducerInfo info = new ProducerInfo();
052        copy(info);
053        return info;
054    }
055
056    public void copy(ProducerInfo info) {
057        super.copy(info);
058        info.producerId = producerId;
059        info.destination = destination;
060    }
061
062    public byte getDataStructureType() {
063        return DATA_STRUCTURE_TYPE;
064    }
065
066    /**
067     * @openwire:property version=1 cache=true
068     */
069    public ProducerId getProducerId() {
070        return producerId;
071    }
072
073    public void setProducerId(ProducerId producerId) {
074        this.producerId = producerId;
075    }
076
077    /**
078     * @openwire:property version=1 cache=true
079     */
080    public ActiveMQDestination getDestination() {
081        return destination;
082    }
083
084    public void setDestination(ActiveMQDestination destination) {
085        this.destination = destination;
086    }
087
088    public RemoveInfo createRemoveCommand() {
089        RemoveInfo command = new RemoveInfo(getProducerId());
090        command.setResponseRequired(isResponseRequired());
091        return command;
092    }
093
094    /**
095     * The route of brokers the command has moved through.
096     * 
097     * @openwire:property version=1 cache=true
098     */
099    public BrokerId[] getBrokerPath() {
100        return brokerPath;
101    }
102
103    public void setBrokerPath(BrokerId[] brokerPath) {
104        this.brokerPath = brokerPath;
105    }
106
107    public Response visit(CommandVisitor visitor) throws Exception {
108        return visitor.processAddProducer(this);
109    }
110
111    /**
112     * If the broker should dispatch messages from this producer async. Since
113     * sync dispatch could potentally block the producer thread, this could be
114     * an important setting for the producer.
115     * 
116     * @openwire:property version=2
117     */
118    public boolean isDispatchAsync() {
119        return dispatchAsync;
120    }
121
122    public void setDispatchAsync(boolean dispatchAsync) {
123        this.dispatchAsync = dispatchAsync;
124    }
125
126    /**
127     * Used to configure the producer window size. A producer will send up to
128     * the configured window size worth of payload data to the broker before
129     * waiting for an Ack that allows him to send more.
130     * 
131     * @openwire:property version=3
132     */
133    public int getWindowSize() {
134        return windowSize;
135    }
136
137    public void setWindowSize(int windowSize) {
138        this.windowSize = windowSize;
139    }
140
141    public long getSentCount(){
142        return sentCount.get();
143    }
144
145    public void incrementSentCount(){
146        sentCount.incrementAndGet();
147    }
148
149    public void resetSentCount(){
150        sentCount.set(0);
151    }
152
153}