001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.atomic.AtomicLong;
022    import javax.jms.Destination;
023    import javax.jms.IllegalStateException;
024    import javax.jms.InvalidDestinationException;
025    import javax.jms.JMSException;
026    import javax.jms.Message;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.command.ProducerAck;
029    import org.apache.activemq.command.ProducerId;
030    import org.apache.activemq.command.ProducerInfo;
031    import org.apache.activemq.management.JMSProducerStatsImpl;
032    import org.apache.activemq.management.StatsCapable;
033    import org.apache.activemq.management.StatsImpl;
034    import org.apache.activemq.usage.MemoryUsage;
035    import org.apache.activemq.util.IntrospectionSupport;
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    /**
040     * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
041     * destination. A <CODE>MessageProducer</CODE> object is created by passing a
042     * <CODE>Destination</CODE> object to a message-producer creation method
043     * supplied by a session.
044     * <P>
045     * <CODE>MessageProducer</CODE> is the parent interface for all message
046     * producers.
047     * <P>
048     * A client also has the option of creating a message producer without supplying
049     * a destination. In this case, a destination must be provided with every send
050     * operation. A typical use for this kind of message producer is to send replies
051     * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
052     * <P>
053     * A client can specify a default delivery mode, priority, and time to live for
054     * messages sent by a message producer. It can also specify the delivery mode,
055     * priority, and time to live for an individual message.
056     * <P>
057     * A client can specify a time-to-live value in milliseconds for each message it
058     * sends. This value defines a message expiration time that is the sum of the
059     * message's time-to-live and the GMT when it is sent (for transacted sends,
060     * this is the time the client sends the message, not the time the transaction
061     * is committed).
062     * <P>
063     * A JMS provider should do its best to expire messages accurately; however, the
064     * JMS API does not define the accuracy provided.
065     *
066     *
067     * @see javax.jms.TopicPublisher
068     * @see javax.jms.QueueSender
069     * @see javax.jms.Session#createProducer
070     */
071    public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
072    
073        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageProducer.class);
074    
075        protected ProducerInfo info;
076        protected boolean closed;
077    
078        private final JMSProducerStatsImpl stats;
079        private AtomicLong messageSequence;
080        private final long startTime;
081        private MessageTransformer transformer;
082        private MemoryUsage producerWindow;
083    
084        protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
085            super(session);
086            this.info = new ProducerInfo(producerId);
087            this.info.setWindowSize(session.connection.getProducerWindowSize());
088            // Allows the options on the destination to configure the producerInfo
089            if (destination != null && destination.getOptions() != null) {
090                Map<String, Object> options = IntrospectionSupport.extractProperties(
091                    new HashMap<String, Object>(destination.getOptions()), "producer.");
092                IntrospectionSupport.setProperties(this.info, options);
093                if (options.size() > 0) {
094                    String msg = "There are " + options.size()
095                        + " producer options that couldn't be set on the producer."
096                        + " Check the options are spelled correctly."
097                        + " Unknown parameters=[" + options + "]."
098                        + " This producer cannot be started.";
099                    LOG.warn(msg);
100                    throw new ConfigurationException(msg);
101                }
102            }
103    
104            this.info.setDestination(destination);
105    
106            // Enable producer window flow control if protocol > 3 and the window
107            // size > 0
108            if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
109                producerWindow = new MemoryUsage("Producer Window: " + producerId);
110                producerWindow.setExecutor(session.getConnectionExecutor());
111                producerWindow.setLimit(this.info.getWindowSize());
112                producerWindow.start();
113            }
114    
115            this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
116            this.defaultPriority = Message.DEFAULT_PRIORITY;
117            this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
118            this.startTime = System.currentTimeMillis();
119            this.messageSequence = new AtomicLong(0);
120            this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
121            this.session.addProducer(this);
122            this.session.asyncSendPacket(info);
123            this.setSendTimeout(sendTimeout);
124            setTransformer(session.getTransformer());
125        }
126    
127        public StatsImpl getStats() {
128            return stats;
129        }
130    
131        public JMSProducerStatsImpl getProducerStats() {
132            return stats;
133        }
134    
135        /**
136         * Gets the destination associated with this <CODE>MessageProducer</CODE>.
137         *
138         * @return this producer's <CODE>Destination/ <CODE>
139         * @throws JMSException if the JMS provider fails to close the producer due to
140         *                      some internal error.
141         * @since 1.1
142         */
143        public Destination getDestination() throws JMSException {
144            checkClosed();
145            return this.info.getDestination();
146        }
147    
148        /**
149         * Closes the message producer.
150         * <P>
151         * Since a provider may allocate some resources on behalf of a <CODE>
152         * MessageProducer</CODE>
153         * outside the Java virtual machine, clients should close them when they are
154         * not needed. Relying on garbage collection to eventually reclaim these
155         * resources may not be timely enough.
156         *
157         * @throws JMSException if the JMS provider fails to close the producer due
158         *                 to some internal error.
159         */
160        public void close() throws JMSException {
161            if (!closed) {
162                dispose();
163                this.session.asyncSendPacket(info.createRemoveCommand());
164            }
165        }
166    
167        public void dispose() {
168            if (!closed) {
169                this.session.removeProducer(this);
170                if (producerWindow != null) {
171                    producerWindow.stop();
172                }
173                closed = true;
174            }
175        }
176    
177        /**
178         * Check if the instance of this producer has been closed.
179         *
180         * @throws IllegalStateException
181         */
182        @Override
183        protected void checkClosed() throws IllegalStateException {
184            if (closed) {
185                throw new IllegalStateException("The producer is closed");
186            }
187        }
188    
189        /**
190         * Sends a message to a destination for an unidentified message producer,
191         * specifying delivery mode, priority and time to live.
192         * <P>
193         * Typically, a message producer is assigned a destination at creation time;
194         * however, the JMS API also supports unidentified message producers, which
195         * require that the destination be supplied every time a message is sent.
196         *
197         * @param destination the destination to send this message to
198         * @param message the message to send
199         * @param deliveryMode the delivery mode to use
200         * @param priority the priority for this message
201         * @param timeToLive the message's lifetime (in milliseconds)
202         * @throws JMSException if the JMS provider fails to send the message due to
203         *                 some internal error.
204         * @throws UnsupportedOperationException if an invalid destination is
205         *                 specified.
206         * @throws InvalidDestinationException if a client uses this method with an
207         *                 invalid destination.
208         * @see javax.jms.Session#createProducer
209         * @since 1.1
210         */
211        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
212            this.send(destination, message, deliveryMode, priority, timeToLive, null);
213        }
214    
215        public void send(Message message, AsyncCallback onComplete) throws JMSException {
216            this.send(this.getDestination(),
217                      message,
218                      this.defaultDeliveryMode,
219                      this.defaultPriority,
220                      this.defaultTimeToLive, onComplete);
221        }
222    
223        public void send(Destination destination, Message message, AsyncCallback onComplete) throws JMSException {
224            this.send(destination,
225                      message,
226                      this.defaultDeliveryMode,
227                      this.defaultPriority,
228                      this.defaultTimeToLive,
229                      onComplete);
230        }
231    
232        public void send(Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
233            this.send(this.getDestination(),
234                      message,
235                      deliveryMode,
236                      priority,
237                      timeToLive,
238                      onComplete);
239        }
240    
241        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
242            checkClosed();
243            if (destination == null) {
244                if (info.getDestination() == null) {
245                    throw new UnsupportedOperationException("A destination must be specified.");
246                }
247                throw new InvalidDestinationException("Don't understand null destinations");
248            }
249    
250            ActiveMQDestination dest;
251            if (destination.equals(info.getDestination())) {
252                dest = (ActiveMQDestination)destination;
253            } else if (info.getDestination() == null) {
254                dest = ActiveMQDestination.transform(destination);
255            } else {
256                throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
257            }
258            if (dest == null) {
259                throw new JMSException("No destination specified");
260            }
261    
262            if (transformer != null) {
263                Message transformedMessage = transformer.producerTransform(session, this, message);
264                if (transformedMessage != null) {
265                    message = transformedMessage;
266                }
267            }
268    
269            if (producerWindow != null) {
270                try {
271                    producerWindow.waitForSpace();
272                } catch (InterruptedException e) {
273                    throw new JMSException("Send aborted due to thread interrupt.");
274                }
275            }
276    
277            this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
278    
279            stats.onMessage();
280        }
281    
282        public MessageTransformer getTransformer() {
283            return transformer;
284        }
285    
286        /**
287         * Sets the transformer used to transform messages before they are sent on
288         * to the JMS bus
289         */
290        public void setTransformer(MessageTransformer transformer) {
291            this.transformer = transformer;
292        }
293    
294        /**
295         * @return the time in milli second when this object was created.
296         */
297        protected long getStartTime() {
298            return this.startTime;
299        }
300    
301        /**
302         * @return Returns the messageSequence.
303         */
304        protected long getMessageSequence() {
305            return messageSequence.incrementAndGet();
306        }
307    
308        /**
309         * @param messageSequence The messageSequence to set.
310         */
311        protected void setMessageSequence(AtomicLong messageSequence) {
312            this.messageSequence = messageSequence;
313        }
314    
315        /**
316         * @return Returns the info.
317         */
318        protected ProducerInfo getProducerInfo() {
319            return this.info != null ? this.info : null;
320        }
321    
322        /**
323         * @param info The info to set
324         */
325        protected void setProducerInfo(ProducerInfo info) {
326            this.info = info;
327        }
328    
329        @Override
330        public String toString() {
331            return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
332        }
333    
334        public void onProducerAck(ProducerAck pa) {
335            if (this.producerWindow != null) {
336                this.producerWindow.decreaseUsage(pa.getSize());
337            }
338        }
339    
340    }