001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.util.HashMap;
020 import java.util.Map;
021 import java.util.concurrent.atomic.AtomicLong;
022
023 import javax.jms.Destination;
024 import javax.jms.IllegalStateException;
025 import javax.jms.InvalidDestinationException;
026 import javax.jms.JMSException;
027 import javax.jms.Message;
028
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ProducerAck;
031 import org.apache.activemq.command.ProducerId;
032 import org.apache.activemq.command.ProducerInfo;
033 import org.apache.activemq.management.JMSProducerStatsImpl;
034 import org.apache.activemq.management.StatsCapable;
035 import org.apache.activemq.management.StatsImpl;
036 import org.apache.activemq.usage.MemoryUsage;
037 import org.apache.activemq.util.IntrospectionSupport;
038
039 /**
040 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
041 * destination. A <CODE>MessageProducer</CODE> object is created by passing a
042 * <CODE>Destination</CODE> object to a message-producer creation method
043 * supplied by a session.
044 * <P>
045 * <CODE>MessageProducer</CODE> is the parent interface for all message
046 * producers.
047 * <P>
048 * A client also has the option of creating a message producer without supplying
049 * a destination. In this case, a destination must be provided with every send
050 * operation. A typical use for this kind of message producer is to send replies
051 * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
052 * <P>
053 * A client can specify a default delivery mode, priority, and time to live for
054 * messages sent by a message producer. It can also specify the delivery mode,
055 * priority, and time to live for an individual message.
056 * <P>
057 * A client can specify a time-to-live value in milliseconds for each message it
058 * sends. This value defines a message expiration time that is the sum of the
059 * message's time-to-live and the GMT when it is sent (for transacted sends,
060 * this is the time the client sends the message, not the time the transaction
061 * is committed).
062 * <P>
063 * A JMS provider should do its best to expire messages accurately; however, the
064 * JMS API does not define the accuracy provided.
065 *
066 * @version $Revision: 1.14 $
067 * @see javax.jms.TopicPublisher
068 * @see javax.jms.QueueSender
069 * @see javax.jms.Session#createProducer
070 */
071 public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
072
073 protected ProducerInfo info;
074 protected boolean closed;
075
076 private JMSProducerStatsImpl stats;
077 private AtomicLong messageSequence;
078 private long startTime;
079 private MessageTransformer transformer;
080 private MemoryUsage producerWindow;
081
082 protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
083 super(session);
084 this.info = new ProducerInfo(producerId);
085 this.info.setWindowSize(session.connection.getProducerWindowSize());
086 if (destination != null && destination.getOptions() != null) {
087 Map<String, String> options = new HashMap<String, String>(destination.getOptions());
088 IntrospectionSupport.setProperties(this.info, options, "producer.");
089 }
090 this.info.setDestination(destination);
091
092 // Enable producer window flow control if protocol > 3 and the window
093 // size > 0
094 if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
095 producerWindow = new MemoryUsage("Producer Window: " + producerId);
096 producerWindow.setLimit(this.info.getWindowSize());
097 }
098
099 this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
100 this.defaultPriority = Message.DEFAULT_PRIORITY;
101 this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
102 this.startTime = System.currentTimeMillis();
103 this.messageSequence = new AtomicLong(0);
104 this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
105 this.session.addProducer(this);
106 this.session.asyncSendPacket(info);
107 this.setSendTimeout(sendTimeout);
108 setTransformer(session.getTransformer());
109 }
110
111 public StatsImpl getStats() {
112 return stats;
113 }
114
115 public JMSProducerStatsImpl getProducerStats() {
116 return stats;
117 }
118
119 /**
120 * Gets the destination associated with this <CODE>MessageProducer</CODE>.
121 *
122 * @return this producer's <CODE>Destination/ <CODE>
123 * @throws JMSException if the JMS provider fails to close the producer due to
124 * some internal error.
125 * @since 1.1
126 */
127 public Destination getDestination() throws JMSException {
128 checkClosed();
129 return this.info.getDestination();
130 }
131
132 /**
133 * Closes the message producer.
134 * <P>
135 * Since a provider may allocate some resources on behalf of a <CODE>
136 * MessageProducer</CODE>
137 * outside the Java virtual machine, clients should close them when they are
138 * not needed. Relying on garbage collection to eventually reclaim these
139 * resources may not be timely enough.
140 *
141 * @throws JMSException if the JMS provider fails to close the producer due
142 * to some internal error.
143 */
144 public void close() throws JMSException {
145 if (!closed) {
146 dispose();
147 this.session.asyncSendPacket(info.createRemoveCommand());
148 }
149 }
150
151 public void dispose() {
152 if (!closed) {
153 this.session.removeProducer(this);
154 closed = true;
155 }
156 }
157
158 /**
159 * Check if the instance of this producer has been closed.
160 *
161 * @throws IllegalStateException
162 */
163 protected void checkClosed() throws IllegalStateException {
164 if (closed) {
165 throw new IllegalStateException("The producer is closed");
166 }
167 }
168
169 /**
170 * Sends a message to a destination for an unidentified message producer,
171 * specifying delivery mode, priority and time to live.
172 * <P>
173 * Typically, a message producer is assigned a destination at creation time;
174 * however, the JMS API also supports unidentified message producers, which
175 * require that the destination be supplied every time a message is sent.
176 *
177 * @param destination the destination to send this message to
178 * @param message the message to send
179 * @param deliveryMode the delivery mode to use
180 * @param priority the priority for this message
181 * @param timeToLive the message's lifetime (in milliseconds)
182 * @throws JMSException if the JMS provider fails to send the message due to
183 * some internal error.
184 * @throws UnsupportedOperationException if an invalid destination is
185 * specified.
186 * @throws InvalidDestinationException if a client uses this method with an
187 * invalid destination.
188 * @see javax.jms.Session#createProducer
189 * @since 1.1
190 */
191 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
192 checkClosed();
193 if (destination == null) {
194 if (info.getDestination() == null) {
195 throw new UnsupportedOperationException("A destination must be specified.");
196 }
197 throw new InvalidDestinationException("Don't understand null destinations");
198 }
199
200 ActiveMQDestination dest;
201 if (destination == info.getDestination()) {
202 dest = (ActiveMQDestination)destination;
203 } else if (info.getDestination() == null) {
204 dest = ActiveMQDestination.transform(destination);
205 } else {
206 throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
207 }
208 if (dest == null) {
209 throw new JMSException("No destination specified");
210 }
211
212 if (transformer != null) {
213 Message transformedMessage = transformer.producerTransform(session, this, message);
214 if (transformedMessage != null) {
215 message = transformedMessage;
216 }
217 }
218
219 if (producerWindow != null) {
220 try {
221 producerWindow.waitForSpace();
222 } catch (InterruptedException e) {
223 throw new JMSException("Send aborted due to thread interrupt.");
224 }
225 }
226
227 this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
228
229 stats.onMessage();
230 }
231
232 public MessageTransformer getTransformer() {
233 return transformer;
234 }
235
236 /**
237 * Sets the transformer used to transform messages before they are sent on
238 * to the JMS bus
239 */
240 public void setTransformer(MessageTransformer transformer) {
241 this.transformer = transformer;
242 }
243
244 /**
245 * @return the time in milli second when this object was created.
246 */
247 protected long getStartTime() {
248 return this.startTime;
249 }
250
251 /**
252 * @return Returns the messageSequence.
253 */
254 protected long getMessageSequence() {
255 return messageSequence.incrementAndGet();
256 }
257
258 /**
259 * @param messageSequence The messageSequence to set.
260 */
261 protected void setMessageSequence(AtomicLong messageSequence) {
262 this.messageSequence = messageSequence;
263 }
264
265 /**
266 * @return Returns the info.
267 */
268 protected ProducerInfo getProducerInfo() {
269 return this.info != null ? this.info : null;
270 }
271
272 /**
273 * @param info The info to set
274 */
275 protected void setProducerInfo(ProducerInfo info) {
276 this.info = info;
277 }
278
279 public String toString() {
280 return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
281 }
282
283 public void onProducerAck(ProducerAck pa) {
284 if (this.producerWindow != null) {
285 this.producerWindow.decreaseUsage(pa.getSize());
286 }
287 }
288
289 }