001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.OutputStream;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.MessageId;
032    import org.apache.activemq.command.ProducerId;
033    import org.apache.activemq.command.ProducerInfo;
034    import org.apache.activemq.util.IOExceptionSupport;
035    import org.apache.activemq.util.IntrospectionSupport;
036    
037    /**
038     *
039     */
040    public class ActiveMQOutputStream extends OutputStream implements Disposable {
041    
042        protected int count;
043    
044        final byte buffer[];
045    
046        private final ActiveMQConnection connection;
047        private final Map<String, Object> properties;
048        private final ProducerInfo info;
049    
050        private long messageSequence;
051        private boolean closed;
052        private final int deliveryMode;
053        private final int priority;
054        private final long timeToLive;
055        private boolean alwaysSyncSend = false;
056    
057        /**
058         * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
059         */
060        public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
061    
062        public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
063                                    long timeToLive) throws JMSException {
064            this.connection = connection;
065            this.deliveryMode = deliveryMode;
066            this.priority = priority;
067            this.timeToLive = timeToLive;
068            this.properties = properties == null ? null : new HashMap<String, Object>(properties);
069    
070            Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
071            if (chunkSize == null) {
072                chunkSize = 64 * 1024;
073            } else {
074                if (chunkSize < 1) {
075                    throw new IllegalArgumentException("Chunk size must be greater then 0");
076                } else {
077                    chunkSize *= 1024;
078                }
079            }
080    
081            buffer = new byte[chunkSize];
082    
083            if (destination == null) {
084                throw new InvalidDestinationException("Don't understand null destinations");
085            }
086    
087            this.info = new ProducerInfo(producerId);
088    
089            // Allows the options on the destination to configure the stream
090            if (destination.getOptions() != null) {
091                Map<String, String> options = new HashMap<String, String>(destination.getOptions());
092                IntrospectionSupport.setProperties(this, options, "producer.");
093                IntrospectionSupport.setProperties(this.info, options, "producer.");
094            }
095    
096            this.info.setDestination(destination);
097    
098            this.connection.addOutputStream(this);
099            this.connection.asyncSendPacket(info);
100        }
101    
102        public void close() throws IOException {
103            if (!closed) {
104                flushBuffer();
105                try {
106                    // Send an EOS style empty message to signal EOS.
107                    send(new ActiveMQMessage(), true);
108                    dispose();
109                    this.connection.asyncSendPacket(info.createRemoveCommand());
110                } catch (JMSException e) {
111                    IOExceptionSupport.create(e);
112                }
113            }
114        }
115    
116        public void dispose() {
117            if (!closed) {
118                this.connection.removeOutputStream(this);
119                closed = true;
120            }
121        }
122    
123        public synchronized void write(int b) throws IOException {
124            buffer[count++] = (byte) b;
125            if (count == buffer.length) {
126                flushBuffer();
127            }
128        }
129    
130        public synchronized void write(byte b[], int off, int len) throws IOException {
131            while (len > 0) {
132                int max = Math.min(len, buffer.length - count);
133                System.arraycopy(b, off, buffer, count, max);
134    
135                len -= max;
136                count += max;
137                off += max;
138    
139                if (count == buffer.length) {
140                    flushBuffer();
141                }
142            }
143        }
144    
145        public synchronized void flush() throws IOException {
146            flushBuffer();
147        }
148    
149        private void flushBuffer() throws IOException {
150            if (count != 0) {
151                try {
152                    ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
153                    msg.writeBytes(buffer, 0, count);
154                    send(msg, false);
155                } catch (JMSException e) {
156                    throw IOExceptionSupport.create(e);
157                }
158                count = 0;
159            }
160        }
161    
162        /**
163         * @param msg
164         * @throws JMSException
165         */
166        private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
167            if (properties != null) {
168                for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
169                    String key = iter.next();
170                    Object value = properties.get(key);
171                    msg.setObjectProperty(key, value);
172                }
173            }
174            msg.setType("org.apache.activemq.Stream");
175            msg.setGroupID(info.getProducerId().toString());
176            if (eosMessage) {
177                msg.setGroupSequence(-1);
178            } else {
179                msg.setGroupSequence((int) messageSequence);
180            }
181            MessageId id = new MessageId(info.getProducerId(), messageSequence++);
182            connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
183        }
184    
185        public String toString() {
186            return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
187        }
188    
189        public boolean isAlwaysSyncSend() {
190            return alwaysSyncSend;
191        }
192    
193        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
194            this.alwaysSyncSend = alwaysSyncSend;
195        }
196    
197    }