001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.ft;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.net.URISyntaxException;
022    import java.util.List;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    
025    import org.apache.activemq.Service;
026    import org.apache.activemq.broker.BrokerService;
027    import org.apache.activemq.broker.BrokerServiceAware;
028    import org.apache.activemq.broker.TransportConnector;
029    import org.apache.activemq.command.BrokerInfo;
030    import org.apache.activemq.command.Command;
031    import org.apache.activemq.command.CommandTypes;
032    import org.apache.activemq.command.ConnectionId;
033    import org.apache.activemq.command.ConnectionInfo;
034    import org.apache.activemq.command.MessageDispatch;
035    import org.apache.activemq.command.ProducerInfo;
036    import org.apache.activemq.command.Response;
037    import org.apache.activemq.command.SessionInfo;
038    import org.apache.activemq.command.ShutdownInfo;
039    import org.apache.activemq.transport.DefaultTransportListener;
040    import org.apache.activemq.transport.Transport;
041    import org.apache.activemq.transport.TransportDisposedIOException;
042    import org.apache.activemq.transport.TransportFactory;
043    import org.apache.activemq.util.IdGenerator;
044    import org.apache.activemq.util.ServiceStopper;
045    import org.apache.activemq.util.ServiceSupport;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * Connects a Slave Broker to a Master when using <a
051     * href="http://activemq.apache.org/masterslave.html">Master Slave</a> for High
052     * Availability of messages.
053     * 
054     * @org.apache.xbean.XBean
055     * 
056     */
057    public class MasterConnector implements Service, BrokerServiceAware {
058    
059        private static final Logger LOG = LoggerFactory.getLogger(MasterConnector.class);
060        private BrokerService broker;
061        private URI remoteURI;
062        private URI localURI;
063        private Transport localBroker;
064        private Transport remoteBroker;
065        private TransportConnector connector;
066        private AtomicBoolean started = new AtomicBoolean(false);
067        private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false);
068        private final IdGenerator idGenerator = new IdGenerator();
069        private String userName;
070        private String password;
071        private ConnectionInfo connectionInfo;
072        private SessionInfo sessionInfo;
073        private ProducerInfo producerInfo;
074        private final AtomicBoolean masterActive = new AtomicBoolean();
075        private BrokerInfo brokerInfo;
076        private boolean firstConnection=true;
077        private boolean failedToStart;
078    
079        public MasterConnector() {
080        }
081    
082        public MasterConnector(String remoteUri) throws URISyntaxException {
083            remoteURI = new URI(remoteUri);
084        }
085    
086        public void setBrokerService(BrokerService broker) {
087            this.broker = broker;
088            if (localURI == null) {
089                localURI = broker.getVmConnectorURI();
090            }
091            if (connector == null) {
092                List transportConnectors = broker.getTransportConnectors();
093                if (!transportConnectors.isEmpty()) {
094                    connector = (TransportConnector)transportConnectors.get(0);
095                }
096            }
097        }
098    
099        public boolean isSlave() {
100            return masterActive.get();
101        }
102    
103        protected void restartBridge() throws Exception {
104            localBroker.oneway(connectionInfo);
105            remoteBroker.oneway(connectionInfo);
106            localBroker.oneway(sessionInfo);
107            remoteBroker.oneway(sessionInfo);
108            remoteBroker.oneway(producerInfo);
109            remoteBroker.oneway(brokerInfo);
110        }
111        
112        public void start() throws Exception {
113            if (!started.compareAndSet(false, true)) {
114                return;
115            }
116            if (remoteURI == null) {
117                throw new IllegalArgumentException("You must specify a remoteURI");
118            }
119            localBroker = TransportFactory.connect(localURI);
120            remoteBroker = TransportFactory.connect(remoteURI);
121            LOG.info("Starting a slave connection between " + localBroker + " and " + remoteBroker);
122            localBroker.setTransportListener(new DefaultTransportListener() {
123    
124                public void onCommand(Object command) {
125                }
126    
127                public void onException(IOException error) {
128                    if (started.get()) {
129                        serviceLocalException(error);
130                    }
131                }
132            });
133            remoteBroker.setTransportListener(new DefaultTransportListener() {
134    
135                public void onCommand(Object o) {
136                    Command command = (Command)o;
137                    if (started.get()) {
138                        serviceRemoteCommand(command);
139                    }
140                }
141    
142                public void onException(IOException error) {
143                    if (started.get()) {
144                        serviceRemoteException(error);
145                    }
146                }
147                
148                public void transportResumed() {
149                    try{
150                            if(!firstConnection){
151                                    localBroker = TransportFactory.connect(localURI);
152                                    localBroker.setTransportListener(new DefaultTransportListener() {
153            
154                                    public void onCommand(Object command) {
155                                    }
156            
157                                    public void onException(IOException error) {
158                                        if (started.get()) {
159                                            serviceLocalException(error);
160                                        }
161                                    }
162                                });
163                                    localBroker.start();
164                                    restartBridge();
165                                    LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been reestablished.");
166                            }else{
167                                    firstConnection=false;
168                            }
169                    }catch(IOException e){
170                            LOG.error("MasterConnector failed to send BrokerInfo in transportResumed:", e);
171                    }catch(Exception e){
172                            LOG.error("MasterConnector failed to restart localBroker in transportResumed:", e);
173                    }
174                    
175                }
176            });
177            try {
178                localBroker.start();
179                remoteBroker.start();
180                startBridge();
181                masterActive.set(true);
182            } catch (Exception e) {
183                masterActive.set(false);
184                if(!stoppedBeforeStart.get()){
185                    LOG.error("Failed to start network bridge: " + e, e);
186                }else{
187                    LOG.info("Slave stopped before connected to the master.");
188                }
189                setFailedToStart(true);
190            }    
191        }
192    
193        protected void startBridge() throws Exception {
194            connectionInfo = new ConnectionInfo();
195            connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
196            connectionInfo.setClientId(idGenerator.generateId());
197            connectionInfo.setUserName(userName);
198            connectionInfo.setPassword(password);
199            connectionInfo.setBrokerMasterConnector(true);
200            sessionInfo = new SessionInfo(connectionInfo, 1);
201            producerInfo = new ProducerInfo(sessionInfo, 1);
202            producerInfo.setResponseRequired(false);
203            if (connector != null) {
204                brokerInfo = connector.getBrokerInfo();
205            } else {
206                brokerInfo = new BrokerInfo();
207            }
208            brokerInfo.setBrokerName(broker.getBrokerName());
209            brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
210            brokerInfo.setSlaveBroker(true);
211            brokerInfo.setPassiveSlave(broker.isPassiveSlave());
212            restartBridge();
213            LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has been established.");
214        }
215    
216        public void stop() throws Exception {
217            if (!started.compareAndSet(true, false)||!masterActive.get()) {
218                return;
219            }
220            masterActive.set(false);
221            try {
222                // if (connectionInfo!=null){
223                // localBroker.request(connectionInfo.createRemoveCommand());
224                // }
225                // localBroker.setTransportListener(null);
226                // remoteBroker.setTransportListener(null);
227                remoteBroker.oneway(new ShutdownInfo());
228                localBroker.oneway(new ShutdownInfo());
229            } catch (IOException e) {
230                LOG.debug("Caught exception stopping", e);
231            } finally {
232                ServiceStopper ss = new ServiceStopper();
233                ss.stop(localBroker);
234                ss.stop(remoteBroker);
235                ss.throwFirstException();
236            }
237        }
238        
239        public void stopBeforeConnected()throws Exception{
240            masterActive.set(false);
241            started.set(false);
242            stoppedBeforeStart.set(true);
243            ServiceStopper ss = new ServiceStopper();
244            ss.stop(localBroker);
245            ss.stop(remoteBroker);
246        }
247    
248        protected void serviceRemoteException(IOException error) {
249            LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
250            shutDown();
251        }
252    
253        protected void serviceRemoteCommand(Command command) {
254            try {
255                if (command.isMessageDispatch()) {
256                    MessageDispatch md = (MessageDispatch)command;
257                    command = md.getMessage();
258                }
259                if (command.getDataStructureType() == CommandTypes.SHUTDOWN_INFO) {
260                    LOG.warn("The Master has shutdown");
261                    shutDown();
262                } else {
263                    boolean responseRequired = command.isResponseRequired();
264                    int commandId = command.getCommandId();
265                    if (responseRequired) {
266                        Response response = (Response)localBroker.request(command);
267                        response.setCorrelationId(commandId);
268                        remoteBroker.oneway(response);
269                    } else {
270                        localBroker.oneway(command);
271                    }
272                }
273            } catch (IOException e) {
274                serviceRemoteException(e);
275            }
276        }
277    
278        protected void serviceLocalException(Throwable error) {
279            if (!(error instanceof TransportDisposedIOException) || localBroker.isDisposed()){
280                    LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: " + error.getMessage(), error);
281                    ServiceSupport.dispose(this);
282            }else{
283                    LOG.info(error.getMessage());
284            }
285        }
286    
287        /**
288         * @return Returns the localURI.
289         */
290        public URI getLocalURI() {
291            return localURI;
292        }
293    
294        /**
295         * @param localURI The localURI to set.
296         */
297        public void setLocalURI(URI localURI) {
298            this.localURI = localURI;
299        }
300    
301        /**
302         * @return Returns the remoteURI.
303         */
304        public URI getRemoteURI() {
305            return remoteURI;
306        }
307    
308        /**
309         * @param remoteURI The remoteURI to set.
310         */
311        public void setRemoteURI(URI remoteURI) {
312            this.remoteURI = remoteURI;
313        }
314    
315        /**
316         * @return Returns the password.
317         */
318        public String getPassword() {
319            return password;
320        }
321    
322        /**
323         * @param password The password to set.
324         */
325        public void setPassword(String password) {
326            this.password = password;
327        }
328    
329        /**
330         * @return Returns the userName.
331         */
332        public String getUserName() {
333            return userName;
334        }
335    
336        /**
337         * @param userName The userName to set.
338         */
339        public void setUserName(String userName) {
340            this.userName = userName;
341        }
342    
343        private void shutDown() {
344            masterActive.set(false);
345            broker.masterFailed();
346            ServiceSupport.dispose(this);
347        }
348    
349            public boolean isStoppedBeforeStart() {
350                    return stoppedBeforeStart.get();
351            }
352    
353        /**
354         * Get the failedToStart
355         * @return the failedToStart
356         */
357        public boolean isFailedToStart() {
358            return this.failedToStart;
359        }
360    
361        /**
362         * Set the failedToStart
363         * @param failedToStart the failedToStart to set
364         */
365        public void setFailedToStart(boolean failedToStart) {
366            this.failedToStart = failedToStart;
367        }
368    
369    }