001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import java.util.concurrent.atomic.AtomicBoolean;
020    
021    import javax.jms.Connection;
022    import javax.jms.Destination;
023    import javax.jms.JMSException;
024    import javax.jms.Message;
025    import javax.jms.MessageConsumer;
026    import javax.jms.MessageListener;
027    import javax.jms.MessageProducer;
028    
029    import org.apache.activemq.Service;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * A Destination bridge is used to bridge between to different JMS systems
035     */
036    public abstract class DestinationBridge implements Service, MessageListener {
037    
038        private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);
039    
040        protected MessageConsumer consumer;
041        protected AtomicBoolean started = new AtomicBoolean(false);
042        protected JmsMesageConvertor jmsMessageConvertor;
043        protected boolean doHandleReplyTo = true;
044        protected JmsConnector jmsConnector;
045    
046        /**
047         * @return Returns the consumer.
048         */
049        public MessageConsumer getConsumer() {
050            return consumer;
051        }
052    
053        /**
054         * @param consumer The consumer to set.
055         */
056        public void setConsumer(MessageConsumer consumer) {
057            this.consumer = consumer;
058        }
059    
060        /**
061         * @param connector
062         */
063        public void setJmsConnector(JmsConnector connector) {
064            this.jmsConnector = connector;
065        }
066    
067        /**
068         * @return Returns the inboundMessageConvertor.
069         */
070        public JmsMesageConvertor getJmsMessageConvertor() {
071            return jmsMessageConvertor;
072        }
073    
074        /**
075         * @param jmsMessageConvertor
076         */
077        public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
078            this.jmsMessageConvertor = jmsMessageConvertor;
079        }
080    
081        protected Destination processReplyToDestination(Destination destination) {
082            return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
083        }
084    
085        public void start() throws Exception {
086            if (started.compareAndSet(false, true)) {
087                createConsumer();
088                createProducer();
089            }
090        }
091    
092        public void stop() throws Exception {
093            started.set(false);
094        }
095    
096        public void onMessage(Message message) {
097    
098            int attempt = 0;
099            final int maxRetries = jmsConnector.getReconnectionPolicy().getMaxSendRetries();
100    
101            while (started.get() && message != null && attempt <= maxRetries) {
102    
103                try {
104    
105                    if (attempt++ > 0) {
106                        try {
107                            Thread.sleep(jmsConnector.getReconnectionPolicy().getNextDelay(attempt));
108                        } catch(InterruptedException e) {
109                            break;
110                        }
111                    }
112    
113                    Message converted;
114                    if (jmsMessageConvertor != null) {
115                        if (doHandleReplyTo) {
116                            Destination replyTo = message.getJMSReplyTo();
117                            if (replyTo != null) {
118                                converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
119                            } else {
120                                converted = jmsMessageConvertor.convert(message);
121                            }
122                        } else {
123                            message.setJMSReplyTo(null);
124                            converted = jmsMessageConvertor.convert(message);
125                        }
126                    } else {
127                        // The Producer side is not up or not yet configured, retry.
128                        continue;
129                    }
130    
131                    try {
132                        sendMessage(converted);
133                    } catch(Exception e) {
134                        jmsConnector.handleConnectionFailure(getConnectionForProducer());
135                        continue;
136                    }
137    
138                    try {
139                        message.acknowledge();
140                    } catch(Exception e) {
141                        jmsConnector.handleConnectionFailure(getConnnectionForConsumer());
142                        continue;
143                    }
144    
145                    // if we got here then it made it out and was ack'd
146                    return;
147    
148                } catch (Exception e) {
149                    LOG.info("failed to forward message on attempt: " + attempt +
150                             " reason: " + e + " message: " + message, e);
151                }
152            }
153        }
154    
155        /**
156         * @return Returns the doHandleReplyTo.
157         */
158        protected boolean isDoHandleReplyTo() {
159            return doHandleReplyTo;
160        }
161    
162        /**
163         * @param doHandleReplyTo The doHandleReplyTo to set.
164         */
165        protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
166            this.doHandleReplyTo = doHandleReplyTo;
167        }
168    
169        protected abstract MessageConsumer createConsumer() throws JMSException;
170    
171        protected abstract MessageProducer createProducer() throws JMSException;
172    
173        protected abstract void sendMessage(Message message) throws JMSException;
174    
175        protected abstract Connection getConnnectionForConsumer();
176    
177        protected abstract Connection getConnectionForProducer();
178    
179    }