001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import javax.jms.Connection;
020    import javax.jms.Destination;
021    import javax.jms.ExceptionListener;
022    import javax.jms.JMSException;
023    import javax.jms.Queue;
024    import javax.jms.QueueConnection;
025    import javax.jms.QueueConnectionFactory;
026    import javax.jms.QueueSession;
027    import javax.jms.Session;
028    import javax.naming.NamingException;
029    
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * A Bridge to other JMS Queue providers
035     *
036     * @org.apache.xbean.XBean
037     */
038    public class JmsQueueConnector extends JmsConnector {
039        private static final Logger LOG = LoggerFactory.getLogger(JmsQueueConnector.class);
040        private String outboundQueueConnectionFactoryName;
041        private String localConnectionFactoryName;
042        private QueueConnectionFactory outboundQueueConnectionFactory;
043        private QueueConnectionFactory localQueueConnectionFactory;
044        private InboundQueueBridge[] inboundQueueBridges;
045        private OutboundQueueBridge[] outboundQueueBridges;
046    
047        /**
048         * @return Returns the inboundQueueBridges.
049         */
050        public InboundQueueBridge[] getInboundQueueBridges() {
051            return inboundQueueBridges;
052        }
053    
054        /**
055         * @param inboundQueueBridges The inboundQueueBridges to set.
056         */
057        public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
058            this.inboundQueueBridges = inboundQueueBridges;
059        }
060    
061        /**
062         * @return Returns the outboundQueueBridges.
063         */
064        public OutboundQueueBridge[] getOutboundQueueBridges() {
065            return outboundQueueBridges;
066        }
067    
068        /**
069         * @param outboundQueueBridges The outboundQueueBridges to set.
070         */
071        public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
072            this.outboundQueueBridges = outboundQueueBridges;
073        }
074    
075        /**
076         * @return Returns the localQueueConnectionFactory.
077         */
078        public QueueConnectionFactory getLocalQueueConnectionFactory() {
079            return localQueueConnectionFactory;
080        }
081    
082        /**
083         * @param localQueueConnectionFactory The localQueueConnectionFactory to
084         *                set.
085         */
086        public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
087            this.localQueueConnectionFactory = localConnectionFactory;
088        }
089    
090        /**
091         * @return Returns the outboundQueueConnectionFactory.
092         */
093        public QueueConnectionFactory getOutboundQueueConnectionFactory() {
094            return outboundQueueConnectionFactory;
095        }
096    
097        /**
098         * @return Returns the outboundQueueConnectionFactoryName.
099         */
100        public String getOutboundQueueConnectionFactoryName() {
101            return outboundQueueConnectionFactoryName;
102        }
103    
104        /**
105         * @param outboundQueueConnectionFactoryName The
106         *                outboundQueueConnectionFactoryName to set.
107         */
108        public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
109            this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
110        }
111    
112        /**
113         * @return Returns the localConnectionFactoryName.
114         */
115        public String getLocalConnectionFactoryName() {
116            return localConnectionFactoryName;
117        }
118    
119        /**
120         * @param localConnectionFactoryName The localConnectionFactoryName to set.
121         */
122        public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
123            this.localConnectionFactoryName = localConnectionFactoryName;
124        }
125    
126        /**
127         * @return Returns the localQueueConnection.
128         */
129        public QueueConnection getLocalQueueConnection() {
130            return (QueueConnection) localConnection.get();
131        }
132    
133        /**
134         * @param localQueueConnection The localQueueConnection to set.
135         */
136        public void setLocalQueueConnection(QueueConnection localQueueConnection) {
137            this.localConnection.set(localQueueConnection);
138        }
139    
140        /**
141         * @return Returns the outboundQueueConnection.
142         */
143        public QueueConnection getOutboundQueueConnection() {
144            return (QueueConnection) foreignConnection.get();
145        }
146    
147        /**
148         * @param outboundQueueConnection The outboundQueueConnection to set.
149         */
150        public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
151            this.foreignConnection.set(foreignQueueConnection);
152        }
153    
154        /**
155         * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
156         *                to set.
157         */
158        public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
159            this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
160        }
161    
162        @Override
163        protected void initializeForeignConnection() throws NamingException, JMSException {
164    
165            final QueueConnection newConnection;
166    
167            if (foreignConnection.get() == null) {
168                // get the connection factories
169                if (outboundQueueConnectionFactory == null) {
170                    // look it up from JNDI
171                    if (outboundQueueConnectionFactoryName != null) {
172                        outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
173                            .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
174                        if (outboundUsername != null) {
175                            newConnection = outboundQueueConnectionFactory
176                                .createQueueConnection(outboundUsername, outboundPassword);
177                        } else {
178                            newConnection = outboundQueueConnectionFactory.createQueueConnection();
179                        }
180                    } else {
181                        throw new JMSException("Cannot create foreignConnection - no information");
182                    }
183                } else {
184                    if (outboundUsername != null) {
185                        newConnection = outboundQueueConnectionFactory
186                            .createQueueConnection(outboundUsername, outboundPassword);
187                    } else {
188                        newConnection = outboundQueueConnectionFactory.createQueueConnection();
189                    }
190                }
191            } else {
192                // Clear if for now in case something goes wrong during the init.
193                newConnection = (QueueConnection) foreignConnection.getAndSet(null);
194            }
195    
196            if (outboundClientId != null && outboundClientId.length() > 0) {
197                newConnection.setClientID(getOutboundClientId());
198            }
199            newConnection.start();
200    
201            outboundMessageConvertor.setConnection(newConnection);
202    
203            // Configure the bridges with the new Outbound connection.
204            initializeInboundDestinationBridgesOutboundSide(newConnection);
205            initializeOutboundDestinationBridgesOutboundSide(newConnection);
206    
207            // Register for any async error notifications now so we can reset in the
208            // case where there's not a lot of activity and a connection drops.
209            newConnection.setExceptionListener(new ExceptionListener() {
210                @Override
211                public void onException(JMSException exception) {
212                    handleConnectionFailure(newConnection);
213                }
214            });
215    
216            // At this point all looks good, so this our current connection now.
217            foreignConnection.set(newConnection);
218        }
219    
220        @Override
221        protected void initializeLocalConnection() throws NamingException, JMSException {
222    
223            final QueueConnection newConnection;
224    
225            if (localConnection.get() == null) {
226                // get the connection factories
227                if (localQueueConnectionFactory == null) {
228                    if (embeddedConnectionFactory == null) {
229                        // look it up from JNDI
230                        if (localConnectionFactoryName != null) {
231                            localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
232                                .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
233                            if (localUsername != null) {
234                                newConnection = localQueueConnectionFactory
235                                    .createQueueConnection(localUsername, localPassword);
236                            } else {
237                                newConnection = localQueueConnectionFactory.createQueueConnection();
238                            }
239                        } else {
240                            throw new JMSException("Cannot create localConnection - no information");
241                        }
242                    } else {
243                        newConnection = embeddedConnectionFactory.createQueueConnection();
244                    }
245                } else {
246                    if (localUsername != null) {
247                        newConnection = localQueueConnectionFactory.
248                                createQueueConnection(localUsername, localPassword);
249                    } else {
250                        newConnection = localQueueConnectionFactory.createQueueConnection();
251                    }
252                }
253    
254            } else {
255                // Clear if for now in case something goes wrong during the init.
256                newConnection = (QueueConnection) localConnection.getAndSet(null);
257            }
258    
259            if (localClientId != null && localClientId.length() > 0) {
260                newConnection.setClientID(getLocalClientId());
261            }
262            newConnection.start();
263    
264            inboundMessageConvertor.setConnection(newConnection);
265    
266            // Configure the bridges with the new Local connection.
267            initializeInboundDestinationBridgesLocalSide(newConnection);
268            initializeOutboundDestinationBridgesLocalSide(newConnection);
269    
270            // Register for any async error notifications now so we can reset in the
271            // case where there's not a lot of activity and a connection drops.
272            newConnection.setExceptionListener(new ExceptionListener() {
273                @Override
274                public void onException(JMSException exception) {
275                    handleConnectionFailure(newConnection);
276                }
277            });
278    
279            // At this point all looks good, so this our current connection now.
280            localConnection.set(newConnection);
281        }
282    
283        protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
284            if (inboundQueueBridges != null) {
285                QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
286    
287                for (InboundQueueBridge bridge : inboundQueueBridges) {
288                    String queueName = bridge.getInboundQueueName();
289                    Queue foreignQueue = createForeignQueue(outboundSession, queueName);
290                    bridge.setConsumer(null);
291                    bridge.setConsumerQueue(foreignQueue);
292                    bridge.setConsumerConnection(connection);
293                    bridge.setJmsConnector(this);
294                    addInboundBridge(bridge);
295                }
296                outboundSession.close();
297            }
298        }
299    
300        protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
301            if (inboundQueueBridges != null) {
302                QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
303    
304                for (InboundQueueBridge bridge : inboundQueueBridges) {
305                    String localQueueName = bridge.getLocalQueueName();
306                    Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
307                    bridge.setProducerQueue(activemqQueue);
308                    bridge.setProducerConnection(connection);
309                    if (bridge.getJmsMessageConvertor() == null) {
310                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
311                    }
312                    bridge.setJmsConnector(this);
313                    addInboundBridge(bridge);
314                }
315                localSession.close();
316            }
317        }
318    
319        protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
320            if (outboundQueueBridges != null) {
321                QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
322    
323                for (OutboundQueueBridge bridge : outboundQueueBridges) {
324                    String queueName = bridge.getOutboundQueueName();
325                    Queue foreignQueue = createForeignQueue(outboundSession, queueName);
326                    bridge.setProducerQueue(foreignQueue);
327                    bridge.setProducerConnection(connection);
328                    if (bridge.getJmsMessageConvertor() == null) {
329                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
330                    }
331                    bridge.setJmsConnector(this);
332                    addOutboundBridge(bridge);
333                }
334                outboundSession.close();
335            }
336        }
337    
338        protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
339            if (outboundQueueBridges != null) {
340                QueueSession localSession =
341                        connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
342    
343                for (OutboundQueueBridge bridge : outboundQueueBridges) {
344                    String localQueueName = bridge.getLocalQueueName();
345                    Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
346                    bridge.setConsumer(null);
347                    bridge.setConsumerQueue(activemqQueue);
348                    bridge.setConsumerConnection(connection);
349                    bridge.setJmsConnector(this);
350                    addOutboundBridge(bridge);
351                }
352                localSession.close();
353            }
354        }
355    
356        protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
357                                                  Connection replyToConsumerConnection) {
358            Queue replyToProducerQueue = (Queue)destination;
359            boolean isInbound = replyToProducerConnection.equals(localConnection.get());
360    
361            if (isInbound) {
362                InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
363                if (bridge == null) {
364                    bridge = new InboundQueueBridge() {
365                        protected Destination processReplyToDestination(Destination destination) {
366                            return null;
367                        }
368                    };
369                    try {
370                        QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
371                            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
372                        Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
373                        replyToConsumerSession.close();
374                        bridge.setConsumerQueue(replyToConsumerQueue);
375                        bridge.setProducerQueue(replyToProducerQueue);
376                        bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
377                        bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
378                        bridge.setDoHandleReplyTo(false);
379                        if (bridge.getJmsMessageConvertor() == null) {
380                            bridge.setJmsMessageConvertor(getInboundMessageConvertor());
381                        }
382                        bridge.setJmsConnector(this);
383                        bridge.start();
384                        LOG.info("Created replyTo bridge for " + replyToProducerQueue);
385                    } catch (Exception e) {
386                        LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
387                        return null;
388                    }
389                    replyToBridges.put(replyToProducerQueue, bridge);
390                }
391                return bridge.getConsumerQueue();
392            } else {
393                OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
394                if (bridge == null) {
395                    bridge = new OutboundQueueBridge() {
396                        protected Destination processReplyToDestination(Destination destination) {
397                            return null;
398                        }
399                    };
400                    try {
401                        QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
402                            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
403                        Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
404                        replyToConsumerSession.close();
405                        bridge.setConsumerQueue(replyToConsumerQueue);
406                        bridge.setProducerQueue(replyToProducerQueue);
407                        bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
408                        bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
409                        bridge.setDoHandleReplyTo(false);
410                        if (bridge.getJmsMessageConvertor() == null) {
411                            bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
412                        }
413                        bridge.setJmsConnector(this);
414                        bridge.start();
415                        LOG.info("Created replyTo bridge for " + replyToProducerQueue);
416                    } catch (Exception e) {
417                        LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
418                        return null;
419                    }
420                    replyToBridges.put(replyToProducerQueue, bridge);
421                }
422                return bridge.getConsumerQueue();
423            }
424        }
425    
426        protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
427            return session.createQueue(queueName);
428        }
429    
430        protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
431            Queue result = null;
432    
433            if (preferJndiDestinationLookup) {
434                try {
435                    // look-up the Queue
436                    result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
437                } catch (NamingException e) {
438                    try {
439                        result = session.createQueue(queueName);
440                    } catch (JMSException e1) {
441                        String errStr = "Failed to look-up or create Queue for name: " + queueName;
442                        LOG.error(errStr, e);
443                        JMSException jmsEx = new JMSException(errStr);
444                        jmsEx.setLinkedException(e1);
445                        throw jmsEx;
446                    }
447                }
448            } else {
449                try {
450                    result = session.createQueue(queueName);
451                } catch (JMSException e) {
452                    // look-up the Queue
453                    try {
454                        result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
455                    } catch (NamingException e1) {
456                        String errStr = "Failed to look-up Queue for name: " + queueName;
457                        LOG.error(errStr, e);
458                        JMSException jmsEx = new JMSException(errStr);
459                        jmsEx.setLinkedException(e1);
460                        throw jmsEx;
461                    }
462                }
463            }
464    
465            return result;
466        }
467    }