001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import java.io.IOException;
020 import java.io.UnsupportedEncodingException;
021 import java.util.Enumeration;
022 import java.util.HashMap;
023 import java.util.List;
024 import java.util.Map;
025 import java.util.Vector;
026
027 import javax.jms.DeliveryMode;
028 import javax.jms.Destination;
029 import javax.jms.JMSException;
030 import javax.jms.MessageFormatException;
031 import javax.jms.MessageNotWriteableException;
032
033 import org.apache.activemq.ActiveMQConnection;
034 import org.apache.activemq.ScheduledMessage;
035 import org.apache.activemq.broker.scheduler.CronParser;
036 import org.apache.activemq.filter.PropertyExpression;
037 import org.apache.activemq.state.CommandVisitor;
038 import org.apache.activemq.util.Callback;
039 import org.apache.activemq.util.JMSExceptionSupport;
040 import org.apache.activemq.util.TypeConversionSupport;
041
042 /**
043 *
044 * @openwire:marshaller code="23"
045 */
046 public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage {
047 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
048 public static final String DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
049 private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();
050
051 protected transient Callback acknowledgeCallback;
052
053 public byte getDataStructureType() {
054 return DATA_STRUCTURE_TYPE;
055 }
056
057 @Override
058 public Message copy() {
059 ActiveMQMessage copy = new ActiveMQMessage();
060 copy(copy);
061 return copy;
062 }
063
064 protected void copy(ActiveMQMessage copy) {
065 super.copy(copy);
066 copy.acknowledgeCallback = acknowledgeCallback;
067 }
068
069 @Override
070 public int hashCode() {
071 MessageId id = getMessageId();
072 if (id != null) {
073 return id.hashCode();
074 } else {
075 return super.hashCode();
076 }
077 }
078
079 @Override
080 public boolean equals(Object o) {
081 if (this == o) {
082 return true;
083 }
084 if (o == null || o.getClass() != getClass()) {
085 return false;
086 }
087
088 ActiveMQMessage msg = (ActiveMQMessage) o;
089 MessageId oMsg = msg.getMessageId();
090 MessageId thisMsg = this.getMessageId();
091 return thisMsg != null && oMsg != null && oMsg.equals(thisMsg);
092 }
093
094 public void acknowledge() throws JMSException {
095 if (acknowledgeCallback != null) {
096 try {
097 acknowledgeCallback.execute();
098 } catch (JMSException e) {
099 throw e;
100 } catch (Throwable e) {
101 throw JMSExceptionSupport.create(e);
102 }
103 }
104 }
105
106 @Override
107 public void clearBody() throws JMSException {
108 setContent(null);
109 readOnlyBody = false;
110 }
111
112 public String getJMSMessageID() {
113 MessageId messageId = this.getMessageId();
114 if (messageId == null) {
115 return null;
116 }
117 return messageId.toString();
118 }
119
120 /**
121 * Seems to be invalid because the parameter doesn't initialize MessageId
122 * instance variables ProducerId and ProducerSequenceId
123 *
124 * @param value
125 * @throws JMSException
126 */
127 public void setJMSMessageID(String value) throws JMSException {
128 if (value != null) {
129 try {
130 MessageId id = new MessageId(value);
131 this.setMessageId(id);
132 } catch (NumberFormatException e) {
133 // we must be some foreign JMS provider or strange user-supplied
134 // String
135 // so lets set the IDs to be 1
136 MessageId id = new MessageId();
137 id.setTextView(value);
138 this.setMessageId(messageId);
139 }
140 } else {
141 this.setMessageId(null);
142 }
143 }
144
145 /**
146 * This will create an object of MessageId. For it to be valid, the instance
147 * variable ProducerId and producerSequenceId must be initialized.
148 *
149 * @param producerId
150 * @param producerSequenceId
151 * @throws JMSException
152 */
153 public void setJMSMessageID(ProducerId producerId, long producerSequenceId) throws JMSException {
154 MessageId id = null;
155 try {
156 id = new MessageId(producerId, producerSequenceId);
157 this.setMessageId(id);
158 } catch (Throwable e) {
159 throw JMSExceptionSupport.create("Invalid message id '" + id + "', reason: " + e.getMessage(), e);
160 }
161 }
162
163 public long getJMSTimestamp() {
164 return this.getTimestamp();
165 }
166
167 public void setJMSTimestamp(long timestamp) {
168 this.setTimestamp(timestamp);
169 }
170
171 public String getJMSCorrelationID() {
172 return this.getCorrelationId();
173 }
174
175 public void setJMSCorrelationID(String correlationId) {
176 this.setCorrelationId(correlationId);
177 }
178
179 public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
180 return encodeString(this.getCorrelationId());
181 }
182
183 public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws JMSException {
184 this.setCorrelationId(decodeString(correlationId));
185 }
186
187 public String getJMSXMimeType() {
188 return "jms/message";
189 }
190
191 protected static String decodeString(byte[] data) throws JMSException {
192 try {
193 if (data == null) {
194 return null;
195 }
196 return new String(data, "UTF-8");
197 } catch (UnsupportedEncodingException e) {
198 throw new JMSException("Invalid UTF-8 encoding: " + e.getMessage());
199 }
200 }
201
202 protected static byte[] encodeString(String data) throws JMSException {
203 try {
204 if (data == null) {
205 return null;
206 }
207 return data.getBytes("UTF-8");
208 } catch (UnsupportedEncodingException e) {
209 throw new JMSException("Invalid UTF-8 encoding: " + e.getMessage());
210 }
211 }
212
213 public Destination getJMSReplyTo() {
214 return this.getReplyTo();
215 }
216
217 public void setJMSReplyTo(Destination destination) throws JMSException {
218 this.setReplyTo(ActiveMQDestination.transform(destination));
219 }
220
221 public Destination getJMSDestination() {
222 return this.getDestination();
223 }
224
225 public void setJMSDestination(Destination destination) throws JMSException {
226 this.setDestination(ActiveMQDestination.transform(destination));
227 }
228
229 public int getJMSDeliveryMode() {
230 return this.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
231 }
232
233 public void setJMSDeliveryMode(int mode) {
234 this.setPersistent(mode == DeliveryMode.PERSISTENT);
235 }
236
237 public boolean getJMSRedelivered() {
238 return this.isRedelivered();
239 }
240
241 public void setJMSRedelivered(boolean redelivered) {
242 this.setRedelivered(redelivered);
243 }
244
245 public String getJMSType() {
246 return this.getType();
247 }
248
249 public void setJMSType(String type) {
250 this.setType(type);
251 }
252
253 public long getJMSExpiration() {
254 return this.getExpiration();
255 }
256
257 public void setJMSExpiration(long expiration) {
258 this.setExpiration(expiration);
259 }
260
261 public int getJMSPriority() {
262 return this.getPriority();
263 }
264
265 public void setJMSPriority(int priority) {
266 this.setPriority((byte) priority);
267 }
268
269 @Override
270 public void clearProperties() {
271 super.clearProperties();
272 readOnlyProperties = false;
273 }
274
275 public boolean propertyExists(String name) throws JMSException {
276 try {
277 return (this.getProperties().containsKey(name) || getObjectProperty(name)!= null);
278 } catch (IOException e) {
279 throw JMSExceptionSupport.create(e);
280 }
281 }
282
283 @SuppressWarnings("rawtypes")
284 public Enumeration getPropertyNames() throws JMSException {
285 try {
286 Vector<String> result = new Vector<String>(this.getProperties().keySet());
287 return result.elements();
288 } catch (IOException e) {
289 throw JMSExceptionSupport.create(e);
290 }
291 }
292
293 /**
294 * return all property names, including standard JMS properties and JMSX properties
295 * @return Enumeration of all property names on this message
296 * @throws JMSException
297 */
298 @SuppressWarnings("rawtypes")
299 public Enumeration getAllPropertyNames() throws JMSException {
300 try {
301 Vector<String> result = new Vector<String>(this.getProperties().keySet());
302 result.addAll(JMS_PROPERTY_SETERS.keySet());
303 return result.elements();
304 } catch (IOException e) {
305 throw JMSExceptionSupport.create(e);
306 }
307 }
308
309 interface PropertySetter {
310 void set(Message message, Object value) throws MessageFormatException;
311 }
312
313 static {
314 JMS_PROPERTY_SETERS.put("JMSXDeliveryCount", new PropertySetter() {
315 public void set(Message message, Object value) throws MessageFormatException {
316 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
317 if (rc == null) {
318 throw new MessageFormatException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + ".");
319 }
320 message.setRedeliveryCounter(rc.intValue() - 1);
321 }
322 });
323 JMS_PROPERTY_SETERS.put("JMSXGroupID", new PropertySetter() {
324 public void set(Message message, Object value) throws MessageFormatException {
325 String rc = (String) TypeConversionSupport.convert(value, String.class);
326 if (rc == null) {
327 throw new MessageFormatException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + ".");
328 }
329 message.setGroupID(rc);
330 }
331 });
332 JMS_PROPERTY_SETERS.put("JMSXGroupSeq", new PropertySetter() {
333 public void set(Message message, Object value) throws MessageFormatException {
334 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
335 if (rc == null) {
336 throw new MessageFormatException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + ".");
337 }
338 message.setGroupSequence(rc.intValue());
339 }
340 });
341 JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() {
342 public void set(Message message, Object value) throws MessageFormatException {
343 String rc = (String) TypeConversionSupport.convert(value, String.class);
344 if (rc == null) {
345 throw new MessageFormatException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + ".");
346 }
347 ((ActiveMQMessage) message).setJMSCorrelationID(rc);
348 }
349 });
350 JMS_PROPERTY_SETERS.put("JMSDeliveryMode", new PropertySetter() {
351 public void set(Message message, Object value) throws MessageFormatException {
352 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
353 if (rc == null) {
354 Boolean bool = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
355 if (bool == null) {
356 throw new MessageFormatException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + ".");
357 }
358 else {
359 rc = bool.booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
360 }
361 }
362 ((ActiveMQMessage) message).setJMSDeliveryMode(rc);
363 }
364 });
365 JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() {
366 public void set(Message message, Object value) throws MessageFormatException {
367 Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
368 if (rc == null) {
369 throw new MessageFormatException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + ".");
370 }
371 ((ActiveMQMessage) message).setJMSExpiration(rc.longValue());
372 }
373 });
374 JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() {
375 public void set(Message message, Object value) throws MessageFormatException {
376 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
377 if (rc == null) {
378 throw new MessageFormatException("Property JMSPriority cannot be set from a " + value.getClass().getName() + ".");
379 }
380 ((ActiveMQMessage) message).setJMSPriority(rc.intValue());
381 }
382 });
383 JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() {
384 public void set(Message message, Object value) throws MessageFormatException {
385 Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
386 if (rc == null) {
387 throw new MessageFormatException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + ".");
388 }
389 ((ActiveMQMessage) message).setJMSRedelivered(rc.booleanValue());
390 }
391 });
392 JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() {
393 public void set(Message message, Object value) throws MessageFormatException {
394 ActiveMQDestination rc = (ActiveMQDestination) TypeConversionSupport.convert(value, ActiveMQDestination.class);
395 if (rc == null) {
396 throw new MessageFormatException("Property JMSReplyTo cannot be set from a " + value.getClass().getName() + ".");
397 }
398 ((ActiveMQMessage) message).setReplyTo(rc);
399 }
400 });
401 JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() {
402 public void set(Message message, Object value) throws MessageFormatException {
403 Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
404 if (rc == null) {
405 throw new MessageFormatException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + ".");
406 }
407 ((ActiveMQMessage) message).setJMSTimestamp(rc.longValue());
408 }
409 });
410 JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() {
411 public void set(Message message, Object value) throws MessageFormatException {
412 String rc = (String) TypeConversionSupport.convert(value, String.class);
413 if (rc == null) {
414 throw new MessageFormatException("Property JMSType cannot be set from a " + value.getClass().getName() + ".");
415 }
416 ((ActiveMQMessage) message).setJMSType(rc);
417 }
418 });
419 }
420
421 public void setObjectProperty(String name, Object value) throws JMSException {
422 setObjectProperty(name, value, true);
423 }
424
425 public void setObjectProperty(String name, Object value, boolean checkReadOnly) throws JMSException {
426
427 if (checkReadOnly) {
428 checkReadOnlyProperties();
429 }
430 if (name == null || name.equals("")) {
431 throw new IllegalArgumentException("Property name cannot be empty or null");
432 }
433
434 checkValidObject(value);
435 value = convertScheduled(name, value);
436 PropertySetter setter = JMS_PROPERTY_SETERS.get(name);
437
438 if (setter != null && value != null) {
439 setter.set(this, value);
440 } else {
441 try {
442 this.setProperty(name, value);
443 } catch (IOException e) {
444 throw JMSExceptionSupport.create(e);
445 }
446 }
447 }
448
449 public void setProperties(Map<String, ?> properties) throws JMSException {
450 for (Map.Entry<String, ?> entry : properties.entrySet()) {
451 // Lets use the object property method as we may contain standard
452 // extension headers like JMSXGroupID
453 setObjectProperty((String) entry.getKey(), entry.getValue());
454 }
455 }
456
457 protected void checkValidObject(Object value) throws MessageFormatException {
458
459 boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long;
460 valid = valid || value instanceof Float || value instanceof Double || value instanceof Character || value instanceof String || value == null;
461
462 if (!valid) {
463
464 ActiveMQConnection conn = getConnection();
465 // conn is null if we are in the broker rather than a JMS client
466 if (conn == null || conn.isNestedMapAndListEnabled()) {
467 if (!(value instanceof Map || value instanceof List)) {
468 throw new MessageFormatException("Only objectified primitive objects, String, Map and List types are allowed but was: " + value + " type: " + value.getClass());
469 }
470 } else {
471 throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
472 }
473 }
474 }
475
476 protected void checkValidScheduled(String name, Object value) throws MessageFormatException {
477 if (AMQ_SCHEDULED_DELAY.equals(name) || AMQ_SCHEDULED_PERIOD.equals(name) || AMQ_SCHEDULED_REPEAT.equals(name)) {
478 if (value instanceof Long == false && value instanceof Integer == false) {
479 throw new MessageFormatException(name + " should be long or int value");
480 }
481 }
482 if (AMQ_SCHEDULED_CRON.equals(name)) {
483 CronParser.validate(value.toString());
484 }
485 }
486
487 protected Object convertScheduled(String name, Object value) throws MessageFormatException {
488 Object result = value;
489 if (AMQ_SCHEDULED_DELAY.equals(name)){
490 result = TypeConversionSupport.convert(value, Long.class);
491 }
492 else if (AMQ_SCHEDULED_PERIOD.equals(name)){
493 result = TypeConversionSupport.convert(value, Long.class);
494 }
495 else if (AMQ_SCHEDULED_REPEAT.equals(name)){
496 result = TypeConversionSupport.convert(value, Integer.class);
497 }
498 return result;
499 }
500
501 public Object getObjectProperty(String name) throws JMSException {
502 if (name == null) {
503 throw new NullPointerException("Property name cannot be null");
504 }
505
506 // PropertyExpression handles converting message headers to properties.
507 PropertyExpression expression = new PropertyExpression(name);
508 return expression.evaluate(this);
509 }
510
511 public boolean getBooleanProperty(String name) throws JMSException {
512 Object value = getObjectProperty(name);
513 if (value == null) {
514 return false;
515 }
516 Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
517 if (rc == null) {
518 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean");
519 }
520 return rc.booleanValue();
521 }
522
523 public byte getByteProperty(String name) throws JMSException {
524 Object value = getObjectProperty(name);
525 if (value == null) {
526 throw new NumberFormatException("property " + name + " was null");
527 }
528 Byte rc = (Byte) TypeConversionSupport.convert(value, Byte.class);
529 if (rc == null) {
530 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte");
531 }
532 return rc.byteValue();
533 }
534
535 public short getShortProperty(String name) throws JMSException {
536 Object value = getObjectProperty(name);
537 if (value == null) {
538 throw new NumberFormatException("property " + name + " was null");
539 }
540 Short rc = (Short) TypeConversionSupport.convert(value, Short.class);
541 if (rc == null) {
542 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short");
543 }
544 return rc.shortValue();
545 }
546
547 public int getIntProperty(String name) throws JMSException {
548 Object value = getObjectProperty(name);
549 if (value == null) {
550 throw new NumberFormatException("property " + name + " was null");
551 }
552 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
553 if (rc == null) {
554 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer");
555 }
556 return rc.intValue();
557 }
558
559 public long getLongProperty(String name) throws JMSException {
560 Object value = getObjectProperty(name);
561 if (value == null) {
562 throw new NumberFormatException("property " + name + " was null");
563 }
564 Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
565 if (rc == null) {
566 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long");
567 }
568 return rc.longValue();
569 }
570
571 public float getFloatProperty(String name) throws JMSException {
572 Object value = getObjectProperty(name);
573 if (value == null) {
574 throw new NullPointerException("property " + name + " was null");
575 }
576 Float rc = (Float) TypeConversionSupport.convert(value, Float.class);
577 if (rc == null) {
578 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float");
579 }
580 return rc.floatValue();
581 }
582
583 public double getDoubleProperty(String name) throws JMSException {
584 Object value = getObjectProperty(name);
585 if (value == null) {
586 throw new NullPointerException("property " + name + " was null");
587 }
588 Double rc = (Double) TypeConversionSupport.convert(value, Double.class);
589 if (rc == null) {
590 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double");
591 }
592 return rc.doubleValue();
593 }
594
595 public String getStringProperty(String name) throws JMSException {
596 Object value = null;
597 if (name.equals("JMSXUserID")) {
598 value = getUserID();
599 if (value == null) {
600 value = getObjectProperty(name);
601 }
602 } else {
603 value = getObjectProperty(name);
604 }
605 if (value == null) {
606 return null;
607 }
608 String rc = (String) TypeConversionSupport.convert(value, String.class);
609 if (rc == null) {
610 throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a String");
611 }
612 return rc;
613 }
614
615 public void setBooleanProperty(String name, boolean value) throws JMSException {
616 setBooleanProperty(name, value, true);
617 }
618
619 public void setBooleanProperty(String name, boolean value, boolean checkReadOnly) throws JMSException {
620 setObjectProperty(name, Boolean.valueOf(value), checkReadOnly);
621 }
622
623 public void setByteProperty(String name, byte value) throws JMSException {
624 setObjectProperty(name, Byte.valueOf(value));
625 }
626
627 public void setShortProperty(String name, short value) throws JMSException {
628 setObjectProperty(name, Short.valueOf(value));
629 }
630
631 public void setIntProperty(String name, int value) throws JMSException {
632 setObjectProperty(name, Integer.valueOf(value));
633 }
634
635 public void setLongProperty(String name, long value) throws JMSException {
636 setObjectProperty(name, Long.valueOf(value));
637 }
638
639 public void setFloatProperty(String name, float value) throws JMSException {
640 setObjectProperty(name, new Float(value));
641 }
642
643 public void setDoubleProperty(String name, double value) throws JMSException {
644 setObjectProperty(name, new Double(value));
645 }
646
647 public void setStringProperty(String name, String value) throws JMSException {
648 setObjectProperty(name, value);
649 }
650
651 private void checkReadOnlyProperties() throws MessageNotWriteableException {
652 if (readOnlyProperties) {
653 throw new MessageNotWriteableException("Message properties are read-only");
654 }
655 }
656
657 protected void checkReadOnlyBody() throws MessageNotWriteableException {
658 if (readOnlyBody) {
659 throw new MessageNotWriteableException("Message body is read-only");
660 }
661 }
662
663 public Callback getAcknowledgeCallback() {
664 return acknowledgeCallback;
665 }
666
667 public void setAcknowledgeCallback(Callback acknowledgeCallback) {
668 this.acknowledgeCallback = acknowledgeCallback;
669 }
670
671 /**
672 * Send operation event listener. Used to get the message ready to be sent.
673 */
674 public void onSend() throws JMSException {
675 setReadOnlyBody(true);
676 setReadOnlyProperties(true);
677 }
678
679 public Response visit(CommandVisitor visitor) throws Exception {
680 return visitor.processMessage(this);
681 }
682
683 @Override
684 public void storeContent() {
685 }
686 }