001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.InputStream;
021    import java.util.Collections;
022    import java.util.HashMap;
023    import java.util.Map;
024    import javax.jms.IllegalStateException;
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    import org.apache.activemq.command.ActiveMQBytesMessage;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ActiveMQMessage;
030    import org.apache.activemq.command.CommandTypes;
031    import org.apache.activemq.command.ConsumerId;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.MessageAck;
034    import org.apache.activemq.command.MessageDispatch;
035    import org.apache.activemq.command.ProducerId;
036    import org.apache.activemq.selector.SelectorParser;
037    import org.apache.activemq.util.IOExceptionSupport;
038    import org.apache.activemq.util.IntrospectionSupport;
039    import org.apache.activemq.util.JMSExceptionSupport;
040    
041    /**
042     * 
043     */
044    public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
045    
046        private final ActiveMQConnection connection;
047        private final ConsumerInfo info;
048        // These are the messages waiting to be delivered to the client
049        private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
050    
051        private int deliveredCounter;
052        private MessageDispatch lastDelivered;
053        private boolean eosReached;
054        private byte buffer[];
055        private int pos;
056        private Map<String, Object> jmsProperties;
057    
058        private ProducerId producerId;
059        private long nextSequenceId;
060        private long timeout;
061        private boolean firstReceived;
062    
063        public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch,  long timeout)
064            throws JMSException {
065            this.connection = connection;
066    
067            if (dest == null) {
068                throw new InvalidDestinationException("Don't understand null destinations");
069            } else if (dest.isTemporary()) {
070                String physicalName = dest.getPhysicalName();
071    
072                if (physicalName == null) {
073                    throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
074                }
075    
076                String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
077    
078                if (physicalName.indexOf(connectionID) < 0) {
079                    throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
080                }
081    
082                if (connection.isDeleted(dest)) {
083                    throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
084                }
085            }
086    
087            if (timeout < -1) throw new IllegalArgumentException("Timeout must be >= -1");
088            this.timeout = timeout;
089            
090            this.info = new ConsumerInfo(consumerId);
091            this.info.setSubscriptionName(name);
092    
093            if (selector != null && selector.trim().length() != 0) {
094                selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
095            } else {
096                selector = "JMSType='org.apache.activemq.Stream'";
097            }
098    
099            SelectorParser.parse(selector);
100            this.info.setSelector(selector);
101    
102            this.info.setPrefetchSize(prefetch);
103            this.info.setNoLocal(noLocal);
104            this.info.setBrowser(false);
105            this.info.setDispatchAsync(false);
106    
107            // Allows the options on the destination to configure the consumerInfo
108            if (dest.getOptions() != null) {
109                Map<String, String> options = new HashMap<String, String>(dest.getOptions());
110                IntrospectionSupport.setProperties(this.info, options, "consumer.");
111            }
112    
113            this.info.setDestination(dest);
114    
115            this.connection.addInputStream(this);
116            this.connection.addDispatcher(info.getConsumerId(), this);
117            this.connection.syncSendPacket(info);
118            unconsumedMessages.start();
119        }
120    
121        @Override
122        public void close() throws IOException {
123            if (!unconsumedMessages.isClosed()) {
124                try {
125                    if (lastDelivered != null) {
126                        MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
127                        connection.asyncSendPacket(ack);
128                    }
129                    dispose();
130                    this.connection.syncSendPacket(info.createRemoveCommand());
131                } catch (JMSException e) {
132                    throw IOExceptionSupport.create(e);
133                }
134            }
135        }
136    
137        public void dispose() {
138            if (!unconsumedMessages.isClosed()) {
139                unconsumedMessages.close();
140                this.connection.removeDispatcher(info.getConsumerId());
141                this.connection.removeInputStream(this);
142            }
143        }
144    
145        /**
146         * Return the JMS Properties which where used to send the InputStream
147         *
148         * @return jmsProperties
149         * @throws IOException
150         */
151        public Map<String, Object> getJMSProperties() throws IOException {
152            if (jmsProperties == null) {
153                fillBuffer();
154            }
155            return jmsProperties;
156        }
157    
158        public ActiveMQMessage receive() throws JMSException, ReadTimeoutException {
159            checkClosed();
160            MessageDispatch md;
161            try {
162                if (firstReceived || timeout == -1) {
163                    md = unconsumedMessages.dequeue(-1);
164                    firstReceived = true;
165                } else {
166                    md = unconsumedMessages.dequeue(timeout);
167                    if (md == null) throw new ReadTimeoutException();
168                }
169            } catch (InterruptedException e) {
170                Thread.currentThread().interrupt();
171                throw JMSExceptionSupport.create(e);
172            }
173    
174            if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
175                return null;
176            }
177    
178            deliveredCounter++;
179            if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
180                MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
181                connection.asyncSendPacket(ack);
182                deliveredCounter = 0;
183                lastDelivered = null;
184            } else {
185                lastDelivered = md;
186            }
187    
188            return (ActiveMQMessage)md.getMessage();
189        }
190    
191        /**
192         * @throws IllegalStateException
193         */
194        protected void checkClosed() throws IllegalStateException {
195            if (unconsumedMessages.isClosed()) {
196                throw new IllegalStateException("The Consumer is closed");
197            }
198        }
199    
200        /**
201         * 
202         * @see InputStream#read()
203         * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
204         */
205        @Override
206        public int read() throws IOException {
207            fillBuffer();
208            if (eosReached || buffer.length == 0) {
209                return -1;
210            }
211    
212            return buffer[pos++] & 0xff;
213        }
214        
215        /**
216         * 
217         * @see InputStream#read(byte[], int, int)
218         * @throws ReadTimeoutException if a timeout was given and the first chunk of the message could not read within the timeout
219         */
220        @Override
221        public int read(byte[] b, int off, int len) throws IOException {
222            fillBuffer();
223            if (eosReached || buffer.length == 0) {
224                return -1;
225            }
226    
227            int max = Math.min(len, buffer.length - pos);
228            System.arraycopy(buffer, pos, b, off, max);
229    
230            pos += max;
231            return max;
232        }
233    
234        private void fillBuffer() throws IOException {
235            if (eosReached || (buffer != null && buffer.length > pos)) {
236                return;
237            }
238            try {
239                while (true) {
240                    ActiveMQMessage m = receive();
241                    if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
242                        // First message.
243                        long producerSequenceId = m.getMessageId().getProducerSequenceId();
244                        if (producerId == null) {
245                            // We have to start a stream at sequence id = 0
246                            if (producerSequenceId != 0) {
247                                continue;
248                            }
249                            nextSequenceId++;
250                            producerId = m.getMessageId().getProducerId();
251                        } else {
252                            // Verify it's the next message of the sequence.
253                            if (!m.getMessageId().getProducerId().equals(producerId)) {
254                                throw new IOException("Received an unexpected message: invalid producer: " + m);
255                            }
256                            if (producerSequenceId != nextSequenceId++) {
257                                throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
258                            }
259                        }
260    
261                        // Read the buffer in.
262                        ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
263                        buffer = new byte[(int)bm.getBodyLength()];
264                        bm.readBytes(buffer);
265                        pos = 0;
266                        if (jmsProperties == null) {
267                            jmsProperties = Collections.unmodifiableMap(new HashMap<String, Object>(bm.getProperties()));
268                        }
269                    } else {
270                        eosReached = true;
271                        if (jmsProperties == null) {
272                            // no properties found
273                            jmsProperties = Collections.emptyMap();
274                        }
275                    }
276                    return;
277                }
278            } catch (JMSException e) {
279                eosReached = true;
280                if (jmsProperties == null) {
281                    // no properties found
282                    jmsProperties = Collections.emptyMap();
283                }
284                throw IOExceptionSupport.create(e);
285            }
286        }
287    
288        public void dispatch(MessageDispatch md) {
289            unconsumedMessages.enqueue(md);
290        }
291    
292        @Override
293        public String toString() {
294            return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
295        }
296    
297    
298        /**
299         * Exception which should get thrown if the first chunk of the stream could not read within the configured timeout
300         *
301         */
302        public class ReadTimeoutException extends IOException {
303            public ReadTimeoutException() {
304                super();
305            }
306        }
307    }