001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import javax.jms.Connection;
020    import javax.jms.Destination;
021    import javax.jms.ExceptionListener;
022    import javax.jms.JMSException;
023    import javax.jms.Queue;
024    import javax.jms.QueueConnection;
025    import javax.jms.QueueConnectionFactory;
026    import javax.jms.QueueSession;
027    import javax.jms.Session;
028    import javax.naming.NamingException;
029    
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     */
035    public class SimpleJmsQueueConnector extends JmsConnector {
036        private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsQueueConnector.class);
037        private String outboundQueueConnectionFactoryName;
038        private String localConnectionFactoryName;
039        private QueueConnectionFactory outboundQueueConnectionFactory;
040        private QueueConnectionFactory localQueueConnectionFactory;
041        private InboundQueueBridge[] inboundQueueBridges;
042        private OutboundQueueBridge[] outboundQueueBridges;
043    
044        /**
045         * @return Returns the inboundQueueBridges.
046         */
047        public InboundQueueBridge[] getInboundQueueBridges() {
048            return inboundQueueBridges;
049        }
050    
051        /**
052         * @param inboundQueueBridges The inboundQueueBridges to set.
053         */
054        public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) {
055            this.inboundQueueBridges = inboundQueueBridges;
056        }
057    
058        /**
059         * @return Returns the outboundQueueBridges.
060         */
061        public OutboundQueueBridge[] getOutboundQueueBridges() {
062            return outboundQueueBridges;
063        }
064    
065        /**
066         * @param outboundQueueBridges The outboundQueueBridges to set.
067         */
068        public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) {
069            this.outboundQueueBridges = outboundQueueBridges;
070        }
071    
072        /**
073         * @return Returns the localQueueConnectionFactory.
074         */
075        public QueueConnectionFactory getLocalQueueConnectionFactory() {
076            return localQueueConnectionFactory;
077        }
078    
079        /**
080         * @param localQueueConnectionFactory The localQueueConnectionFactory to
081         *                set.
082         */
083        public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) {
084            this.localQueueConnectionFactory = localConnectionFactory;
085        }
086    
087        /**
088         * @return Returns the outboundQueueConnectionFactory.
089         */
090        public QueueConnectionFactory getOutboundQueueConnectionFactory() {
091            return outboundQueueConnectionFactory;
092        }
093    
094        /**
095         * @return Returns the outboundQueueConnectionFactoryName.
096         */
097        public String getOutboundQueueConnectionFactoryName() {
098            return outboundQueueConnectionFactoryName;
099        }
100    
101        /**
102         * @param outboundQueueConnectionFactoryName The
103         *                outboundQueueConnectionFactoryName to set.
104         */
105        public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) {
106            this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName;
107        }
108    
109        /**
110         * @return Returns the localConnectionFactoryName.
111         */
112        public String getLocalConnectionFactoryName() {
113            return localConnectionFactoryName;
114        }
115    
116        /**
117         * @param localConnectionFactoryName The localConnectionFactoryName to set.
118         */
119        public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
120            this.localConnectionFactoryName = localConnectionFactoryName;
121        }
122    
123        /**
124         * @return Returns the localQueueConnection.
125         */
126        public QueueConnection getLocalQueueConnection() {
127            return (QueueConnection) localConnection.get();
128        }
129    
130        /**
131         * @param localQueueConnection The localQueueConnection to set.
132         */
133        public void setLocalQueueConnection(QueueConnection localQueueConnection) {
134            this.localConnection.set(localQueueConnection);
135        }
136    
137        /**
138         * @return Returns the outboundQueueConnection.
139         */
140        public QueueConnection getOutboundQueueConnection() {
141            return (QueueConnection) foreignConnection.get();
142        }
143    
144        /**
145         * @param outboundQueueConnection The outboundQueueConnection to set.
146         */
147        public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) {
148            this.foreignConnection.set(foreignQueueConnection);
149        }
150    
151        /**
152         * @param outboundQueueConnectionFactory The outboundQueueConnectionFactory
153         *                to set.
154         */
155        public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) {
156            this.outboundQueueConnectionFactory = foreignQueueConnectionFactory;
157        }
158    
159        @Override
160        protected void initializeForeignConnection() throws NamingException, JMSException {
161    
162            final QueueConnection newConnection;
163    
164            if (foreignConnection.get() == null) {
165                // get the connection factories
166                if (outboundQueueConnectionFactory == null) {
167                    // look it up from JNDI
168                    if (outboundQueueConnectionFactoryName != null) {
169                        outboundQueueConnectionFactory = (QueueConnectionFactory)jndiOutboundTemplate
170                            .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class);
171                        if (outboundUsername != null) {
172                            newConnection = outboundQueueConnectionFactory
173                                .createQueueConnection(outboundUsername, outboundPassword);
174                        } else {
175                            newConnection = outboundQueueConnectionFactory.createQueueConnection();
176                        }
177                    } else {
178                        throw new JMSException("Cannot create foreignConnection - no information");
179                    }
180                } else {
181                    if (outboundUsername != null) {
182                        newConnection = outboundQueueConnectionFactory
183                            .createQueueConnection(outboundUsername, outboundPassword);
184                    } else {
185                        newConnection = outboundQueueConnectionFactory.createQueueConnection();
186                    }
187                }
188            } else {
189                // Clear if for now in case something goes wrong during the init.
190                newConnection = (QueueConnection) foreignConnection.getAndSet(null);
191            }
192    
193            if (outboundClientId != null && outboundClientId.length() > 0) {
194                newConnection.setClientID(getOutboundClientId());
195            }
196            newConnection.start();
197    
198            outboundMessageConvertor.setConnection(newConnection);
199    
200            // Configure the bridges with the new Outbound connection.
201            initializeInboundDestinationBridgesOutboundSide(newConnection);
202            initializeOutboundDestinationBridgesOutboundSide(newConnection);
203    
204            // Register for any async error notifications now so we can reset in the
205            // case where there's not a lot of activity and a connection drops.
206            newConnection.setExceptionListener(new ExceptionListener() {
207                @Override
208                public void onException(JMSException exception) {
209                    handleConnectionFailure(newConnection);
210                }
211            });
212    
213            // At this point all looks good, so this our current connection now.
214            foreignConnection.set(newConnection);
215        }
216    
217        @Override
218        protected void initializeLocalConnection() throws NamingException, JMSException {
219    
220            final QueueConnection newConnection;
221    
222            if (localConnection.get() == null) {
223                // get the connection factories
224                if (localQueueConnectionFactory == null) {
225                    if (embeddedConnectionFactory == null) {
226                        // look it up from JNDI
227                        if (localConnectionFactoryName != null) {
228                            localQueueConnectionFactory = (QueueConnectionFactory)jndiLocalTemplate
229                                .lookup(localConnectionFactoryName, QueueConnectionFactory.class);
230                            if (localUsername != null) {
231                                newConnection = localQueueConnectionFactory
232                                    .createQueueConnection(localUsername, localPassword);
233                            } else {
234                                newConnection = localQueueConnectionFactory.createQueueConnection();
235                            }
236                        } else {
237                            throw new JMSException("Cannot create localConnection - no information");
238                        }
239                    } else {
240                        newConnection = embeddedConnectionFactory.createQueueConnection();
241                    }
242                } else {
243                    if (localUsername != null) {
244                        newConnection = localQueueConnectionFactory.
245                                createQueueConnection(localUsername, localPassword);
246                    } else {
247                        newConnection = localQueueConnectionFactory.createQueueConnection();
248                    }
249                }
250    
251            } else {
252                // Clear if for now in case something goes wrong during the init.
253                newConnection = (QueueConnection) localConnection.getAndSet(null);
254            }
255    
256            if (localClientId != null && localClientId.length() > 0) {
257                newConnection.setClientID(getLocalClientId());
258            }
259            newConnection.start();
260    
261            inboundMessageConvertor.setConnection(newConnection);
262    
263            // Configure the bridges with the new Local connection.
264            initializeInboundDestinationBridgesLocalSide(newConnection);
265            initializeOutboundDestinationBridgesLocalSide(newConnection);
266    
267            // Register for any async error notifications now so we can reset in the
268            // case where there's not a lot of activity and a connection drops.
269            newConnection.setExceptionListener(new ExceptionListener() {
270                @Override
271                public void onException(JMSException exception) {
272                    handleConnectionFailure(newConnection);
273                }
274            });
275    
276            // At this point all looks good, so this our current connection now.
277            localConnection.set(newConnection);
278        }
279    
280        protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
281            if (inboundQueueBridges != null) {
282                QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
283    
284                for (InboundQueueBridge bridge : inboundQueueBridges) {
285                    String queueName = bridge.getInboundQueueName();
286                    Queue foreignQueue = createForeignQueue(outboundSession, queueName);
287                    bridge.setConsumer(null);
288                    bridge.setConsumerQueue(foreignQueue);
289                    bridge.setConsumerConnection(connection);
290                    bridge.setJmsConnector(this);
291                    addInboundBridge(bridge);
292                }
293                outboundSession.close();
294            }
295        }
296    
297        protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
298            if (inboundQueueBridges != null) {
299                QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
300    
301                for (InboundQueueBridge bridge : inboundQueueBridges) {
302                    String localQueueName = bridge.getLocalQueueName();
303                    Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
304                    bridge.setProducerQueue(activemqQueue);
305                    bridge.setProducerConnection(connection);
306                    if (bridge.getJmsMessageConvertor() == null) {
307                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
308                    }
309                    bridge.setJmsConnector(this);
310                    addInboundBridge(bridge);
311                }
312                localSession.close();
313            }
314        }
315    
316        protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException {
317            if (outboundQueueBridges != null) {
318                QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
319    
320                for (OutboundQueueBridge bridge : outboundQueueBridges) {
321                    String queueName = bridge.getOutboundQueueName();
322                    Queue foreignQueue = createForeignQueue(outboundSession, queueName);
323                    bridge.setProducerQueue(foreignQueue);
324                    bridge.setProducerConnection(connection);
325                    if (bridge.getJmsMessageConvertor() == null) {
326                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
327                    }
328                    bridge.setJmsConnector(this);
329                    addOutboundBridge(bridge);
330                }
331                outboundSession.close();
332            }
333        }
334    
335        protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException {
336            if (outboundQueueBridges != null) {
337                QueueSession localSession =
338                        connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
339    
340                for (OutboundQueueBridge bridge : outboundQueueBridges) {
341                    String localQueueName = bridge.getLocalQueueName();
342                    Queue activemqQueue = createActiveMQQueue(localSession, localQueueName);
343                    bridge.setConsumer(null);
344                    bridge.setConsumerQueue(activemqQueue);
345                    bridge.setConsumerConnection(connection);
346                    bridge.setJmsConnector(this);
347                    addOutboundBridge(bridge);
348                }
349                localSession.close();
350            }
351        }
352    
353        protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
354                                                  Connection replyToConsumerConnection) {
355            Queue replyToProducerQueue = (Queue)destination;
356            boolean isInbound = replyToProducerConnection.equals(localConnection.get());
357    
358            if (isInbound) {
359                InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue);
360                if (bridge == null) {
361                    bridge = new InboundQueueBridge() {
362                        protected Destination processReplyToDestination(Destination destination) {
363                            return null;
364                        }
365                    };
366                    try {
367                        QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
368                            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
369                        Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
370                        replyToConsumerSession.close();
371                        bridge.setConsumerQueue(replyToConsumerQueue);
372                        bridge.setProducerQueue(replyToProducerQueue);
373                        bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
374                        bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
375                        bridge.setDoHandleReplyTo(false);
376                        if (bridge.getJmsMessageConvertor() == null) {
377                            bridge.setJmsMessageConvertor(getInboundMessageConvertor());
378                        }
379                        bridge.setJmsConnector(this);
380                        bridge.start();
381                        LOG.info("Created replyTo bridge for " + replyToProducerQueue);
382                    } catch (Exception e) {
383                        LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
384                        return null;
385                    }
386                    replyToBridges.put(replyToProducerQueue, bridge);
387                }
388                return bridge.getConsumerQueue();
389            } else {
390                OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue);
391                if (bridge == null) {
392                    bridge = new OutboundQueueBridge() {
393                        protected Destination processReplyToDestination(Destination destination) {
394                            return null;
395                        }
396                    };
397                    try {
398                        QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection)
399                            .createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
400                        Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
401                        replyToConsumerSession.close();
402                        bridge.setConsumerQueue(replyToConsumerQueue);
403                        bridge.setProducerQueue(replyToProducerQueue);
404                        bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
405                        bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
406                        bridge.setDoHandleReplyTo(false);
407                        if (bridge.getJmsMessageConvertor() == null) {
408                            bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
409                        }
410                        bridge.setJmsConnector(this);
411                        bridge.start();
412                        LOG.info("Created replyTo bridge for " + replyToProducerQueue);
413                    } catch (Exception e) {
414                        LOG.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
415                        return null;
416                    }
417                    replyToBridges.put(replyToProducerQueue, bridge);
418                }
419                return bridge.getConsumerQueue();
420            }
421        }
422    
423        protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException {
424            return session.createQueue(queueName);
425        }
426    
427        protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException {
428            Queue result = null;
429    
430            if (preferJndiDestinationLookup) {
431                try {
432                    // look-up the Queue
433                    result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
434                } catch (NamingException e) {
435                    try {
436                        result = session.createQueue(queueName);
437                    } catch (JMSException e1) {
438                        String errStr = "Failed to look-up or create Queue for name: " + queueName;
439                        LOG.error(errStr, e);
440                        JMSException jmsEx = new JMSException(errStr);
441                        jmsEx.setLinkedException(e1);
442                        throw jmsEx;
443                    }
444                }
445            } else {
446                try {
447                    result = session.createQueue(queueName);
448                } catch (JMSException e) {
449                    // look-up the Queue
450                    try {
451                        result = (Queue)jndiOutboundTemplate.lookup(queueName, Queue.class);
452                    } catch (NamingException e1) {
453                        String errStr = "Failed to look-up Queue for name: " + queueName;
454                        LOG.error(errStr, e);
455                        JMSException jmsEx = new JMSException(errStr);
456                        jmsEx.setLinkedException(e1);
457                        throw jmsEx;
458                    }
459                }
460            }
461    
462            return result;
463        }
464    }