001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.io.OutputStream;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.Map;
024
025 import javax.jms.InvalidDestinationException;
026 import javax.jms.JMSException;
027
028 import org.apache.activemq.command.ActiveMQBytesMessage;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ActiveMQMessage;
031 import org.apache.activemq.command.MessageId;
032 import org.apache.activemq.command.ProducerId;
033 import org.apache.activemq.command.ProducerInfo;
034 import org.apache.activemq.util.IOExceptionSupport;
035 import org.apache.activemq.util.IntrospectionSupport;
036 import org.slf4j.Logger;
037 import org.slf4j.LoggerFactory;
038
039 public class ActiveMQOutputStream extends OutputStream implements Disposable {
040
041 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQOutputStream.class);
042
043 protected int count;
044
045 final byte buffer[];
046
047 private final ActiveMQConnection connection;
048 private final Map<String, Object> properties;
049 private final ProducerInfo info;
050
051 private long messageSequence;
052 private boolean closed;
053 private final int deliveryMode;
054 private final int priority;
055 private final long timeToLive;
056 private boolean alwaysSyncSend = false;
057 private boolean addPropertiesOnFirstMsgOnly = false;
058
059 /**
060 * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
061 */
062 public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
063
064 public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
065 long timeToLive) throws JMSException {
066 this.connection = connection;
067 this.deliveryMode = deliveryMode;
068 this.priority = priority;
069 this.timeToLive = timeToLive;
070 this.properties = properties == null ? null : new HashMap<String, Object>(properties);
071
072 Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
073 if (chunkSize == null) {
074 chunkSize = 64 * 1024;
075 } else {
076 if (chunkSize < 1) {
077 throw new IllegalArgumentException("Chunk size must be greater then 0");
078 } else {
079 chunkSize *= 1024;
080 }
081 }
082
083 buffer = new byte[chunkSize];
084
085 if (destination == null) {
086 throw new InvalidDestinationException("Don't understand null destinations");
087 }
088
089 this.info = new ProducerInfo(producerId);
090
091 // Allows the options on the destination to configure the stream
092 if (destination.getOptions() != null) {
093 Map<String, String> options = new HashMap<String, String>(destination.getOptions());
094 IntrospectionSupport.setProperties(this, options, "producer.");
095 IntrospectionSupport.setProperties(this.info, options, "producer.");
096 if (options.size() > 0) {
097 String msg = "There are " + options.size()
098 + " producer options that couldn't be set on the producer."
099 + " Check the options are spelled correctly."
100 + " Unknown parameters=[" + options + "]."
101 + " This producer cannot be started.";
102 LOG.warn(msg);
103 throw new ConfigurationException(msg);
104 }
105 }
106
107 this.info.setDestination(destination);
108
109 this.connection.addOutputStream(this);
110 this.connection.asyncSendPacket(info);
111 }
112
113 @Override
114 public void close() throws IOException {
115 if (!closed) {
116 flushBuffer();
117 try {
118 // Send an EOS style empty message to signal EOS.
119 send(new ActiveMQMessage(), true);
120 dispose();
121 this.connection.asyncSendPacket(info.createRemoveCommand());
122 } catch (JMSException e) {
123 IOExceptionSupport.create(e);
124 }
125 }
126 }
127
128 @Override
129 public void dispose() {
130 if (!closed) {
131 this.connection.removeOutputStream(this);
132 closed = true;
133 }
134 }
135
136 @Override
137 public synchronized void write(int b) throws IOException {
138 buffer[count++] = (byte) b;
139 if (count == buffer.length) {
140 flushBuffer();
141 }
142 }
143
144 @Override
145 public synchronized void write(byte b[], int off, int len) throws IOException {
146 while (len > 0) {
147 int max = Math.min(len, buffer.length - count);
148 System.arraycopy(b, off, buffer, count, max);
149
150 len -= max;
151 count += max;
152 off += max;
153
154 if (count == buffer.length) {
155 flushBuffer();
156 }
157 }
158 }
159
160 @Override
161 public synchronized void flush() throws IOException {
162 flushBuffer();
163 }
164
165 private void flushBuffer() throws IOException {
166 if (count != 0) {
167 try {
168 ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
169 msg.writeBytes(buffer, 0, count);
170 send(msg, false);
171 } catch (JMSException e) {
172 throw IOExceptionSupport.create(e);
173 }
174 count = 0;
175 }
176 }
177
178 /**
179 * @param msg
180 * @throws JMSException
181 */
182 private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
183 if (properties != null && (messageSequence == 0 || !addPropertiesOnFirstMsgOnly)) {
184 for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
185 String key = iter.next();
186 Object value = properties.get(key);
187 msg.setObjectProperty(key, value);
188 }
189 }
190 msg.setType("org.apache.activemq.Stream");
191 msg.setGroupID(info.getProducerId().toString());
192 if (eosMessage) {
193 msg.setGroupSequence(-1);
194 } else {
195 msg.setGroupSequence((int) messageSequence);
196 }
197 MessageId id = new MessageId(info.getProducerId(), messageSequence++);
198 connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
199 }
200
201 @Override
202 public String toString() {
203 return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
204 }
205
206 public boolean isAlwaysSyncSend() {
207 return alwaysSyncSend;
208 }
209
210 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
211 this.alwaysSyncSend = alwaysSyncSend;
212 }
213
214 public boolean isAddPropertiesOnFirstMsgOnly() {
215 return addPropertiesOnFirstMsgOnly;
216 }
217
218 public void setAddPropertiesOnFirstMsgOnly(boolean propertiesOnFirstMsgOnly) {
219 this.addPropertiesOnFirstMsgOnly = propertiesOnFirstMsgOnly;
220 }
221 }