001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.util.Collections;
022 import java.util.HashMap;
023 import java.util.Map;
024
025 import javax.jms.IllegalStateException;
026 import javax.jms.InvalidDestinationException;
027 import javax.jms.JMSException;
028
029 import org.apache.activemq.command.ActiveMQBytesMessage;
030 import org.apache.activemq.command.ActiveMQDestination;
031 import org.apache.activemq.command.ActiveMQMessage;
032 import org.apache.activemq.command.CommandTypes;
033 import org.apache.activemq.command.ConsumerId;
034 import org.apache.activemq.command.ConsumerInfo;
035 import org.apache.activemq.command.MessageAck;
036 import org.apache.activemq.command.MessageDispatch;
037 import org.apache.activemq.command.ProducerId;
038 import org.apache.activemq.selector.SelectorParser;
039 import org.apache.activemq.util.IOExceptionSupport;
040 import org.apache.activemq.util.IntrospectionSupport;
041 import org.apache.activemq.util.JMSExceptionSupport;
042
043 /**
044 *
045 */
046 public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
047
048 private final ActiveMQConnection connection;
049 private final ConsumerInfo info;
050 // These are the messages waiting to be delivered to the client
051 private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
052
053 private int deliveredCounter;
054 private MessageDispatch lastDelivered;
055 private boolean eosReached;
056 private byte buffer[];
057 private int pos;
058 private Map<String, Object> jmsProperties;
059
060 private ProducerId producerId;
061 private long nextSequenceId;
062 private final long timeout;
063 private boolean firstReceived;
064
065 public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch, long timeout)
066 throws JMSException {
067 this.connection = connection;
068
069 if (dest == null) {
070 throw new InvalidDestinationException("Don't understand null destinations");
071 } else if (dest.isTemporary()) {
072 String physicalName = dest.getPhysicalName();
073
074 if (physicalName == null) {
075 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
076 }
077
078 String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
079
080 if (physicalName.indexOf(connectionID) < 0) {
081 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
082 }
083
084 if (connection.isDeleted(dest)) {
085 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
086 }
087 }
088
089 if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
090 this.timeout = timeout;
091
092 this.info = new ConsumerInfo(consumerId);
093 this.info.setSubscriptionName(name);
094
095 if (selector != null && selector.trim().length() != 0) {
096 selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
097 } else {
098 selector = "JMSType='org.apache.activemq.Stream'";
099 }
100
101 SelectorParser.parse(selector);
102 this.info.setSelector(selector);
103
104 this.info.setPrefetchSize(prefetch);
105 this.info.setNoLocal(noLocal);
106 this.info.setBrowser(false);
107 this.info.setDispatchAsync(false);
108
109 // Allows the options on the destination to configure the consumerInfo
110 if (dest.getOptions() != null) {
111 Map<String, String> options = new HashMap<String, String>(dest.getOptions());
112 IntrospectionSupport.setProperties(this.info, options, "consumer.");
113 }
114
115 this.info.setDestination(dest);
116
117 this.connection.addInputStream(this);
118 this.connection.addDispatcher(info.getConsumerId(), this);
119 this.connection.syncSendPacket(info);
120 unconsumedMessages.start();
121 }
122
123 @Override
124 public void close() throws IOException {
125 if (!unconsumedMessages.isClosed()) {
126 try {
127 if (lastDelivered != null) {
128 MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
129 connection.asyncSendPacket(ack);
130 }
131 dispose();
132 this.connection.syncSendPacket(info.createRemoveCommand());
133 } catch (JMSException e) {
134 throw IOExceptionSupport.create(e);
135 }
136 }
137 }
138
139 public void dispose() {
140 if (!unconsumedMessages.isClosed()) {
141 unconsumedMessages.close();
142 this.connection.removeDispatcher(info.getConsumerId());
143 this.connection.removeInputStream(this);
144 }
145 }
146
147 /**
148 * Return the JMS Properties which where used to send the InputStream
149 *
150 * @return jmsProperties
151 * @throws IOException
152 */
153 public Map<String, Object> getJMSProperties() throws IOException {
154 if (jmsProperties == null) {
155 fillBuffer();
156 }
157 return jmsProperties;
158 }
159
160 /**
161 * This method allows the client to receive the Stream data as unaltered ActiveMQMessage
162 * object which is how the split stream data is sent. Each message will contains one
163 * chunk of the written bytes as well as a valid message group sequence id. The EOS
164 * message will have a message group sequence id of -1.
165 *
166 * This method is useful for testing, but should never be mixed with calls to the
167 * normal stream receive methods as it will break the normal stream processing flow
168 * and can lead to loss of data.
169 *
170 * @return an ActiveMQMessage object that either contains byte data or an end of strem
171 * marker.
172 * @throws JMSException
173 * @throws ReadTimeoutException
174 */
175 public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
176 checkClosed();
177 MessageDispatch md;
178 try {
179 if (firstReceived || timeout == -1) {
180 md = unconsumedMessages.dequeue(-1);
181 firstReceived = true;
182 } else {
183 md = unconsumedMessages.dequeue(timeout);
184 if (md == null) throw new ReadTimeoutException();
185 }
186 } catch (InterruptedException e) {
187 Thread.currentThread().interrupt();
188 throw JMSExceptionSupport.create(e);
189 }
190
191 if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
192 return null;
193 }
194
195 deliveredCounter++;
196 if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
197 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
198 connection.asyncSendPacket(ack);
199 deliveredCounter = 0;
200 lastDelivered = null;
201 } else {
202 lastDelivered = md;
203 }
204
205 return (ActiveMQMessage)md.getMessage();
206 }
207
208 /**
209 * @throws IllegalStateException
210 */
211 protected void checkClosed() throws IllegalStateException {
212 if (unconsumedMessages.isClosed()) {
213 throw new IllegalStateException("The Consumer is closed");
214 }
215 }
216
217 /**
218 *
219 * @see InputStream#read()
220 * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
221 */
222 @Override
223 public int read() throws IOException {
224 fillBuffer();
225 if (eosReached || buffer.length == 0) {
226 return -1;
227 }
228
229 return buffer[pos++] & 0xff;
230 }
231
232 /**
233 *
234 * @see InputStream#read(byte[], int, int)
235 * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
236 */
237 @Override
238 public int read(byte[] b, int off, int len) throws IOException {
239 fillBuffer();
240 if (eosReached || buffer.length == 0) {
241 return -1;
242 }
243
244 int max = Math.min(len, buffer.length - pos);
245 System.arraycopy(buffer, pos, b, off, max);
246
247 pos += max;
248 return max;
249 }
250
251 private void fillBuffer() throws IOException {
252 if (eosReached || (buffer != null && buffer.length > pos)) {
253 return;
254 }
255 try {
256 while (true) {
257 ActiveMQMessage m = receive();
258 if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
259 // First message.
260 long producerSequenceId = m.getMessageId().getProducerSequenceId();
261 if (producerId == null) {
262 // We have to start a stream at sequence id = 0
263 if (producerSequenceId != 0) {
264 continue;
265 }
266 nextSequenceId++;
267 producerId = m.getMessageId().getProducerId();
268 } else {
269 // Verify it's the next message of the sequence.
270 if (!m.getMessageId().getProducerId().equals(producerId)) {
271 throw new IOException("Received an unexpected message: invalid producer: " + m);
272 }
273 if (producerSequenceId != nextSequenceId++) {
274 throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
275 }
276 }
277
278 // Read the buffer in.
279 ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
280 buffer = new byte[(int)bm.getBodyLength()];
281 bm.readBytes(buffer);
282 pos = 0;
283 if (jmsProperties == null) {
284 jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
285 }
286 } else {
287 eosReached = true;
288 if (jmsProperties == null) {
289 // no properties found
290 jmsProperties = Collections.emptyMap();
291 }
292 }
293 return;
294 }
295 } catch (JMSException e) {
296 eosReached = true;
297 if (jmsProperties == null) {
298 // no properties found
299 jmsProperties = Collections.emptyMap();
300 }
301 throw IOExceptionSupport.create(e);
302 }
303 }
304
305 @Override
306 public void dispatch(MessageDispatch md) {
307 unconsumedMessages.enqueue(md);
308 }
309
310 @Override
311 public String toString() {
312 return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
313 }
314
315 /**
316 * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
317 */
318 public class ReadTimeoutException extends IOException {
319 private static final long serialVersionUID = -3217758894326719909L;
320
321 public ReadTimeoutException() {
322 super();
323 }
324 }
325 }