001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.OutputStream;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.MessageId;
032    import org.apache.activemq.command.ProducerId;
033    import org.apache.activemq.command.ProducerInfo;
034    import org.apache.activemq.util.IOExceptionSupport;
035    import org.apache.activemq.util.IntrospectionSupport;
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    public class ActiveMQOutputStream extends OutputStream implements Disposable {
040    
041        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class);
042    
043        protected int count;
044    
045        final byte buffer[];
046    
047        private final ActiveMQConnection connection;
048        private final Map<String, Object> properties;
049        private final ProducerInfo info;
050    
051        private long messageSequence;
052        private boolean closed;
053        private final int deliveryMode;
054        private final int priority;
055        private final long timeToLive;
056        private boolean alwaysSyncSend = false;
057        private boolean addPropertiesOnFirstMsgOnly = false;
058    
059        /**
060         * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
061         */
062        public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
063    
064        public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
065                                    long timeToLive) throws JMSException {
066            this.connection = connection;
067            this.deliveryMode = deliveryMode;
068            this.priority = priority;
069            this.timeToLive = timeToLive;
070            this.properties = properties == null ? null : new HashMap<String, Object>(properties);
071    
072            Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
073            if (chunkSize == null) {
074                chunkSize = 64 * 1024;
075            } else {
076                if (chunkSize < 1) {
077                    throw new IllegalArgumentException("Chunk size must be greater then 0");
078                } else {
079                    chunkSize *= 1024;
080                }
081            }
082    
083            buffer = new byte[chunkSize];
084    
085            if (destination == null) {
086                throw new InvalidDestinationException("Don't understand null destinations");
087            }
088    
089            this.info = new ProducerInfo(producerId);
090    
091            // Allows the options on the destination to configure the stream
092            if (destination.getOptions() != null) {
093                Map<String, String> options = new HashMap<String, String>(destination.getOptions());
094                IntrospectionSupport.setProperties(this, options, "producer.");
095                IntrospectionSupport.setProperties(this.info, options, "producer.");
096                if (options.size() > 0) {
097                    String msg = "There are " + options.size()
098                        + " producer options that couldn't be set on the producer."
099                        + " Check the options are spelled correctly."
100                        + " Unknown parameters=[" + options + "]."
101                        + " This producer cannot be started.";
102                    LOG.warn(msg);
103                    throw new ConfigurationException(msg);
104                }
105            }
106    
107            this.info.setDestination(destination);
108    
109            this.connection.addOutputStream(this);
110            this.connection.asyncSendPacket(info);
111        }
112    
113        @Override
114        public void close() throws IOException {
115            if (!closed) {
116                flushBuffer();
117                try {
118                    // Send an EOS style empty message to signal EOS.
119                    send(new ActiveMQMessage(), true);
120                    dispose();
121                    this.connection.asyncSendPacket(info.createRemoveCommand());
122                } catch (JMSException e) {
123                    IOExceptionSupport.create(e);
124                }
125            }
126        }
127    
128        @Override
129        public void dispose() {
130            if (!closed) {
131                this.connection.removeOutputStream(this);
132                closed = true;
133            }
134        }
135    
136        @Override
137        public synchronized void write(int b) throws IOException {
138            buffer[count++] = (byte) b;
139            if (count == buffer.length) {
140                flushBuffer();
141            }
142        }
143    
144        @Override
145        public synchronized void write(byte b[], int off, int len) throws IOException {
146            while (len > 0) {
147                int max = Math.min(len, buffer.length - count);
148                System.arraycopy(b, off, buffer, count, max);
149    
150                len -= max;
151                count += max;
152                off += max;
153    
154                if (count == buffer.length) {
155                    flushBuffer();
156                }
157            }
158        }
159    
160        @Override
161        public synchronized void flush() throws IOException {
162            flushBuffer();
163        }
164    
165        private void flushBuffer() throws IOException {
166            if (count != 0) {
167                try {
168                    ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
169                    msg.writeBytes(buffer, 0, count);
170                    send(msg, false);
171                } catch (JMSException e) {
172                    throw IOExceptionSupport.create(e);
173                }
174                count = 0;
175            }
176        }
177    
178        /**
179         * @param msg
180         * @throws JMSException
181         */
182        private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
183            if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) {
184                for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
185                    String key = iter.next();
186                    Object value = properties.get(key);
187                    msg.setObjectProperty(key, value);
188                }
189            }
190            msg.setType("org.apache.activemq.Stream");
191            msg.setGroupID(info.getProducerId().toString());
192            if (eosMessage) {
193                msg.setGroupSequence(-1);
194            } else {
195                msg.setGroupSequence((int) messageSequence);
196            }
197            MessageId id = new MessageId(info.getProducerId(), messageSequence++);
198            connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
199        }
200    
201        @Override
202        public String toString() {
203            return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
204        }
205    
206        public boolean isAlwaysSyncSend() {
207            return alwaysSyncSend;
208        }
209    
210        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
211            this.alwaysSyncSend = alwaysSyncSend;
212        }
213    
214        public boolean isAddPropertiesOnFirstMsgOnly() {
215            return addPropertiesOnFirstMsgOnly;
216        }
217    
218        public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) {
219            this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly;
220        }
221    }