001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.io.OutputStream;
024 import java.util.HashMap;
025 import java.util.zip.DeflaterOutputStream;
026 import java.util.zip.InflaterInputStream;
027
028 import javax.jms.JMSException;
029 import javax.jms.MessageNotWriteableException;
030 import javax.jms.TextMessage;
031
032 import org.apache.activemq.ActiveMQConnection;
033 import org.apache.activemq.util.ByteArrayInputStream;
034 import org.apache.activemq.util.ByteArrayOutputStream;
035 import org.apache.activemq.util.ByteSequence;
036 import org.apache.activemq.util.JMSExceptionSupport;
037 import org.apache.activemq.util.MarshallingSupport;
038 import org.apache.activemq.wireformat.WireFormat;
039
040 /**
041 * @openwire:marshaller code="28"
042 *
043 */
044 public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage {
045
046 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
047
048 protected String text;
049
050 public Message copy() {
051 ActiveMQTextMessage copy = new ActiveMQTextMessage();
052 copy(copy);
053 return copy;
054 }
055
056 private void copy(ActiveMQTextMessage copy) {
057 super.copy(copy);
058 copy.text = text;
059 }
060
061 public byte getDataStructureType() {
062 return DATA_STRUCTURE_TYPE;
063 }
064
065 public String getJMSXMimeType() {
066 return "jms/text-message";
067 }
068
069 public void setText(String text) throws MessageNotWriteableException {
070 checkReadOnlyBody();
071 this.text = text;
072 setContent(null);
073 }
074
075 public String getText() throws JMSException {
076 if (text == null && getContent() != null) {
077 InputStream is = null;
078 try {
079 ByteSequence bodyAsBytes = getContent();
080 if (bodyAsBytes != null) {
081 is = new ByteArrayInputStream(bodyAsBytes);
082 if (isCompressed()) {
083 is = new InflaterInputStream(is);
084 }
085 DataInputStream dataIn = new DataInputStream(is);
086 text = MarshallingSupport.readUTF8(dataIn);
087 dataIn.close();
088 setContent(null);
089 setCompressed(false);
090 }
091 } catch (IOException ioe) {
092 throw JMSExceptionSupport.create(ioe);
093 } finally {
094 if (is != null) {
095 try {
096 is.close();
097 } catch (IOException e) {
098 // ignore
099 }
100 }
101 }
102 }
103 return text;
104 }
105
106 public void beforeMarshall(WireFormat wireFormat) throws IOException {
107 super.beforeMarshall(wireFormat);
108 storeContent();
109 }
110
111 @Override
112 public void storeContent() {
113 try {
114 ByteSequence content = getContent();
115 if (content == null && text != null) {
116 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
117 OutputStream os = bytesOut;
118 ActiveMQConnection connection = getConnection();
119 if (connection != null && connection.isUseCompression()) {
120 compressed = true;
121 os = new DeflaterOutputStream(os);
122 }
123 DataOutputStream dataOut = new DataOutputStream(os);
124 MarshallingSupport.writeUTF8(dataOut, this.text);
125 dataOut.close();
126 setContent(bytesOut.toByteSequence());
127 }
128 } catch (IOException e) {
129 throw new RuntimeException(e);
130 }
131 }
132
133 // see https://issues.apache.org/activemq/browse/AMQ-2103
134 // and https://issues.apache.org/activemq/browse/AMQ-2966
135 public void clearMarshalledState() throws JMSException {
136 super.clearMarshalledState();
137 this.text = null;
138 }
139
140 /**
141 * Clears out the message body. Clearing a message's body does not clear its
142 * header values or property entries. <p/>
143 * <P>
144 * If this message body was read-only, calling this method leaves the
145 * message body in the same state as an empty body in a newly created
146 * message.
147 *
148 * @throws JMSException if the JMS provider fails to clear the message body
149 * due to some internal error.
150 */
151 public void clearBody() throws JMSException {
152 super.clearBody();
153 this.text = null;
154 }
155
156 public int getSize() {
157 if (size == 0 && content == null && text != null) {
158 size = getMinimumMessageSize();
159 if (marshalledProperties != null) {
160 size += marshalledProperties.getLength();
161 }
162 size += text.length() * 2;
163 }
164 return super.getSize();
165 }
166
167 public String toString() {
168 try {
169 String text = getText();
170 if (text != null) {
171 text = MarshallingSupport.truncate64(text);
172 HashMap<String, Object> overrideFields = new HashMap<String, Object>();
173 overrideFields.put("text", text);
174 return super.toString(overrideFields);
175 }
176 } catch (JMSException e) {
177 }
178 return super.toString();
179 }
180 }