001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import javax.jms.Connection;
020    import javax.jms.Destination;
021    import javax.jms.ExceptionListener;
022    import javax.jms.JMSException;
023    import javax.jms.Session;
024    import javax.jms.Topic;
025    import javax.jms.TopicConnection;
026    import javax.jms.TopicConnectionFactory;
027    import javax.jms.TopicSession;
028    import javax.naming.NamingException;
029    
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * A Bridge to other JMS Topic providers
035     */
036    public class SimpleJmsTopicConnector extends JmsConnector {
037        private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsTopicConnector.class);
038        private String outboundTopicConnectionFactoryName;
039        private String localConnectionFactoryName;
040        private TopicConnectionFactory outboundTopicConnectionFactory;
041        private TopicConnectionFactory localTopicConnectionFactory;
042        private InboundTopicBridge[] inboundTopicBridges;
043        private OutboundTopicBridge[] outboundTopicBridges;
044    
045        /**
046         * @return Returns the inboundTopicBridges.
047         */
048        public InboundTopicBridge[] getInboundTopicBridges() {
049            return inboundTopicBridges;
050        }
051    
052        /**
053         * @param inboundTopicBridges The inboundTopicBridges to set.
054         */
055        public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
056            this.inboundTopicBridges = inboundTopicBridges;
057        }
058    
059        /**
060         * @return Returns the outboundTopicBridges.
061         */
062        public OutboundTopicBridge[] getOutboundTopicBridges() {
063            return outboundTopicBridges;
064        }
065    
066        /**
067         * @param outboundTopicBridges The outboundTopicBridges to set.
068         */
069        public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
070            this.outboundTopicBridges = outboundTopicBridges;
071        }
072    
073        /**
074         * @return Returns the localTopicConnectionFactory.
075         */
076        public TopicConnectionFactory getLocalTopicConnectionFactory() {
077            return localTopicConnectionFactory;
078        }
079    
080        /**
081         * @param localTopicConnectionFactory The localTopicConnectionFactory to set.
082         */
083        public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
084            this.localTopicConnectionFactory = localConnectionFactory;
085        }
086    
087        /**
088         * @return Returns the outboundTopicConnectionFactory.
089         */
090        public TopicConnectionFactory getOutboundTopicConnectionFactory() {
091            return outboundTopicConnectionFactory;
092        }
093    
094        /**
095         * @return Returns the outboundTopicConnectionFactoryName.
096         */
097        public String getOutboundTopicConnectionFactoryName() {
098            return outboundTopicConnectionFactoryName;
099        }
100    
101        /**
102         * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set.
103         */
104        public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
105            this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
106        }
107    
108        /**
109         * @return Returns the localConnectionFactoryName.
110         */
111        public String getLocalConnectionFactoryName() {
112            return localConnectionFactoryName;
113        }
114    
115        /**
116         * @param localConnectionFactoryName The localConnectionFactoryName to set.
117         */
118        public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
119            this.localConnectionFactoryName = localConnectionFactoryName;
120        }
121    
122        /**
123         * @return Returns the localTopicConnection.
124         */
125        public TopicConnection getLocalTopicConnection() {
126            return (TopicConnection) localConnection.get();
127        }
128    
129        /**
130         * @param localTopicConnection The localTopicConnection to set.
131         */
132        public void setLocalTopicConnection(TopicConnection localTopicConnection) {
133            this.localConnection.set(localTopicConnection);
134        }
135    
136        /**
137         * @return Returns the outboundTopicConnection.
138         */
139        public TopicConnection getOutboundTopicConnection() {
140            return (TopicConnection) foreignConnection.get();
141        }
142    
143        /**
144         * @param outboundTopicConnection The outboundTopicConnection to set.
145         */
146        public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
147            this.foreignConnection.set(foreignTopicConnection);
148        }
149    
150        /**
151         * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set.
152         */
153        public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
154            this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
155        }
156    
157        @Override
158        protected void initializeForeignConnection() throws NamingException, JMSException {
159    
160            final TopicConnection newConnection;
161    
162            if (foreignConnection.get() == null) {
163                // get the connection factories
164                if (outboundTopicConnectionFactory == null) {
165                    // look it up from JNDI
166                    if (outboundTopicConnectionFactoryName != null) {
167                        outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
168                            .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
169                        if (outboundUsername != null) {
170                            newConnection = outboundTopicConnectionFactory
171                                .createTopicConnection(outboundUsername, outboundPassword);
172                        } else {
173                            newConnection = outboundTopicConnectionFactory.createTopicConnection();
174                        }
175                    } else {
176                        throw new JMSException("Cannot create foreignConnection - no information");
177                    }
178                } else {
179                    if (outboundUsername != null) {
180                        newConnection = outboundTopicConnectionFactory
181                            .createTopicConnection(outboundUsername, outboundPassword);
182                    } else {
183                        newConnection = outboundTopicConnectionFactory.createTopicConnection();
184                    }
185                }
186            } else {
187                // Clear if for now in case something goes wrong during the init.
188                newConnection = (TopicConnection) foreignConnection.getAndSet(null);
189            }
190    
191            if (outboundClientId != null && outboundClientId.length() > 0) {
192                newConnection.setClientID(getOutboundClientId());
193            }
194            newConnection.start();
195    
196            outboundMessageConvertor.setConnection(newConnection);
197    
198            // Configure the bridges with the new Outbound connection.
199            initializeInboundDestinationBridgesOutboundSide(newConnection);
200            initializeOutboundDestinationBridgesOutboundSide(newConnection);
201    
202            // Register for any async error notifications now so we can reset in the
203            // case where there's not a lot of activity and a connection drops.
204            newConnection.setExceptionListener(new ExceptionListener() {
205                @Override
206                public void onException(JMSException exception) {
207                    handleConnectionFailure(newConnection);
208                }
209            });
210    
211            // At this point all looks good, so this our current connection now.
212            foreignConnection.set(newConnection);
213        }
214    
215        @Override
216        protected void initializeLocalConnection() throws NamingException, JMSException {
217    
218            final TopicConnection newConnection;
219    
220            if (localConnection.get() == null) {
221                // get the connection factories
222                if (localTopicConnectionFactory == null) {
223                    if (embeddedConnectionFactory == null) {
224                        // look it up from JNDI
225                        if (localConnectionFactoryName != null) {
226                            localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
227                                .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
228                            if (localUsername != null) {
229                                newConnection = localTopicConnectionFactory
230                                    .createTopicConnection(localUsername, localPassword);
231                            } else {
232                                newConnection = localTopicConnectionFactory.createTopicConnection();
233                            }
234                        } else {
235                            throw new JMSException("Cannot create localConnection - no information");
236                        }
237                    } else {
238                        newConnection = embeddedConnectionFactory.createTopicConnection();
239                    }
240                } else {
241                    if (localUsername != null) {
242                        newConnection = localTopicConnectionFactory.
243                                createTopicConnection(localUsername, localPassword);
244                    } else {
245                        newConnection = localTopicConnectionFactory.createTopicConnection();
246                    }
247                }
248    
249            } else {
250                // Clear if for now in case something goes wrong during the init.
251                newConnection = (TopicConnection) localConnection.getAndSet(null);
252            }
253    
254            if (localClientId != null && localClientId.length() > 0) {
255                newConnection.setClientID(getLocalClientId());
256            }
257            newConnection.start();
258    
259            inboundMessageConvertor.setConnection(newConnection);
260    
261            // Configure the bridges with the new Local connection.
262            initializeInboundDestinationBridgesLocalSide(newConnection);
263            initializeOutboundDestinationBridgesLocalSide(newConnection);
264    
265            // Register for any async error notifications now so we can reset in the
266            // case where there's not a lot of activity and a connection drops.
267            newConnection.setExceptionListener(new ExceptionListener() {
268                @Override
269                public void onException(JMSException exception) {
270                    handleConnectionFailure(newConnection);
271                }
272            });
273    
274            // At this point all looks good, so this our current connection now.
275            localConnection.set(newConnection);
276        }
277    
278        protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
279            if (inboundTopicBridges != null) {
280                TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
281    
282                for (InboundTopicBridge bridge : inboundTopicBridges) {
283                    String TopicName = bridge.getInboundTopicName();
284                    Topic foreignTopic = createForeignTopic(outboundSession, TopicName);
285                    bridge.setConsumer(null);
286                    bridge.setConsumerTopic(foreignTopic);
287                    bridge.setConsumerConnection(connection);
288                    bridge.setJmsConnector(this);
289                    addInboundBridge(bridge);
290                }
291                outboundSession.close();
292            }
293        }
294    
295        protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
296            if (inboundTopicBridges != null) {
297                TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
298    
299                for (InboundTopicBridge bridge : inboundTopicBridges) {
300                    String localTopicName = bridge.getLocalTopicName();
301                    Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
302                    bridge.setProducerTopic(activemqTopic);
303                    bridge.setProducerConnection(connection);
304                    if (bridge.getJmsMessageConvertor() == null) {
305                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
306                    }
307                    bridge.setJmsConnector(this);
308                    addInboundBridge(bridge);
309                }
310                localSession.close();
311            }
312        }
313    
314        protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
315            if (outboundTopicBridges != null) {
316                TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
317    
318                for (OutboundTopicBridge bridge : outboundTopicBridges) {
319                    String topicName = bridge.getOutboundTopicName();
320                    Topic foreignTopic = createForeignTopic(outboundSession, topicName);
321                    bridge.setProducerTopic(foreignTopic);
322                    bridge.setProducerConnection(connection);
323                    if (bridge.getJmsMessageConvertor() == null) {
324                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
325                    }
326                    bridge.setJmsConnector(this);
327                    addOutboundBridge(bridge);
328                }
329                outboundSession.close();
330            }
331        }
332    
333        protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
334            if (outboundTopicBridges != null) {
335                TopicSession localSession =
336                        connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
337    
338                for (OutboundTopicBridge bridge : outboundTopicBridges) {
339                    String localTopicName = bridge.getLocalTopicName();
340                    Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
341                    bridge.setConsumer(null);
342                    bridge.setConsumerTopic(activemqTopic);
343                    bridge.setConsumerConnection(connection);
344                    bridge.setJmsConnector(this);
345                    addOutboundBridge(bridge);
346                }
347                localSession.close();
348            }
349        }
350    
351        protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
352                                                  Connection replyToConsumerConnection) {
353            Topic replyToProducerTopic = (Topic)destination;
354            boolean isInbound = replyToProducerConnection.equals(localConnection.get());
355    
356            if (isInbound) {
357                InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
358                if (bridge == null) {
359                    bridge = new InboundTopicBridge() {
360                        protected Destination processReplyToDestination(Destination destination) {
361                            return null;
362                        }
363                    };
364                    try {
365                        TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
366                            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
367                        Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
368                        replyToConsumerSession.close();
369                        bridge.setConsumerTopic(replyToConsumerTopic);
370                        bridge.setProducerTopic(replyToProducerTopic);
371                        bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
372                        bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
373                        bridge.setDoHandleReplyTo(false);
374                        if (bridge.getJmsMessageConvertor() == null) {
375                            bridge.setJmsMessageConvertor(getInboundMessageConvertor());
376                        }
377                        bridge.setJmsConnector(this);
378                        bridge.start();
379                        LOG.info("Created replyTo bridge for " + replyToProducerTopic);
380                    } catch (Exception e) {
381                        LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
382                        return null;
383                    }
384                    replyToBridges.put(replyToProducerTopic, bridge);
385                }
386                return bridge.getConsumerTopic();
387            } else {
388                OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
389                if (bridge == null) {
390                    bridge = new OutboundTopicBridge() {
391                        protected Destination processReplyToDestination(Destination destination) {
392                            return null;
393                        }
394                    };
395                    try {
396                        TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
397                            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
398                        Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
399                        replyToConsumerSession.close();
400                        bridge.setConsumerTopic(replyToConsumerTopic);
401                        bridge.setProducerTopic(replyToProducerTopic);
402                        bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
403                        bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
404                        bridge.setDoHandleReplyTo(false);
405                        if (bridge.getJmsMessageConvertor() == null) {
406                            bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
407                        }
408                        bridge.setJmsConnector(this);
409                        bridge.start();
410                        LOG.info("Created replyTo bridge for " + replyToProducerTopic);
411                    } catch (Exception e) {
412                        LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
413                        return null;
414                    }
415                    replyToBridges.put(replyToProducerTopic, bridge);
416                }
417                return bridge.getConsumerTopic();
418            }
419        }
420    
421        protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
422            return session.createTopic(topicName);
423        }
424    
425        protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
426            Topic result = null;
427    
428            if (preferJndiDestinationLookup) {
429                try {
430                    // look-up the Queue
431                    result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
432                } catch (NamingException e) {
433                    try {
434                        result = session.createTopic(topicName);
435                    } catch (JMSException e1) {
436                        String errStr = "Failed to look-up or create Topic for name: " + topicName;
437                        LOG.error(errStr, e);
438                        JMSException jmsEx = new JMSException(errStr);
439                        jmsEx.setLinkedException(e1);
440                        throw jmsEx;
441                    }
442                }
443            } else {
444                try {
445                    result = session.createTopic(topicName);
446                } catch (JMSException e) {
447                    // look-up the Topic
448                    try {
449                        result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
450                    } catch (NamingException e1) {
451                        String errStr = "Failed to look-up Topic for name: " + topicName;
452                        LOG.error(errStr, e);
453                        JMSException jmsEx = new JMSException(errStr);
454                        jmsEx.setLinkedException(e1);
455                        throw jmsEx;
456                    }
457                }
458            }
459            return result;
460        }
461    
462    }