001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.atomic.AtomicLong;
022    
023    import javax.jms.Destination;
024    import javax.jms.IllegalStateException;
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    import javax.jms.Message;
028    
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ProducerAck;
031    import org.apache.activemq.command.ProducerId;
032    import org.apache.activemq.command.ProducerInfo;
033    import org.apache.activemq.management.JMSProducerStatsImpl;
034    import org.apache.activemq.management.StatsCapable;
035    import org.apache.activemq.management.StatsImpl;
036    import org.apache.activemq.usage.MemoryUsage;
037    import org.apache.activemq.util.IntrospectionSupport;
038    
039    /**
040     * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
041     * destination. A <CODE>MessageProducer</CODE> object is created by passing a
042     * <CODE>Destination</CODE> object to a message-producer creation method
043     * supplied by a session.
044     * <P>
045     * <CODE>MessageProducer</CODE> is the parent interface for all message
046     * producers.
047     * <P>
048     * A client also has the option of creating a message producer without supplying
049     * a destination. In this case, a destination must be provided with every send
050     * operation. A typical use for this kind of message producer is to send replies
051     * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
052     * <P>
053     * A client can specify a default delivery mode, priority, and time to live for
054     * messages sent by a message producer. It can also specify the delivery mode,
055     * priority, and time to live for an individual message.
056     * <P>
057     * A client can specify a time-to-live value in milliseconds for each message it
058     * sends. This value defines a message expiration time that is the sum of the
059     * message's time-to-live and the GMT when it is sent (for transacted sends,
060     * this is the time the client sends the message, not the time the transaction
061     * is committed).
062     * <P>
063     * A JMS provider should do its best to expire messages accurately; however, the
064     * JMS API does not define the accuracy provided.
065     * 
066     * @version $Revision: 1.14 $
067     * @see javax.jms.TopicPublisher
068     * @see javax.jms.QueueSender
069     * @see javax.jms.Session#createProducer
070     */
071    public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
072    
073        protected ProducerInfo info;
074        protected boolean closed;
075    
076        private JMSProducerStatsImpl stats;
077        private AtomicLong messageSequence;
078        private long startTime;
079        private MessageTransformer transformer;
080        private MemoryUsage producerWindow;
081    
082        protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
083            super(session);
084            this.info = new ProducerInfo(producerId);
085            this.info.setWindowSize(session.connection.getProducerWindowSize());
086            if (destination != null && destination.getOptions() != null) {
087                Map<String, String> options = new HashMap<String, String>(destination.getOptions());
088                IntrospectionSupport.setProperties(this.info, options, "producer.");
089            }
090            this.info.setDestination(destination);
091    
092            // Enable producer window flow control if protocol > 3 and the window
093            // size > 0
094            if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
095                producerWindow = new MemoryUsage("Producer Window: " + producerId);
096                producerWindow.setLimit(this.info.getWindowSize());
097            }
098    
099            this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
100            this.defaultPriority = Message.DEFAULT_PRIORITY;
101            this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
102            this.startTime = System.currentTimeMillis();
103            this.messageSequence = new AtomicLong(0);
104            this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
105            this.session.addProducer(this);
106            this.session.asyncSendPacket(info);
107            this.setSendTimeout(sendTimeout);
108            setTransformer(session.getTransformer());
109        }
110    
111        public StatsImpl getStats() {
112            return stats;
113        }
114    
115        public JMSProducerStatsImpl getProducerStats() {
116            return stats;
117        }
118    
119        /**
120         * Gets the destination associated with this <CODE>MessageProducer</CODE>.
121         * 
122         * @return this producer's <CODE>Destination/ <CODE>
123         * @throws JMSException if the JMS provider fails to close the producer due to
124         *                      some internal error.
125         * @since 1.1
126         */
127        public Destination getDestination() throws JMSException {
128            checkClosed();
129            return this.info.getDestination();
130        }
131    
132        /**
133         * Closes the message producer.
134         * <P>
135         * Since a provider may allocate some resources on behalf of a <CODE>
136         * MessageProducer</CODE>
137         * outside the Java virtual machine, clients should close them when they are
138         * not needed. Relying on garbage collection to eventually reclaim these
139         * resources may not be timely enough.
140         * 
141         * @throws JMSException if the JMS provider fails to close the producer due
142         *                 to some internal error.
143         */
144        public void close() throws JMSException {
145            if (!closed) {
146                dispose();
147                this.session.asyncSendPacket(info.createRemoveCommand());
148            }
149        }
150    
151        public void dispose() {
152            if (!closed) {
153                this.session.removeProducer(this);
154                closed = true;
155            }
156        }
157    
158        /**
159         * Check if the instance of this producer has been closed.
160         * 
161         * @throws IllegalStateException
162         */
163        protected void checkClosed() throws IllegalStateException {
164            if (closed) {
165                throw new IllegalStateException("The producer is closed");
166            }
167        }
168    
169        /**
170         * Sends a message to a destination for an unidentified message producer,
171         * specifying delivery mode, priority and time to live.
172         * <P>
173         * Typically, a message producer is assigned a destination at creation time;
174         * however, the JMS API also supports unidentified message producers, which
175         * require that the destination be supplied every time a message is sent.
176         * 
177         * @param destination the destination to send this message to
178         * @param message the message to send
179         * @param deliveryMode the delivery mode to use
180         * @param priority the priority for this message
181         * @param timeToLive the message's lifetime (in milliseconds)
182         * @throws JMSException if the JMS provider fails to send the message due to
183         *                 some internal error.
184         * @throws UnsupportedOperationException if an invalid destination is
185         *                 specified.
186         * @throws InvalidDestinationException if a client uses this method with an
187         *                 invalid destination.
188         * @see javax.jms.Session#createProducer
189         * @since 1.1
190         */
191        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
192            checkClosed();
193            if (destination == null) {
194                if (info.getDestination() == null) {
195                    throw new UnsupportedOperationException("A destination must be specified.");
196                }
197                throw new InvalidDestinationException("Don't understand null destinations");
198            }
199    
200            ActiveMQDestination dest;
201            if (destination == info.getDestination()) {
202                dest = (ActiveMQDestination)destination;
203            } else if (info.getDestination() == null) {
204                dest = ActiveMQDestination.transform(destination);
205            } else {
206                throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
207            }
208            if (dest == null) {
209                throw new JMSException("No destination specified");
210            }
211    
212            if (transformer != null) {
213                Message transformedMessage = transformer.producerTransform(session, this, message);
214                if (transformedMessage != null) {
215                    message = transformedMessage;
216                }
217            }
218    
219            if (producerWindow != null) {
220                try {
221                    producerWindow.waitForSpace();
222                } catch (InterruptedException e) {
223                    throw new JMSException("Send aborted due to thread interrupt.");
224                }
225            }
226    
227            this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
228    
229            stats.onMessage();
230        }
231    
232        public MessageTransformer getTransformer() {
233            return transformer;
234        }
235    
236        /**
237         * Sets the transformer used to transform messages before they are sent on
238         * to the JMS bus
239         */
240        public void setTransformer(MessageTransformer transformer) {
241            this.transformer = transformer;
242        }
243    
244        /**
245         * @return the time in milli second when this object was created.
246         */
247        protected long getStartTime() {
248            return this.startTime;
249        }
250    
251        /**
252         * @return Returns the messageSequence.
253         */
254        protected long getMessageSequence() {
255            return messageSequence.incrementAndGet();
256        }
257    
258        /**
259         * @param messageSequence The messageSequence to set.
260         */
261        protected void setMessageSequence(AtomicLong messageSequence) {
262            this.messageSequence = messageSequence;
263        }
264    
265        /**
266         * @return Returns the info.
267         */
268        protected ProducerInfo getProducerInfo() {
269            return this.info != null ? this.info : null;
270        }
271    
272        /**
273         * @param info The info to set
274         */
275        protected void setProducerInfo(ProducerInfo info) {
276            this.info = info;
277        }
278    
279        public String toString() {
280            return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
281        }
282    
283        public void onProducerAck(ProducerAck pa) {
284            if (this.producerWindow != null) {
285                this.producerWindow.decreaseUsage(pa.getSize());
286            }
287        }
288    
289    }