001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import javax.jms.Connection;
020    import javax.jms.Destination;
021    import javax.jms.ExceptionListener;
022    import javax.jms.JMSException;
023    import javax.jms.Session;
024    import javax.jms.Topic;
025    import javax.jms.TopicConnection;
026    import javax.jms.TopicConnectionFactory;
027    import javax.jms.TopicSession;
028    import javax.naming.NamingException;
029    
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * A Bridge to other JMS Topic providers
035     *
036     * @org.apache.xbean.XBean
037     */
038    public class JmsTopicConnector extends JmsConnector {
039        private static final Logger LOG = LoggerFactory.getLogger(JmsTopicConnector.class);
040        private String outboundTopicConnectionFactoryName;
041        private String localConnectionFactoryName;
042        private TopicConnectionFactory outboundTopicConnectionFactory;
043        private TopicConnectionFactory localTopicConnectionFactory;
044        private InboundTopicBridge[] inboundTopicBridges;
045        private OutboundTopicBridge[] outboundTopicBridges;
046    
047        /**
048         * @return Returns the inboundTopicBridges.
049         */
050        public InboundTopicBridge[] getInboundTopicBridges() {
051            return inboundTopicBridges;
052        }
053    
054        /**
055         * @param inboundTopicBridges The inboundTopicBridges to set.
056         */
057        public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
058            this.inboundTopicBridges = inboundTopicBridges;
059        }
060    
061        /**
062         * @return Returns the outboundTopicBridges.
063         */
064        public OutboundTopicBridge[] getOutboundTopicBridges() {
065            return outboundTopicBridges;
066        }
067    
068        /**
069         * @param outboundTopicBridges The outboundTopicBridges to set.
070         */
071        public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
072            this.outboundTopicBridges = outboundTopicBridges;
073        }
074    
075        /**
076         * @return Returns the localTopicConnectionFactory.
077         */
078        public TopicConnectionFactory getLocalTopicConnectionFactory() {
079            return localTopicConnectionFactory;
080        }
081    
082        /**
083         * @param localTopicConnectionFactory The localTopicConnectionFactory to set.
084         */
085        public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory) {
086            this.localTopicConnectionFactory = localConnectionFactory;
087        }
088    
089        /**
090         * @return Returns the outboundTopicConnectionFactory.
091         */
092        public TopicConnectionFactory getOutboundTopicConnectionFactory() {
093            return outboundTopicConnectionFactory;
094        }
095    
096        /**
097         * @return Returns the outboundTopicConnectionFactoryName.
098         */
099        public String getOutboundTopicConnectionFactoryName() {
100            return outboundTopicConnectionFactoryName;
101        }
102    
103        /**
104         * @param outboundTopicConnectionFactoryName The outboundTopicConnectionFactoryName to set.
105         */
106        public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
107            this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
108        }
109    
110        /**
111         * @return Returns the localConnectionFactoryName.
112         */
113        public String getLocalConnectionFactoryName() {
114            return localConnectionFactoryName;
115        }
116    
117        /**
118         * @param localConnectionFactoryName The localConnectionFactoryName to set.
119         */
120        public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
121            this.localConnectionFactoryName = localConnectionFactoryName;
122        }
123    
124        /**
125         * @return Returns the localTopicConnection.
126         */
127        public TopicConnection getLocalTopicConnection() {
128            return (TopicConnection) localConnection.get();
129        }
130    
131        /**
132         * @param localTopicConnection The localTopicConnection to set.
133         */
134        public void setLocalTopicConnection(TopicConnection localTopicConnection) {
135            this.localConnection.set(localTopicConnection);
136        }
137    
138        /**
139         * @return Returns the outboundTopicConnection.
140         */
141        public TopicConnection getOutboundTopicConnection() {
142            return (TopicConnection) foreignConnection.get();
143        }
144    
145        /**
146         * @param outboundTopicConnection The outboundTopicConnection to set.
147         */
148        public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
149            this.foreignConnection.set(foreignTopicConnection);
150        }
151    
152        /**
153         * @param outboundTopicConnectionFactory The outboundTopicConnectionFactory to set.
154         */
155        public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
156            this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
157        }
158    
159        @Override
160        protected void initializeForeignConnection() throws NamingException, JMSException {
161    
162            final TopicConnection newConnection;
163    
164            if (foreignConnection.get() == null) {
165                // get the connection factories
166                if (outboundTopicConnectionFactory == null) {
167                    // look it up from JNDI
168                    if (outboundTopicConnectionFactoryName != null) {
169                        outboundTopicConnectionFactory = (TopicConnectionFactory)jndiOutboundTemplate
170                            .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
171                        if (outboundUsername != null) {
172                            newConnection = outboundTopicConnectionFactory
173                                .createTopicConnection(outboundUsername, outboundPassword);
174                        } else {
175                            newConnection = outboundTopicConnectionFactory.createTopicConnection();
176                        }
177                    } else {
178                        throw new JMSException("Cannot create foreignConnection - no information");
179                    }
180                } else {
181                    if (outboundUsername != null) {
182                        newConnection = outboundTopicConnectionFactory
183                            .createTopicConnection(outboundUsername, outboundPassword);
184                    } else {
185                        newConnection = outboundTopicConnectionFactory.createTopicConnection();
186                    }
187                }
188            } else {
189                // Clear if for now in case something goes wrong during the init.
190                newConnection = (TopicConnection) foreignConnection.getAndSet(null);
191            }
192    
193            if (outboundClientId != null && outboundClientId.length() > 0) {
194                newConnection.setClientID(getOutboundClientId());
195            }
196            newConnection.start();
197    
198            outboundMessageConvertor.setConnection(newConnection);
199    
200            // Configure the bridges with the new Outbound connection.
201            initializeInboundDestinationBridgesOutboundSide(newConnection);
202            initializeOutboundDestinationBridgesOutboundSide(newConnection);
203    
204            // Register for any async error notifications now so we can reset in the
205            // case where there's not a lot of activity and a connection drops.
206            newConnection.setExceptionListener(new ExceptionListener() {
207                @Override
208                public void onException(JMSException exception) {
209                    handleConnectionFailure(newConnection);
210                }
211            });
212    
213            // At this point all looks good, so this our current connection now.
214            foreignConnection.set(newConnection);
215        }
216    
217        @Override
218        protected void initializeLocalConnection() throws NamingException, JMSException {
219    
220            final TopicConnection newConnection;
221    
222            if (localConnection.get() == null) {
223                // get the connection factories
224                if (localTopicConnectionFactory == null) {
225                    if (embeddedConnectionFactory == null) {
226                        // look it up from JNDI
227                        if (localConnectionFactoryName != null) {
228                            localTopicConnectionFactory = (TopicConnectionFactory)jndiLocalTemplate
229                                .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
230                            if (localUsername != null) {
231                                newConnection = localTopicConnectionFactory
232                                    .createTopicConnection(localUsername, localPassword);
233                            } else {
234                                newConnection = localTopicConnectionFactory.createTopicConnection();
235                            }
236                        } else {
237                            throw new JMSException("Cannot create localConnection - no information");
238                        }
239                    } else {
240                        newConnection = embeddedConnectionFactory.createTopicConnection();
241                    }
242                } else {
243                    if (localUsername != null) {
244                        newConnection = localTopicConnectionFactory.
245                                createTopicConnection(localUsername, localPassword);
246                    } else {
247                        newConnection = localTopicConnectionFactory.createTopicConnection();
248                    }
249                }
250    
251            } else {
252                // Clear if for now in case something goes wrong during the init.
253                newConnection = (TopicConnection) localConnection.getAndSet(null);
254            }
255    
256            if (localClientId != null && localClientId.length() > 0) {
257                newConnection.setClientID(getLocalClientId());
258            }
259            newConnection.start();
260    
261            inboundMessageConvertor.setConnection(newConnection);
262    
263            // Configure the bridges with the new Local connection.
264            initializeInboundDestinationBridgesLocalSide(newConnection);
265            initializeOutboundDestinationBridgesLocalSide(newConnection);
266    
267            // Register for any async error notifications now so we can reset in the
268            // case where there's not a lot of activity and a connection drops.
269            newConnection.setExceptionListener(new ExceptionListener() {
270                @Override
271                public void onException(JMSException exception) {
272                    handleConnectionFailure(newConnection);
273                }
274            });
275    
276            // At this point all looks good, so this our current connection now.
277            localConnection.set(newConnection);
278        }
279    
280        protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
281            if (inboundTopicBridges != null) {
282                TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
283    
284                for (InboundTopicBridge bridge : inboundTopicBridges) {
285                    String TopicName = bridge.getInboundTopicName();
286                    Topic foreignTopic = createForeignTopic(outboundSession, TopicName);
287                    bridge.setConsumer(null);
288                    bridge.setConsumerTopic(foreignTopic);
289                    bridge.setConsumerConnection(connection);
290                    bridge.setJmsConnector(this);
291                    addInboundBridge(bridge);
292                }
293                outboundSession.close();
294            }
295        }
296    
297        protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
298            if (inboundTopicBridges != null) {
299                TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
300    
301                for (InboundTopicBridge bridge : inboundTopicBridges) {
302                    String localTopicName = bridge.getLocalTopicName();
303                    Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
304                    bridge.setProducerTopic(activemqTopic);
305                    bridge.setProducerConnection(connection);
306                    if (bridge.getJmsMessageConvertor() == null) {
307                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
308                    }
309                    bridge.setJmsConnector(this);
310                    addInboundBridge(bridge);
311                }
312                localSession.close();
313            }
314        }
315    
316        protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
317            if (outboundTopicBridges != null) {
318                TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
319    
320                for (OutboundTopicBridge bridge : outboundTopicBridges) {
321                    String topicName = bridge.getOutboundTopicName();
322                    Topic foreignTopic = createForeignTopic(outboundSession, topicName);
323                    bridge.setProducerTopic(foreignTopic);
324                    bridge.setProducerConnection(connection);
325                    if (bridge.getJmsMessageConvertor() == null) {
326                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
327                    }
328                    bridge.setJmsConnector(this);
329                    addOutboundBridge(bridge);
330                }
331                outboundSession.close();
332            }
333        }
334    
335        protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
336            if (outboundTopicBridges != null) {
337                TopicSession localSession =
338                        connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
339    
340                for (OutboundTopicBridge bridge : outboundTopicBridges) {
341                    String localTopicName = bridge.getLocalTopicName();
342                    Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
343                    bridge.setConsumer(null);
344                    bridge.setConsumerTopic(activemqTopic);
345                    bridge.setConsumerConnection(connection);
346                    bridge.setJmsConnector(this);
347                    addOutboundBridge(bridge);
348                }
349                localSession.close();
350            }
351        }
352    
353        protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
354                                                  Connection replyToConsumerConnection) {
355            Topic replyToProducerTopic = (Topic)destination;
356            boolean isInbound = replyToProducerConnection.equals(localConnection.get());
357    
358            if (isInbound) {
359                InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
360                if (bridge == null) {
361                    bridge = new InboundTopicBridge() {
362                        protected Destination processReplyToDestination(Destination destination) {
363                            return null;
364                        }
365                    };
366                    try {
367                        TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
368                            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
369                        Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
370                        replyToConsumerSession.close();
371                        bridge.setConsumerTopic(replyToConsumerTopic);
372                        bridge.setProducerTopic(replyToProducerTopic);
373                        bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
374                        bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
375                        bridge.setDoHandleReplyTo(false);
376                        if (bridge.getJmsMessageConvertor() == null) {
377                            bridge.setJmsMessageConvertor(getInboundMessageConvertor());
378                        }
379                        bridge.setJmsConnector(this);
380                        bridge.start();
381                        LOG.info("Created replyTo bridge for " + replyToProducerTopic);
382                    } catch (Exception e) {
383                        LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
384                        return null;
385                    }
386                    replyToBridges.put(replyToProducerTopic, bridge);
387                }
388                return bridge.getConsumerTopic();
389            } else {
390                OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
391                if (bridge == null) {
392                    bridge = new OutboundTopicBridge() {
393                        protected Destination processReplyToDestination(Destination destination) {
394                            return null;
395                        }
396                    };
397                    try {
398                        TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
399                            .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
400                        Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
401                        replyToConsumerSession.close();
402                        bridge.setConsumerTopic(replyToConsumerTopic);
403                        bridge.setProducerTopic(replyToProducerTopic);
404                        bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
405                        bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
406                        bridge.setDoHandleReplyTo(false);
407                        if (bridge.getJmsMessageConvertor() == null) {
408                            bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
409                        }
410                        bridge.setJmsConnector(this);
411                        bridge.start();
412                        LOG.info("Created replyTo bridge for " + replyToProducerTopic);
413                    } catch (Exception e) {
414                        LOG.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
415                        return null;
416                    }
417                    replyToBridges.put(replyToProducerTopic, bridge);
418                }
419                return bridge.getConsumerTopic();
420            }
421        }
422    
423        protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
424            return session.createTopic(topicName);
425        }
426    
427        protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
428            Topic result = null;
429    
430            if (preferJndiDestinationLookup) {
431                try {
432                    // look-up the Queue
433                    result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
434                } catch (NamingException e) {
435                    try {
436                        result = session.createTopic(topicName);
437                    } catch (JMSException e1) {
438                        String errStr = "Failed to look-up or create Topic for name: " + topicName;
439                        LOG.error(errStr, e);
440                        JMSException jmsEx = new JMSException(errStr);
441                        jmsEx.setLinkedException(e1);
442                        throw jmsEx;
443                    }
444                }
445            } else {
446                try {
447                    result = session.createTopic(topicName);
448                } catch (JMSException e) {
449                    // look-up the Topic
450                    try {
451                        result = (Topic)jndiOutboundTemplate.lookup(topicName, Topic.class);
452                    } catch (NamingException e1) {
453                        String errStr = "Failed to look-up Topic for name: " + topicName;
454                        LOG.error(errStr, e);
455                        JMSException jmsEx = new JMSException(errStr);
456                        jmsEx.setLinkedException(e1);
457                        throw jmsEx;
458                    }
459                }
460            }
461            return result;
462        }
463    
464    }