001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.atomic.AtomicLong;
022
023import javax.jms.Destination;
024import javax.jms.IllegalStateException;
025import javax.jms.InvalidDestinationException;
026import javax.jms.JMSException;
027import javax.jms.Message;
028
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ProducerAck;
031import org.apache.activemq.command.ProducerId;
032import org.apache.activemq.command.ProducerInfo;
033import org.apache.activemq.management.JMSProducerStatsImpl;
034import org.apache.activemq.management.StatsCapable;
035import org.apache.activemq.management.StatsImpl;
036import org.apache.activemq.usage.MemoryUsage;
037import org.apache.activemq.util.IntrospectionSupport;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
043 * destination. A <CODE>MessageProducer</CODE> object is created by passing a
044 * <CODE>Destination</CODE> object to a message-producer creation method
045 * supplied by a session.
046 * <P>
047 * <CODE>MessageProducer</CODE> is the parent interface for all message
048 * producers.
049 * <P>
050 * A client also has the option of creating a message producer without supplying
051 * a destination. In this case, a destination must be provided with every send
052 * operation. A typical use for this kind of message producer is to send replies
053 * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
054 * <P>
055 * A client can specify a default delivery mode, priority, and time to live for
056 * messages sent by a message producer. It can also specify the delivery mode,
057 * priority, and time to live for an individual message.
058 * <P>
059 * A client can specify a time-to-live value in milliseconds for each message it
060 * sends. This value defines a message expiration time that is the sum of the
061 * message's time-to-live and the GMT when it is sent (for transacted sends,
062 * this is the time the client sends the message, not the time the transaction
063 * is committed).
064 * <P>
065 * A JMS provider should do its best to expire messages accurately; however, the
066 * JMS API does not define the accuracy provided.
067 *
068 *
069 * @see javax.jms.TopicPublisher
070 * @see javax.jms.QueueSender
071 * @see javax.jms.Session#createProducer
072 */
073public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
074
075    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageProducer.class);
076
077    protected ProducerInfo info;
078    protected boolean closed;
079
080    private final JMSProducerStatsImpl stats;
081    private AtomicLong messageSequence;
082    private final long startTime;
083    private MessageTransformer transformer;
084    private MemoryUsage producerWindow;
085
086    protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
087        super(session);
088        this.info = new ProducerInfo(producerId);
089        this.info.setWindowSize(session.connection.getProducerWindowSize());
090        // Allows the options on the destination to configure the producerInfo
091        if (destination != null && destination.getOptions() != null) {
092            Map<String, Object> options = IntrospectionSupport.extractProperties(
093                new HashMap<String, Object>(destination.getOptions()), "producer.");
094            IntrospectionSupport.setProperties(this.info, options);
095            if (options.size() > 0) {
096                String msg = "There are " + options.size()
097                    + " producer options that couldn't be set on the producer."
098                    + " Check the options are spelled correctly."
099                    + " Unknown parameters=[" + options + "]."
100                    + " This producer cannot be started.";
101                LOG.warn(msg);
102                throw new ConfigurationException(msg);
103            }
104        }
105
106        this.info.setDestination(destination);
107
108        // Enable producer window flow control if protocol >= 3 and the window size > 0
109        if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
110            producerWindow = new MemoryUsage("Producer Window: " + producerId);
111            producerWindow.setExecutor(session.getConnectionExecutor());
112            producerWindow.setLimit(this.info.getWindowSize());
113            producerWindow.start();
114        }
115
116        this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
117        this.defaultPriority = Message.DEFAULT_PRIORITY;
118        this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
119        this.startTime = System.currentTimeMillis();
120        this.messageSequence = new AtomicLong(0);
121        this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
122        try {
123            this.session.addProducer(this);
124            this.session.syncSendPacket(info);
125        } catch (JMSException e) {
126            this.session.removeProducer(this);
127            throw e;
128        }
129        this.setSendTimeout(sendTimeout);
130        setTransformer(session.getTransformer());
131    }
132
133    @Override
134    public StatsImpl getStats() {
135        return stats;
136    }
137
138    public JMSProducerStatsImpl getProducerStats() {
139        return stats;
140    }
141
142    /**
143     * Gets the destination associated with this <CODE>MessageProducer</CODE>.
144     *
145     * @return this producer's <CODE>Destination/ <CODE>
146     * @throws JMSException if the JMS provider fails to close the producer due to
147     *                      some internal error.
148     * @since 1.1
149     */
150    @Override
151    public Destination getDestination() throws JMSException {
152        checkClosed();
153        return this.info.getDestination();
154    }
155
156    /**
157     * Closes the message producer.
158     * <P>
159     * Since a provider may allocate some resources on behalf of a <CODE>
160     * MessageProducer</CODE>
161     * outside the Java virtual machine, clients should close them when they are
162     * not needed. Relying on garbage collection to eventually reclaim these
163     * resources may not be timely enough.
164     *
165     * @throws JMSException if the JMS provider fails to close the producer due
166     *                 to some internal error.
167     */
168    @Override
169    public void close() throws JMSException {
170        if (!closed) {
171            dispose();
172            this.session.asyncSendPacket(info.createRemoveCommand());
173        }
174    }
175
176    @Override
177    public void dispose() {
178        if (!closed) {
179            this.session.removeProducer(this);
180            if (producerWindow != null) {
181                producerWindow.stop();
182            }
183            closed = true;
184        }
185    }
186
187    /**
188     * Check if the instance of this producer has been closed.
189     *
190     * @throws IllegalStateException
191     */
192    @Override
193    protected void checkClosed() throws IllegalStateException {
194        if (closed) {
195            throw new IllegalStateException("The producer is closed");
196        }
197    }
198
199    /**
200     * Sends a message to a destination for an unidentified message producer,
201     * specifying delivery mode, priority and time to live.
202     * <P>
203     * Typically, a message producer is assigned a destination at creation time;
204     * however, the JMS API also supports unidentified message producers, which
205     * require that the destination be supplied every time a message is sent.
206     *
207     * @param destination the destination to send this message to
208     * @param message the message to send
209     * @param deliveryMode the delivery mode to use
210     * @param priority the priority for this message
211     * @param timeToLive the message's lifetime (in milliseconds)
212     * @throws JMSException if the JMS provider fails to send the message due to
213     *                 some internal error.
214     * @throws UnsupportedOperationException if an invalid destination is
215     *                 specified.
216     * @throws InvalidDestinationException if a client uses this method with an
217     *                 invalid destination.
218     * @see javax.jms.Session#createProducer
219     * @since 1.1
220     */
221    @Override
222    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
223        this.send(destination, message, deliveryMode, priority, timeToLive, null);
224    }
225
226    public void send(Message message, AsyncCallback onComplete) throws JMSException {
227        this.send(this.getDestination(),
228                  message,
229                  this.defaultDeliveryMode,
230                  this.defaultPriority,
231                  this.defaultTimeToLive, onComplete);
232    }
233
234    public void send(Destination destination, Message message, AsyncCallback onComplete) throws JMSException {
235        this.send(destination,
236                  message,
237                  this.defaultDeliveryMode,
238                  this.defaultPriority,
239                  this.defaultTimeToLive,
240                  onComplete);
241    }
242
243    public void send(Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
244        this.send(this.getDestination(),
245                  message,
246                  deliveryMode,
247                  priority,
248                  timeToLive,
249                  onComplete);
250    }
251
252    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
253        checkClosed();
254        if (destination == null) {
255            if (info.getDestination() == null) {
256                throw new UnsupportedOperationException("A destination must be specified.");
257            }
258            throw new InvalidDestinationException("Don't understand null destinations");
259        }
260
261        ActiveMQDestination dest;
262        if (destination.equals(info.getDestination())) {
263            dest = (ActiveMQDestination)destination;
264        } else if (info.getDestination() == null) {
265            dest = ActiveMQDestination.transform(destination);
266        } else {
267            throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
268        }
269        if (dest == null) {
270            throw new JMSException("No destination specified");
271        }
272
273        if (transformer != null) {
274            Message transformedMessage = transformer.producerTransform(session, this, message);
275            if (transformedMessage != null) {
276                message = transformedMessage;
277            }
278        }
279
280        if (producerWindow != null) {
281            try {
282                producerWindow.waitForSpace();
283            } catch (InterruptedException e) {
284                throw new JMSException("Send aborted due to thread interrupt.");
285            }
286        }
287
288        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
289
290        stats.onMessage();
291    }
292
293    public MessageTransformer getTransformer() {
294        return transformer;
295    }
296
297    /**
298     * Sets the transformer used to transform messages before they are sent on
299     * to the JMS bus
300     */
301    public void setTransformer(MessageTransformer transformer) {
302        this.transformer = transformer;
303    }
304
305    /**
306     * @return the time in milli second when this object was created.
307     */
308    protected long getStartTime() {
309        return this.startTime;
310    }
311
312    /**
313     * @return Returns the messageSequence.
314     */
315    protected long getMessageSequence() {
316        return messageSequence.incrementAndGet();
317    }
318
319    /**
320     * @param messageSequence The messageSequence to set.
321     */
322    protected void setMessageSequence(AtomicLong messageSequence) {
323        this.messageSequence = messageSequence;
324    }
325
326    /**
327     * @return Returns the info.
328     */
329    protected ProducerInfo getProducerInfo() {
330        return this.info != null ? this.info : null;
331    }
332
333    /**
334     * @param info The info to set
335     */
336    protected void setProducerInfo(ProducerInfo info) {
337        this.info = info;
338    }
339
340    @Override
341    public String toString() {
342        return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
343    }
344
345    public void onProducerAck(ProducerAck pa) {
346        if (this.producerWindow != null) {
347            this.producerWindow.decreaseUsage(pa.getSize());
348        }
349    }
350
351}