001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.concurrent.atomic.AtomicLong;
022    
023    import javax.jms.Destination;
024    import javax.jms.IllegalStateException;
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    import javax.jms.Message;
028    
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ProducerAck;
031    import org.apache.activemq.command.ProducerId;
032    import org.apache.activemq.command.ProducerInfo;
033    import org.apache.activemq.management.JMSProducerStatsImpl;
034    import org.apache.activemq.management.StatsCapable;
035    import org.apache.activemq.management.StatsImpl;
036    import org.apache.activemq.usage.MemoryUsage;
037    import org.apache.activemq.util.IntrospectionSupport;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    /**
042     * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
043     * destination. A <CODE>MessageProducer</CODE> object is created by passing a
044     * <CODE>Destination</CODE> object to a message-producer creation method
045     * supplied by a session.
046     * <P>
047     * <CODE>MessageProducer</CODE> is the parent interface for all message
048     * producers.
049     * <P>
050     * A client also has the option of creating a message producer without supplying
051     * a destination. In this case, a destination must be provided with every send
052     * operation. A typical use for this kind of message producer is to send replies
053     * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
054     * <P>
055     * A client can specify a default delivery mode, priority, and time to live for
056     * messages sent by a message producer. It can also specify the delivery mode,
057     * priority, and time to live for an individual message.
058     * <P>
059     * A client can specify a time-to-live value in milliseconds for each message it
060     * sends. This value defines a message expiration time that is the sum of the
061     * message's time-to-live and the GMT when it is sent (for transacted sends,
062     * this is the time the client sends the message, not the time the transaction
063     * is committed).
064     * <P>
065     * A JMS provider should do its best to expire messages accurately; however, the
066     * JMS API does not define the accuracy provided.
067     *
068     *
069     * @see javax.jms.TopicPublisher
070     * @see javax.jms.QueueSender
071     * @see javax.jms.Session#createProducer
072     */
073    public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
074    
075        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageProducer.class);
076    
077        protected ProducerInfo info;
078        protected boolean closed;
079    
080        private final JMSProducerStatsImpl stats;
081        private AtomicLong messageSequence;
082        private final long startTime;
083        private MessageTransformer transformer;
084        private MemoryUsage producerWindow;
085    
086        protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
087            super(session);
088            this.info = new ProducerInfo(producerId);
089            this.info.setWindowSize(session.connection.getProducerWindowSize());
090            // Allows the options on the destination to configure the producerInfo
091            if (destination != null && destination.getOptions() != null) {
092                Map<String, Object> options = IntrospectionSupport.extractProperties(
093                    new HashMap<String, Object>(destination.getOptions()), "producer.");
094                IntrospectionSupport.setProperties(this.info, options);
095                if (options.size() > 0) {
096                    String msg = "There are " + options.size()
097                        + " producer options that couldn't be set on the producer."
098                        + " Check the options are spelled correctly."
099                        + " Unknown parameters=[" + options + "]."
100                        + " This producer cannot be started.";
101                    LOG.warn(msg);
102                    throw new ConfigurationException(msg);
103                }
104            }
105    
106            this.info.setDestination(destination);
107    
108            // Enable producer window flow control if protocol > 3 and the window
109            // size > 0
110            if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
111                producerWindow = new MemoryUsage("Producer Window: " + producerId);
112                producerWindow.setExecutor(session.getConnectionExecutor());
113                producerWindow.setLimit(this.info.getWindowSize());
114                producerWindow.start();
115            }
116    
117            this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
118            this.defaultPriority = Message.DEFAULT_PRIORITY;
119            this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
120            this.startTime = System.currentTimeMillis();
121            this.messageSequence = new AtomicLong(0);
122            this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
123            try {
124                this.session.addProducer(this);
125                this.session.syncSendPacket(info);
126            } catch (JMSException e) {
127                this.session.removeProducer(this);
128                throw e;
129            }
130            this.setSendTimeout(sendTimeout);
131            setTransformer(session.getTransformer());
132        }
133    
134        @Override
135        public StatsImpl getStats() {
136            return stats;
137        }
138    
139        public JMSProducerStatsImpl getProducerStats() {
140            return stats;
141        }
142    
143        /**
144         * Gets the destination associated with this <CODE>MessageProducer</CODE>.
145         *
146         * @return this producer's <CODE>Destination/ <CODE>
147         * @throws JMSException if the JMS provider fails to close the producer due to
148         *                      some internal error.
149         * @since 1.1
150         */
151        @Override
152        public Destination getDestination() throws JMSException {
153            checkClosed();
154            return this.info.getDestination();
155        }
156    
157        /**
158         * Closes the message producer.
159         * <P>
160         * Since a provider may allocate some resources on behalf of a <CODE>
161         * MessageProducer</CODE>
162         * outside the Java virtual machine, clients should close them when they are
163         * not needed. Relying on garbage collection to eventually reclaim these
164         * resources may not be timely enough.
165         *
166         * @throws JMSException if the JMS provider fails to close the producer due
167         *                 to some internal error.
168         */
169        @Override
170        public void close() throws JMSException {
171            if (!closed) {
172                dispose();
173                this.session.asyncSendPacket(info.createRemoveCommand());
174            }
175        }
176    
177        @Override
178        public void dispose() {
179            if (!closed) {
180                this.session.removeProducer(this);
181                if (producerWindow != null) {
182                    producerWindow.stop();
183                }
184                closed = true;
185            }
186        }
187    
188        /**
189         * Check if the instance of this producer has been closed.
190         *
191         * @throws IllegalStateException
192         */
193        @Override
194        protected void checkClosed() throws IllegalStateException {
195            if (closed) {
196                throw new IllegalStateException("The producer is closed");
197            }
198        }
199    
200        /**
201         * Sends a message to a destination for an unidentified message producer,
202         * specifying delivery mode, priority and time to live.
203         * <P>
204         * Typically, a message producer is assigned a destination at creation time;
205         * however, the JMS API also supports unidentified message producers, which
206         * require that the destination be supplied every time a message is sent.
207         *
208         * @param destination the destination to send this message to
209         * @param message the message to send
210         * @param deliveryMode the delivery mode to use
211         * @param priority the priority for this message
212         * @param timeToLive the message's lifetime (in milliseconds)
213         * @throws JMSException if the JMS provider fails to send the message due to
214         *                 some internal error.
215         * @throws UnsupportedOperationException if an invalid destination is
216         *                 specified.
217         * @throws InvalidDestinationException if a client uses this method with an
218         *                 invalid destination.
219         * @see javax.jms.Session#createProducer
220         * @since 1.1
221         */
222        @Override
223        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
224            this.send(destination, message, deliveryMode, priority, timeToLive, null);
225        }
226    
227        public void send(Message message, AsyncCallback onComplete) throws JMSException {
228            this.send(this.getDestination(),
229                      message,
230                      this.defaultDeliveryMode,
231                      this.defaultPriority,
232                      this.defaultTimeToLive, onComplete);
233        }
234    
235        public void send(Destination destination, Message message, AsyncCallback onComplete) throws JMSException {
236            this.send(destination,
237                      message,
238                      this.defaultDeliveryMode,
239                      this.defaultPriority,
240                      this.defaultTimeToLive,
241                      onComplete);
242        }
243    
244        public void send(Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
245            this.send(this.getDestination(),
246                      message,
247                      deliveryMode,
248                      priority,
249                      timeToLive,
250                      onComplete);
251        }
252    
253        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
254            checkClosed();
255            if (destination == null) {
256                if (info.getDestination() == null) {
257                    throw new UnsupportedOperationException("A destination must be specified.");
258                }
259                throw new InvalidDestinationException("Don't understand null destinations");
260            }
261    
262            ActiveMQDestination dest;
263            if (destination.equals(info.getDestination())) {
264                dest = (ActiveMQDestination)destination;
265            } else if (info.getDestination() == null) {
266                dest = ActiveMQDestination.transform(destination);
267            } else {
268                throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
269            }
270            if (dest == null) {
271                throw new JMSException("No destination specified");
272            }
273    
274            if (transformer != null) {
275                Message transformedMessage = transformer.producerTransform(session, this, message);
276                if (transformedMessage != null) {
277                    message = transformedMessage;
278                }
279            }
280    
281            if (producerWindow != null) {
282                try {
283                    producerWindow.waitForSpace();
284                } catch (InterruptedException e) {
285                    throw new JMSException("Send aborted due to thread interrupt.");
286                }
287            }
288    
289            this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
290    
291            stats.onMessage();
292        }
293    
294        public MessageTransformer getTransformer() {
295            return transformer;
296        }
297    
298        /**
299         * Sets the transformer used to transform messages before they are sent on
300         * to the JMS bus
301         */
302        public void setTransformer(MessageTransformer transformer) {
303            this.transformer = transformer;
304        }
305    
306        /**
307         * @return the time in milli second when this object was created.
308         */
309        protected long getStartTime() {
310            return this.startTime;
311        }
312    
313        /**
314         * @return Returns the messageSequence.
315         */
316        protected long getMessageSequence() {
317            return messageSequence.incrementAndGet();
318        }
319    
320        /**
321         * @param messageSequence The messageSequence to set.
322         */
323        protected void setMessageSequence(AtomicLong messageSequence) {
324            this.messageSequence = messageSequence;
325        }
326    
327        /**
328         * @return Returns the info.
329         */
330        protected ProducerInfo getProducerInfo() {
331            return this.info != null ? this.info : null;
332        }
333    
334        /**
335         * @param info The info to set
336         */
337        protected void setProducerInfo(ProducerInfo info) {
338            this.info = info;
339        }
340    
341        @Override
342        public String toString() {
343            return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
344        }
345    
346        public void onProducerAck(ProducerAck pa) {
347            if (this.producerWindow != null) {
348                this.producerWindow.decreaseUsage(pa.getSize());
349            }
350        }
351    
352    }