001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.util.Collections;
022    import java.util.HashMap;
023    import java.util.Map;
024    
025    import javax.jms.IllegalStateException;
026    import javax.jms.InvalidDestinationException;
027    import javax.jms.JMSException;
028    
029    import org.apache.activemq.command.ActiveMQBytesMessage;
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.ActiveMQMessage;
032    import org.apache.activemq.command.CommandTypes;
033    import org.apache.activemq.command.ConsumerId;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageDispatch;
037    import org.apache.activemq.command.ProducerId;
038    import org.apache.activemq.selector.SelectorParser;
039    import org.apache.activemq.util.IOExceptionSupport;
040    import org.apache.activemq.util.IntrospectionSupport;
041    import org.apache.activemq.util.JMSExceptionSupport;
042    
043    /**
044     *
045     */
046    public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
047    
048        private final ActiveMQConnection connection;
049        private final ConsumerInfo info;
050        // These are the messages waiting to be delivered to the client
051        private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
052    
053        private int deliveredCounter;
054        private MessageDispatch lastDelivered;
055        private boolean eosReached;
056        private byte buffer[];
057        private int pos;
058        private Map<String, Object> jmsProperties;
059    
060        private ProducerId producerId;
061        private long nextSequenceId;
062        private final long timeout;
063        private boolean firstReceived;
064    
065        public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch,  long timeout)
066            throws JMSException {
067            this.connection = connection;
068    
069            if (dest == null) {
070                throw new InvalidDestinationException("Don't understand null destinations");
071            } else if (dest.isTemporary()) {
072                String physicalName = dest.getPhysicalName();
073    
074                if (physicalName == null) {
075                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
076                }
077    
078                String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
079    
080                if (physicalName.indexOf(connectionID) < 0) {
081                    throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
082                }
083    
084                if (connection.isDeleted(dest)) {
085                    throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
086                }
087            }
088    
089            if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
090            this.timeout = timeout;
091    
092            this.info = new ConsumerInfo(consumerId);
093            this.info.setSubscriptionName(name);
094    
095            if (selector != null && selector.trim().length() != 0) {
096                selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
097            } else {
098                selector = "JMSType='org.apache.activemq.Stream'";
099            }
100    
101            SelectorParser.parse(selector);
102            this.info.setSelector(selector);
103    
104            this.info.setPrefetchSize(prefetch);
105            this.info.setNoLocal(noLocal);
106            this.info.setBrowser(false);
107            this.info.setDispatchAsync(false);
108    
109            // Allows the options on the destination to configure the consumerInfo
110            if (dest.getOptions() != null) {
111                Map<String, String> options = new HashMap<String, String>(dest.getOptions());
112                IntrospectionSupport.setProperties(this.info, options, "consumer.");
113            }
114    
115            this.info.setDestination(dest);
116    
117            this.connection.addInputStream(this);
118            this.connection.addDispatcher(info.getConsumerId(), this);
119            this.connection.syncSendPacket(info);
120            unconsumedMessages.start();
121        }
122    
123        @Override
124        public void close() throws IOException {
125            if (!unconsumedMessages.isClosed()) {
126                try {
127                    if (lastDelivered != null) {
128                        MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
129                        connection.asyncSendPacket(ack);
130                    }
131                    dispose();
132                    this.connection.syncSendPacket(info.createRemoveCommand());
133                } catch (JMSException e) {
134                    throw IOExceptionSupport.create(e);
135                }
136            }
137        }
138    
139        public void dispose() {
140            if (!unconsumedMessages.isClosed()) {
141                unconsumedMessages.close();
142                this.connection.removeDispatcher(info.getConsumerId());
143                this.connection.removeInputStream(this);
144            }
145        }
146    
147        /**
148         * Return the JMS Properties which where used to send the InputStream
149         *
150         * @return jmsProperties
151         * @throws IOException
152         */
153        public Map<String, Object> getJMSProperties() throws IOException {
154            if (jmsProperties == null) {
155                fillBuffer();
156            }
157            return jmsProperties;
158        }
159    
160        /**
161         * This method allows the client to receive the Stream data as unaltered ActiveMQMessage
162         * object which is how the split stream data is sent.  Each message will contains one
163         * chunk of the written bytes as well as a valid message group sequence id.  The EOS
164         * message will have a message group sequence id of -1.
165         *
166         * This method is useful for testing, but should never be mixed with calls to the
167         * normal stream receive methods as it will break the normal stream processing flow
168         * and can lead to loss of data.
169         *
170         * @return an ActiveMQMessage object that either contains byte data or an end of strem
171         *         marker.
172         * @throws JMSException
173         * @throws ReadTimeoutException
174         */
175        public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
176            checkClosed();
177            MessageDispatch md;
178            try {
179                if (firstReceived || timeout == -1) {
180                    md = unconsumedMessages.dequeue(-1);
181                    firstReceived = true;
182                } else {
183                    md = unconsumedMessages.dequeue(timeout);
184                    if (md == null) throw new ReadTimeoutException();
185                }
186            } catch (InterruptedException e) {
187                Thread.currentThread().interrupt();
188                throw JMSExceptionSupport.create(e);
189            }
190    
191            if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
192                return null;
193            }
194    
195            deliveredCounter++;
196            if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
197                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
198                connection.asyncSendPacket(ack);
199                deliveredCounter = 0;
200                lastDelivered = null;
201            } else {
202                lastDelivered = md;
203            }
204    
205            return (ActiveMQMessage)md.getMessage();
206        }
207    
208        /**
209         * @throws IllegalStateException
210         */
211        protected void checkClosed() throws IllegalStateException {
212            if (unconsumedMessages.isClosed()) {
213                throw new IllegalStateException("The Consumer is closed");
214            }
215        }
216    
217        /**
218         *
219         * @see InputStream#read()
220         * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
221         */
222        @Override
223        public int read() throws IOException {
224            fillBuffer();
225            if (eosReached || buffer.length == 0) {
226                return -1;
227            }
228    
229            return buffer[pos++] & 0xff;
230        }
231    
232        /**
233         *
234         * @see InputStream#read(byte[], int, int)
235         * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
236         */
237        @Override
238        public int read(byte[] b, int off, int len) throws IOException {
239            fillBuffer();
240            if (eosReached || buffer.length == 0) {
241                return -1;
242            }
243    
244            int max = Math.min(len, buffer.length - pos);
245            System.arraycopy(buffer, pos, b, off, max);
246    
247            pos += max;
248            return max;
249        }
250    
251        private void fillBuffer() throws IOException {
252            if (eosReached || (buffer != null && buffer.length > pos)) {
253                return;
254            }
255            try {
256                while (true) {
257                    ActiveMQMessage m = receive();
258                    if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
259                        // First message.
260                        long producerSequenceId = m.getMessageId().getProducerSequenceId();
261                        if (producerId == null) {
262                            // We have to start a stream at sequence id = 0
263                            if (producerSequenceId != 0) {
264                                continue;
265                            }
266                            nextSequenceId++;
267                            producerId = m.getMessageId().getProducerId();
268                        } else {
269                            // Verify it's the next message of the sequence.
270                            if (!m.getMessageId().getProducerId().equals(producerId)) {
271                                throw new IOException("Received an unexpected message: invalid producer: " + m);
272                            }
273                            if (producerSequenceId != nextSequenceId++) {
274                                throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
275                            }
276                        }
277    
278                        // Read the buffer in.
279                        ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
280                        buffer = new byte[(int)bm.getBodyLength()];
281                        bm.readBytes(buffer);
282                        pos = 0;
283                        if (jmsProperties == null) {
284                            jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
285                        }
286                    } else {
287                        eosReached = true;
288                        if (jmsProperties == null) {
289                            // no properties found
290                            jmsProperties = Collections.emptyMap();
291                        }
292                    }
293                    return;
294                }
295            } catch (JMSException e) {
296                eosReached = true;
297                if (jmsProperties == null) {
298                    // no properties found
299                    jmsProperties = Collections.emptyMap();
300                }
301                throw IOExceptionSupport.create(e);
302            }
303        }
304    
305        @Override
306        public void dispatch(MessageDispatch md) {
307            unconsumedMessages.enqueue(md);
308        }
309    
310        @Override
311        public String toString() {
312            return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
313        }
314    
315        /**
316         * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
317         */
318        public class ReadTimeoutException extends IOException {
319            private static final long serialVersionUID = -3217758894326719909L;
320    
321            public ReadTimeoutException() {
322                super();
323            }
324        }
325    }