001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.junit;
018
019import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
020
021import java.io.Serializable;
022import java.net.URI;
023import java.net.URISyntaxException;
024import java.util.Map;
025
026import javax.jms.BytesMessage;
027import javax.jms.Connection;
028import javax.jms.JMSException;
029import javax.jms.MapMessage;
030import javax.jms.Message;
031import javax.jms.MessageProducer;
032import javax.jms.ObjectMessage;
033import javax.jms.Session;
034import javax.jms.StreamMessage;
035import javax.jms.TextMessage;
036
037import org.apache.activemq.ActiveMQConnectionFactory;
038import org.apache.activemq.broker.BrokerFactory;
039import org.apache.activemq.broker.BrokerPlugin;
040import org.apache.activemq.broker.BrokerService;
041import org.apache.activemq.broker.region.Destination;
042import org.apache.activemq.broker.region.policy.PolicyEntry;
043import org.apache.activemq.broker.region.policy.PolicyMap;
044import org.apache.activemq.command.ActiveMQDestination;
045import org.apache.activemq.plugin.StatisticsBrokerPlugin;
046import org.apache.activemq.pool.PooledConnectionFactory;
047import org.junit.rules.ExternalResource;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * A JUnit Rule that embeds an ActiveMQ broker into a test.
053 */
054public class EmbeddedActiveMQBroker extends ExternalResource {
055    Logger log = LoggerFactory.getLogger(this.getClass());
056
057    BrokerService brokerService;
058    InternalClient internalClient;
059
060    /**
061     * Create an embedded ActiveMQ broker using defaults
062     * <p>
063     * The defaults are:
064     * - the broker name is 'embedded-broker'
065     * - JMX is enable but no management connector is created.
066     * - Persistence is disabled
067     */
068    public EmbeddedActiveMQBroker() {
069        brokerService = new BrokerService();
070        brokerService.setUseJmx(true);
071        brokerService.getManagementContext().setCreateConnector(false);
072        brokerService.setUseShutdownHook(false);
073        brokerService.setPersistent(false);
074        brokerService.setBrokerName("embedded-broker");
075    }
076
077    /**
078     * Create an embedded ActiveMQ broker using a configuration URI
079     */
080    public EmbeddedActiveMQBroker(String configurationURI) {
081        try {
082            brokerService = BrokerFactory.createBroker(configurationURI);
083        } catch (Exception ex) {
084            throw new RuntimeException("Exception encountered creating embedded ActiveMQ broker from configuration URI: " + configurationURI, ex);
085        }
086    }
087
088    /**
089     * Create an embedded ActiveMQ broker using a configuration URI
090     */
091    public EmbeddedActiveMQBroker(URI configurationURI) {
092        try {
093            brokerService = BrokerFactory.createBroker(configurationURI);
094        } catch (Exception ex) {
095            throw new RuntimeException("Exception encountered creating embedded ActiveMQ broker from configuration URI: " + configurationURI, ex);
096        }
097    }
098
099    public static void setMessageProperties(Message message, Map<String, Object> properties) {
100        if (properties != null && properties.size() > 0) {
101            for (Map.Entry<String, Object> property : properties.entrySet()) {
102                try {
103                    message.setObjectProperty(property.getKey(), property.getValue());
104                } catch (JMSException jmsEx) {
105                    throw new EmbeddedActiveMQBrokerException(String.format("Failed to set property {%s = %s}", property.getKey(), property.getValue().toString()), jmsEx);
106                }
107            }
108        }
109    }
110
111    /**
112     * Customize the configuration of the embedded ActiveMQ broker
113     * <p>
114     * This method is called before the embedded ActiveMQ broker is started, and can
115     * be overridden to this method to customize the broker configuration.
116     */
117    protected void configure() {
118    }
119
120    /**
121     * Start the embedded ActiveMQ broker, blocking until the broker has successfully started.
122     * <p/>
123     * The broker will normally be started by JUnit using the before() method.  This method allows the broker to
124     * be started manually to support advanced testing scenarios.
125     */
126    public void start() {
127        try {
128            this.configure();
129            brokerService.start();
130            internalClient = new InternalClient();
131            internalClient.start();
132        } catch (Exception ex) {
133            throw new RuntimeException("Exception encountered starting embedded ActiveMQ broker: {}" + this.getBrokerName(), ex);
134        }
135
136        brokerService.waitUntilStarted();
137    }
138
139    /**
140     * Stop the embedded ActiveMQ broker, blocking until the broker has stopped.
141     * <p/>
142     * The broker will normally be stopped by JUnit using the after() method.  This method allows the broker to
143     * be stopped manually to support advanced testing scenarios.
144     */
145    public void stop() {
146        if (internalClient != null) {
147            internalClient.stop();
148            internalClient = null;
149        }
150        if (!brokerService.isStopped()) {
151            try {
152                brokerService.stop();
153            } catch (Exception ex) {
154                log.warn("Exception encountered stopping embedded ActiveMQ broker: {}" + this.getBrokerName(), ex);
155            }
156        }
157
158        brokerService.waitUntilStopped();
159    }
160
161    /**
162     * Start the embedded ActiveMQ Broker
163     * <p/>
164     * Invoked by JUnit to setup the resource
165     */
166    @Override
167    protected void before() throws Throwable {
168        log.info("Starting embedded ActiveMQ broker: {}", this.getBrokerName());
169
170        this.start();
171
172        super.before();
173    }
174
175    /**
176     * Stop the embedded ActiveMQ Broker
177     * <p/>
178     * Invoked by JUnit to tear down the resource
179     */
180    @Override
181    protected void after() {
182        log.info("Stopping Embedded ActiveMQ Broker: {}", this.getBrokerName());
183
184        super.after();
185
186        this.stop();
187    }
188
189    /**
190     * Create an ActiveMQConnectionFactory for the embedded ActiveMQ Broker
191     *
192     * @return a new ActiveMQConnectionFactory
193     */
194    public ActiveMQConnectionFactory createConnectionFactory() {
195        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
196        connectionFactory.setBrokerURL(getVmURL());
197        return connectionFactory;
198    }
199
200    /**
201     * Create an PooledConnectionFactory for the embedded ActiveMQ Broker
202     *
203     * @return a new PooledConnectionFactory
204     */
205    public PooledConnectionFactory createPooledConnectionFactory() {
206        ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
207
208        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory);
209
210        return pooledConnectionFactory;
211    }
212
213    /**
214     * Get the BrokerService for the embedded ActiveMQ broker.
215     * <p/>
216     * This may be required for advanced configuration of the BrokerService.
217     *
218     * @return the embedded ActiveMQ broker
219     */
220    public BrokerService getBrokerService() {
221        return brokerService;
222    }
223
224    /**
225     * Get the failover VM URL for the embedded ActiveMQ Broker
226     * <p/>
227     * NOTE:  The create=false option is appended to the URL to avoid the automatic creation of brokers
228     * and the resulting duplicate broker errors
229     *
230     * @return the VM URL for the embedded broker
231     */
232    public String getVmURL() {
233        return getVmURL(true);
234    }
235
236    /**
237     * Get the VM URL for the embedded ActiveMQ Broker
238     * <p/>
239     * NOTE:  The create=false option is appended to the URL to avoid the automatic creation of brokers
240     * and the resulting duplicate broker errors
241     *
242     * @param failoverURL if true a failover URL will be returned
243     * @return the VM URL for the embedded broker
244     */
245    public String getVmURL(boolean failoverURL) {
246        if (failoverURL) {
247            return String.format("failover:(%s?create=false)", brokerService.getVmConnectorURI().toString());
248        }
249
250        return brokerService.getVmConnectorURI().toString() + "?create=false";
251    }
252
253    /**
254     * Get the failover VM URI for the embedded ActiveMQ Broker
255     * <p/>
256     * NOTE:  The create=false option is appended to the URI to avoid the automatic creation of brokers
257     * and the resulting duplicate broker errors
258     *
259     * @return the VM URI for the embedded broker
260     */
261    public URI getVmURI() {
262        return getVmURI(true);
263    }
264
265    /**
266     * Get the VM URI for the embedded ActiveMQ Broker
267     * <p/>
268     * NOTE:  The create=false option is appended to the URI to avoid the automatic creation of brokers
269     * and the resulting duplicate broker errors
270     *
271     * @param failoverURI if true a failover URI will be returned
272     * @return the VM URI for the embedded broker
273     */
274    public URI getVmURI(boolean failoverURI) {
275        URI result;
276        try {
277            result = new URI(getVmURL(failoverURI));
278        } catch (URISyntaxException uriEx) {
279            throw new RuntimeException("Unable to create failover URI", uriEx);
280        }
281
282        return result;
283    }
284
285    /**
286     * Get the name of the embedded ActiveMQ Broker
287     *
288     * @return name of the embedded broker
289     */
290    public String getBrokerName() {
291        return brokerService.getBrokerName();
292    }
293
294    public void setBrokerName(String brokerName) {
295        brokerService.setBrokerName(brokerName);
296    }
297
298    public boolean isStatisticsPluginEnabled() {
299        BrokerPlugin[] plugins = brokerService.getPlugins();
300
301        if (null != plugins) {
302            for (BrokerPlugin plugin : plugins) {
303                if (plugin instanceof StatisticsBrokerPlugin) {
304                    return true;
305                }
306            }
307        }
308
309        return false;
310    }
311
312    public void enableStatisticsPlugin() {
313        if (!isStatisticsPluginEnabled()) {
314            BrokerPlugin[] newPlugins;
315            BrokerPlugin[] currentPlugins = brokerService.getPlugins();
316            if (null != currentPlugins && 0 < currentPlugins.length) {
317                newPlugins = new BrokerPlugin[currentPlugins.length + 1];
318
319                System.arraycopy(currentPlugins, 0, newPlugins, 0, currentPlugins.length);
320            } else {
321                newPlugins = new BrokerPlugin[1];
322            }
323
324            newPlugins[newPlugins.length - 1] = new StatisticsBrokerPlugin();
325
326            brokerService.setPlugins(newPlugins);
327        }
328    }
329
330    public void disableStatisticsPlugin() {
331        if (isStatisticsPluginEnabled()) {
332            BrokerPlugin[] currentPlugins = brokerService.getPlugins();
333            if (1 < currentPlugins.length) {
334                BrokerPlugin[] newPlugins = new BrokerPlugin[currentPlugins.length - 1];
335
336                int i = 0;
337                for (BrokerPlugin plugin : currentPlugins) {
338                    if (!(plugin instanceof StatisticsBrokerPlugin)) {
339                        newPlugins[i++] = plugin;
340                    }
341                }
342                brokerService.setPlugins(newPlugins);
343            } else {
344                brokerService.setPlugins(null);
345            }
346
347        }
348    }
349
350    public boolean isAdvisoryForDeliveryEnabled() {
351        return getDefaultPolicyEntry().isAdvisoryForDelivery();
352    }
353
354    public void enableAdvisoryForDelivery() {
355        getDefaultPolicyEntry().setAdvisoryForDelivery(true);
356    }
357
358    public void disableAdvisoryForDelivery() {
359        getDefaultPolicyEntry().setAdvisoryForDelivery(false);
360    }
361
362    public boolean isAdvisoryForConsumedEnabled() {
363        return getDefaultPolicyEntry().isAdvisoryForConsumed();
364    }
365
366    public void enableAdvisoryForConsumed() {
367        getDefaultPolicyEntry().setAdvisoryForConsumed(true);
368    }
369
370    public void disableAdvisoryForConsumed() {
371        getDefaultPolicyEntry().setAdvisoryForConsumed(false);
372    }
373
374    public boolean isAdvisoryForDiscardingMessagesEnabled() {
375        return getDefaultPolicyEntry().isAdvisoryForDiscardingMessages();
376    }
377
378    public void enableAdvisoryForDiscardingMessages() {
379        getDefaultPolicyEntry().setAdvisoryForDiscardingMessages(true);
380    }
381
382    public void disableAdvisoryForDiscardingMessages() {
383        getDefaultPolicyEntry().setAdvisoryForDiscardingMessages(false);
384    }
385
386    public boolean isAdvisoryForFastProducersEnabled() {
387        return getDefaultPolicyEntry().isAdvisoryForFastProducers();
388    }
389
390    public void enableAdvisoryForFastProducers() {
391        getDefaultPolicyEntry().setAdvisoryForFastProducers(true);
392    }
393
394    public void disableAdvisoryForFastProducers() {
395        getDefaultPolicyEntry().setAdvisoryForFastProducers(false);
396    }
397
398    public boolean isAdvisoryForSlowConsumersEnabled() {
399        return getDefaultPolicyEntry().isAdvisoryForSlowConsumers();
400    }
401
402    public void enableAdvisoryForSlowConsumers() {
403        getDefaultPolicyEntry().setAdvisoryForSlowConsumers(true);
404    }
405
406    public void disableAdvisoryForSlowConsumers() {
407        getDefaultPolicyEntry().setAdvisoryForSlowConsumers(false);
408    }
409
410    /**
411     * Get the number of messages in a specific JMS Destination.
412     * <p/>
413     * The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
414     * or topic://myTopic.  If the destination type prefix is not included in the destination name, a prefix
415     * of "queue://" is assumed.
416     *
417     * @param destinationName the full name of the JMS Destination
418     * @return the number of messages in the JMS Destination
419     */
420    public long getMessageCount(String destinationName) {
421        if (null == brokerService) {
422            throw new IllegalStateException("BrokerService has not yet been created - was before() called?");
423        }
424
425        // TODO: Figure out how to do this for Topics
426        Destination destination = getDestination(destinationName);
427        if (destination == null) {
428            throw new RuntimeException("Failed to find destination: " + destinationName);
429        }
430
431        // return destination.getMessageStore().getMessageCount();
432        return destination.getDestinationStatistics().getMessages().getCount();
433    }
434
435    /**
436     * Get the ActiveMQ destination
437     * <p/>
438     * The full name of the JMS destination including the prefix should be provided - i.e. queue://myQueue
439     * or topic://myTopic.  If the destination type prefix is not included in the destination name, a prefix
440     * of "queue://" is assumed.
441     *
442     * @param destinationName the full name of the JMS Destination
443     * @return the ActiveMQ destination, null if not found
444     */
445    public Destination getDestination(String destinationName) {
446        if (null == brokerService) {
447            throw new IllegalStateException("BrokerService has not yet been created - was before() called?");
448        }
449
450        Destination destination = null;
451        try {
452            destination = brokerService.getDestination(ActiveMQDestination.createDestination(destinationName, QUEUE_TYPE));
453        } catch (RuntimeException runtimeEx) {
454            throw runtimeEx;
455        } catch (Exception ex) {
456            throw new EmbeddedActiveMQBrokerException("Unexpected exception getting destination from broker", ex);
457        }
458
459        return destination;
460    }
461
462    private PolicyEntry getDefaultPolicyEntry() {
463        PolicyMap destinationPolicy = brokerService.getDestinationPolicy();
464        if (null == destinationPolicy) {
465            destinationPolicy = new PolicyMap();
466            brokerService.setDestinationPolicy(destinationPolicy);
467        }
468
469        PolicyEntry defaultEntry = destinationPolicy.getDefaultEntry();
470        if (null == defaultEntry) {
471            defaultEntry = new PolicyEntry();
472            destinationPolicy.setDefaultEntry(defaultEntry);
473        }
474
475        return defaultEntry;
476    }
477
478    public BytesMessage createBytesMessage() {
479        return internalClient.createBytesMessage();
480    }
481
482    public TextMessage createTextMessage() {
483        return internalClient.createTextMessage();
484    }
485
486    public MapMessage createMapMessage() {
487        return internalClient.createMapMessage();
488    }
489
490    public ObjectMessage createObjectMessage() {
491        return internalClient.createObjectMessage();
492    }
493
494    public StreamMessage createStreamMessage() {
495        return internalClient.createStreamMessage();
496    }
497
498    public BytesMessage createMessage(byte[] body) {
499        return this.createMessage(body, null);
500    }
501
502    public TextMessage createMessage(String body) {
503        return this.createMessage(body, null);
504    }
505
506    public MapMessage createMessage(Map<String, Object> body) {
507        return this.createMessage(body, null);
508    }
509
510    public ObjectMessage createMessage(Serializable body) {
511        return this.createMessage(body, null);
512    }
513
514    public BytesMessage createMessage(byte[] body, Map<String, Object> properties) {
515        BytesMessage message = this.createBytesMessage();
516        if (body != null) {
517            try {
518                message.writeBytes(body);
519            } catch (JMSException jmsEx) {
520                throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on BytesMessage", new String(body)), jmsEx);
521            }
522        }
523
524        setMessageProperties(message, properties);
525
526        return message;
527    }
528
529    public TextMessage createMessage(String body, Map<String, Object> properties) {
530        TextMessage message = this.createTextMessage();
531        if (body != null) {
532            try {
533                message.setText(body);
534            } catch (JMSException jmsEx) {
535                throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on TextMessage", body), jmsEx);
536            }
537        }
538
539        setMessageProperties(message, properties);
540
541        return message;
542    }
543
544    public MapMessage createMessage(Map<String, Object> body, Map<String, Object> properties) {
545        MapMessage message = this.createMapMessage();
546
547        if (body != null) {
548            for (Map.Entry<String, Object> entry : body.entrySet()) {
549                try {
550                    message.setObject(entry.getKey(), entry.getValue());
551                } catch (JMSException jmsEx) {
552                    throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body entry {%s = %s} on MapMessage", entry.getKey(), entry.getValue().toString()), jmsEx);
553                }
554            }
555        }
556
557        setMessageProperties(message, properties);
558
559        return message;
560    }
561
562    public ObjectMessage createMessage(Serializable body, Map<String, Object> properties) {
563        ObjectMessage message = this.createObjectMessage();
564
565        if (body != null) {
566            try {
567                message.setObject(body);
568            } catch (JMSException jmsEx) {
569                throw new EmbeddedActiveMQBrokerException(String.format("Failed to set body {%s} on ObjectMessage", body.toString()), jmsEx);
570            }
571        }
572
573        setMessageProperties(message, properties);
574
575        return message;
576    }
577
578    public void pushMessage(String destinationName, Message message) {
579        if (destinationName == null) {
580            throw new IllegalArgumentException("pushMessage failure - destination name is required");
581        } else if (message == null) {
582            throw new IllegalArgumentException("pushMessage failure - a Message is required");
583        }
584        ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
585
586        internalClient.pushMessage(destination, message);
587    }
588
589    public BytesMessage pushMessage(String destinationName, byte[] body) {
590        BytesMessage message = createMessage(body, null);
591        pushMessage(destinationName, message);
592        return message;
593    }
594
595    public TextMessage pushMessage(String destinationName, String body) {
596        TextMessage message = createMessage(body, null);
597        pushMessage(destinationName, message);
598        return message;
599    }
600
601    public MapMessage pushMessage(String destinationName, Map<String, Object> body) {
602        MapMessage message = createMessage(body, null);
603        pushMessage(destinationName, message);
604        return message;
605    }
606
607    public ObjectMessage pushMessage(String destinationName, Serializable body) {
608        ObjectMessage message = createMessage(body, null);
609        pushMessage(destinationName, message);
610        return message;
611    }
612
613    public BytesMessage pushMessageWithProperties(String destinationName, byte[] body, Map<String, Object> properties) {
614        BytesMessage message = createMessage(body, properties);
615        pushMessage(destinationName, message);
616        return message;
617    }
618
619    public TextMessage pushMessageWithProperties(String destinationName, String body, Map<String, Object> properties) {
620        TextMessage message = createMessage(body, properties);
621        pushMessage(destinationName, message);
622        return message;
623    }
624
625    public MapMessage pushMessageWithProperties(String destinationName, Map<String, Object> body, Map<String, Object> properties) {
626        MapMessage message = createMessage(body, properties);
627        pushMessage(destinationName, message);
628        return message;
629    }
630
631    public ObjectMessage pushMessageWithProperties(String destinationName, Serializable body, Map<String, Object> properties) {
632        ObjectMessage message = createMessage(body, properties);
633        pushMessage(destinationName, message);
634        return message;
635    }
636
637
638    public Message peekMessage(String destinationName) {
639        if (null == brokerService) {
640            throw new NullPointerException("peekMessage failure  - BrokerService is null");
641        }
642
643        if (destinationName == null) {
644            throw new IllegalArgumentException("peekMessage failure - destination name is required");
645        }
646
647        ActiveMQDestination destination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
648        Destination brokerDestination = null;
649
650        try {
651            brokerDestination = brokerService.getDestination(destination);
652        } catch (Exception ex) {
653            throw new EmbeddedActiveMQBrokerException("peekMessage failure - unexpected exception getting destination from BrokerService", ex);
654        }
655
656        if (brokerDestination == null) {
657            throw new IllegalStateException(String.format("peekMessage failure - destination %s not found in broker %s", destination.toString(), brokerService.getBrokerName()));
658        }
659
660        org.apache.activemq.command.Message[] messages = brokerDestination.browse();
661        if (messages != null && messages.length > 0) {
662            return (Message) messages[0];
663        }
664
665        return null;
666    }
667
668    public BytesMessage peekBytesMessage(String destinationName) {
669        return (BytesMessage) peekMessage(destinationName);
670    }
671
672    public TextMessage peekTextMessage(String destinationName) {
673        return (TextMessage) peekMessage(destinationName);
674    }
675
676    public MapMessage peekMapMessage(String destinationName) {
677        return (MapMessage) peekMessage(destinationName);
678    }
679
680    public ObjectMessage peekObjectMessage(String destinationName) {
681        return (ObjectMessage) peekMessage(destinationName);
682    }
683
684    public StreamMessage peekStreamMessage(String destinationName) {
685        return (StreamMessage) peekMessage(destinationName);
686    }
687
688    public static class EmbeddedActiveMQBrokerException extends RuntimeException {
689        public EmbeddedActiveMQBrokerException(String message) {
690            super(message);
691        }
692
693        public EmbeddedActiveMQBrokerException(String message, Exception cause) {
694            super(message, cause);
695        }
696    }
697
698    private class InternalClient {
699        ActiveMQConnectionFactory connectionFactory;
700        Connection connection;
701        Session session;
702        MessageProducer producer;
703
704        public InternalClient() {
705        }
706
707        void start() {
708            connectionFactory = createConnectionFactory();
709            try {
710                connection = connectionFactory.createConnection();
711                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
712                producer = session.createProducer(null);
713                connection.start();
714            } catch (JMSException jmsEx) {
715                throw new EmbeddedActiveMQBrokerException("Internal Client creation failure", jmsEx);
716            }
717        }
718
719        void stop() {
720            if (null != connection) {
721                try {
722                    connection.close();
723                } catch (JMSException jmsEx) {
724                    log.warn("JMSException encounter closing InternalClient connection - ignoring", jmsEx);
725                }
726            }
727        }
728
729        public BytesMessage createBytesMessage() {
730            checkSession();
731
732            try {
733                return session.createBytesMessage();
734            } catch (JMSException jmsEx) {
735                throw new EmbeddedActiveMQBrokerException("Failed to create BytesMessage", jmsEx);
736            }
737        }
738
739        public TextMessage createTextMessage() {
740            checkSession();
741
742            try {
743                return session.createTextMessage();
744            } catch (JMSException jmsEx) {
745                throw new EmbeddedActiveMQBrokerException("Failed to create TextMessage", jmsEx);
746            }
747        }
748
749        public MapMessage createMapMessage() {
750            checkSession();
751
752            try {
753                return session.createMapMessage();
754            } catch (JMSException jmsEx) {
755                throw new EmbeddedActiveMQBrokerException("Failed to create MapMessage", jmsEx);
756            }
757        }
758
759        public ObjectMessage createObjectMessage() {
760            checkSession();
761
762            try {
763                return session.createObjectMessage();
764            } catch (JMSException jmsEx) {
765                throw new EmbeddedActiveMQBrokerException("Failed to create ObjectMessage", jmsEx);
766            }
767        }
768
769        public StreamMessage createStreamMessage() {
770            checkSession();
771            try {
772                return session.createStreamMessage();
773            } catch (JMSException jmsEx) {
774                throw new EmbeddedActiveMQBrokerException("Failed to create StreamMessage", jmsEx);
775            }
776        }
777
778        public void pushMessage(ActiveMQDestination destination, Message message) {
779            if (producer == null) {
780                throw new IllegalStateException("JMS MessageProducer is null - has the InternalClient been started?");
781            }
782
783            try {
784                producer.send(destination, message);
785            } catch (JMSException jmsEx) {
786                throw new EmbeddedActiveMQBrokerException(String.format("Failed to push %s to %s", message.getClass().getSimpleName(), destination.toString()), jmsEx);
787            }
788        }
789
790        void checkSession() {
791            if (session == null) {
792                throw new IllegalStateException("JMS Session is null - has the InternalClient been started?");
793            }
794        }
795    }
796}