001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.ft;
018    
019    import java.util.Map;
020    import java.util.concurrent.ConcurrentHashMap;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    
023    import org.apache.activemq.broker.Connection;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.broker.ConsumerBrokerExchange;
026    import org.apache.activemq.broker.InsertableMutableBrokerFilter;
027    import org.apache.activemq.broker.MutableBrokerFilter;
028    import org.apache.activemq.broker.ProducerBrokerExchange;
029    import org.apache.activemq.broker.region.Subscription;
030    import org.apache.activemq.command.Command;
031    import org.apache.activemq.command.ConnectionControl;
032    import org.apache.activemq.command.ConnectionInfo;
033    import org.apache.activemq.command.ConsumerId;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.DestinationInfo;
036    import org.apache.activemq.command.ExceptionResponse;
037    import org.apache.activemq.command.Message;
038    import org.apache.activemq.command.MessageAck;
039    import org.apache.activemq.command.MessageDispatch;
040    import org.apache.activemq.command.MessageDispatchNotification;
041    import org.apache.activemq.command.ProducerInfo;
042    import org.apache.activemq.command.RemoveInfo;
043    import org.apache.activemq.command.RemoveSubscriptionInfo;
044    import org.apache.activemq.command.Response;
045    import org.apache.activemq.command.SessionInfo;
046    import org.apache.activemq.command.TransactionId;
047    import org.apache.activemq.command.TransactionInfo;
048    import org.apache.activemq.transport.MutexTransport;
049    import org.apache.activemq.transport.ResponseCorrelator;
050    import org.apache.activemq.transport.Transport;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    /**
055     * The Message Broker which passes messages to a slave
056     * 
057     * 
058     */
059    public class MasterBroker extends InsertableMutableBrokerFilter {
060    
061        private static final Logger LOG = LoggerFactory.getLogger(MasterBroker.class);
062        private Transport slave;
063        private AtomicBoolean started = new AtomicBoolean(false);
064    
065        private Map<ConsumerId, ConsumerId> consumers = new ConcurrentHashMap<ConsumerId, ConsumerId>();
066        
067        /**
068         * Constructor
069         * 
070         * @param parent
071         * @param transport
072         */
073        public MasterBroker(MutableBrokerFilter parent, Transport transport) {
074            super(parent);
075            this.slave = transport;
076            this.slave = new MutexTransport(slave);
077            this.slave = new ResponseCorrelator(slave);
078            this.slave.setTransportListener(transport.getTransportListener());
079        }
080    
081        /**
082         * start processing this broker
083         */
084        public void startProcessing() {
085            started.set(true);
086            try {
087                Connection[] connections = getClients();
088                ConnectionControl command = new ConnectionControl();
089                command.setFaultTolerant(true);
090                if (connections != null) {
091                    for (int i = 0; i < connections.length; i++) {
092                        if (connections[i].isActive() && connections[i].isManageable()) {
093                            connections[i].dispatchAsync(command);
094                        }
095                    }
096                }
097            } catch (Exception e) {
098                LOG.error("Failed to get Connections", e);
099            }
100        }
101    
102        /**
103         * stop the broker
104         * 
105         * @throws Exception
106         */
107        public void stop() throws Exception {
108            stopProcessing();
109        }
110    
111        /**
112         * stop processing this broker
113         */
114        public void stopProcessing() {
115            if (started.compareAndSet(true, false)) {
116                remove();
117            }
118        }
119    
120        /**
121         * A client is establishing a connection with the broker.
122         * 
123         * @param context
124         * @param info
125         * @throws Exception
126         */
127        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
128            super.addConnection(context, info);
129            sendAsyncToSlave(info);
130        }
131    
132        /**
133         * A client is disconnecting from the broker.
134         * 
135         * @param context the environment the operation is being executed under.
136         * @param info
137         * @param error null if the client requested the disconnect or the error
138         *                that caused the client to disconnect.
139         * @throws Exception
140         */
141        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
142            super.removeConnection(context, info, error);
143            sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
144        }
145    
146        /**
147         * Adds a session.
148         * 
149         * @param context
150         * @param info
151         * @throws Exception
152         */
153        public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
154            super.addSession(context, info);
155            sendAsyncToSlave(info);
156        }
157    
158        /**
159         * Removes a session.
160         * 
161         * @param context
162         * @param info
163         * @throws Exception
164         */
165        public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
166            super.removeSession(context, info);
167            sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
168        }
169    
170        /**
171         * Adds a producer.
172         * 
173         * @param context the enviorment the operation is being executed under.
174         * @param info
175         * @throws Exception
176         */
177        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
178            super.addProducer(context, info);
179            sendAsyncToSlave(info);
180        }
181    
182        /**
183         * Removes a producer.
184         * 
185         * @param context the environment the operation is being executed under.
186         * @param info
187         * @throws Exception
188         */
189        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
190            super.removeProducer(context, info);
191            sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
192        }
193    
194        /**
195         * add a consumer
196         * 
197         * @param context
198         * @param info
199         * @return the associated subscription
200         * @throws Exception
201         */
202        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
203            sendSyncToSlave(info);
204            consumers.put(info.getConsumerId(), info.getConsumerId());
205            return super.addConsumer(context, info);
206        }
207    
208        @Override
209        public void removeConsumer(ConnectionContext context, ConsumerInfo info)
210                throws Exception {
211            super.removeConsumer(context, info);
212            consumers.remove(info.getConsumerId());
213            sendSyncToSlave(new RemoveInfo(info.getConsumerId()));
214       }
215    
216        /**
217         * remove a subscription
218         * 
219         * @param context
220         * @param info
221         * @throws Exception
222         */
223        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
224            super.removeSubscription(context, info);
225            sendAsyncToSlave(info);
226        }
227        
228        @Override
229        public void addDestinationInfo(ConnectionContext context,
230                DestinationInfo info) throws Exception {
231            super.addDestinationInfo(context, info);
232            if (info.getDestination().isTemporary()) {
233                sendAsyncToSlave(info);
234            }
235        }
236    
237        @Override
238        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
239            super.removeDestinationInfo(context, info);
240            if (info.getDestination().isTemporary()) {
241                sendAsyncToSlave(info);
242            }
243        }
244        
245        /**
246         * begin a transaction
247         * 
248         * @param context
249         * @param xid
250         * @throws Exception
251         */
252        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
253            TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN);
254            sendAsyncToSlave(info);
255            super.beginTransaction(context, xid);
256        }
257    
258        /**
259         * Prepares a transaction. Only valid for xa transactions.
260         * 
261         * @param context
262         * @param xid
263         * @return the state
264         * @throws Exception
265         */
266        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
267            TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE);
268            sendSyncToSlave(info);
269            int result = super.prepareTransaction(context, xid);
270            return result;
271        }
272    
273        /**
274         * Rollsback a transaction.
275         * 
276         * @param context
277         * @param xid
278         * @throws Exception
279         */
280        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
281            TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK);
282            sendAsyncToSlave(info);
283            super.rollbackTransaction(context, xid);
284        }
285    
286        /**
287         * Commits a transaction.
288         * 
289         * @param context
290         * @param xid
291         * @param onePhase
292         * @throws Exception
293         */
294        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
295            TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.COMMIT_ONE_PHASE);
296            sendSyncToSlave(info);
297            super.commitTransaction(context, xid, onePhase);
298        }
299    
300        /**
301         * Forgets a transaction.
302         * 
303         * @param context
304         * @param xid
305         * @throws Exception
306         */
307        public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
308            TransactionInfo info = new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET);
309            sendAsyncToSlave(info);
310            super.forgetTransaction(context, xid);
311        }
312    
313        /**
314         * Notifiy the Broker that a dispatch will happen
315         * Do in 'pre' so that slave will avoid getting ack before dispatch
316         * similar logic to send() below.
317         * @param messageDispatch
318         */
319        public void preProcessDispatch(MessageDispatch messageDispatch) {
320            super.preProcessDispatch(messageDispatch);
321            MessageDispatchNotification mdn = new MessageDispatchNotification();
322            mdn.setConsumerId(messageDispatch.getConsumerId());
323            mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
324            mdn.setDestination(messageDispatch.getDestination());
325            if (messageDispatch.getMessage() != null) {
326                Message msg = messageDispatch.getMessage();
327                mdn.setMessageId(msg.getMessageId());
328                if (consumers.containsKey(messageDispatch.getConsumerId())) {
329                    sendSyncToSlave(mdn);
330                }
331            }
332        }
333    
334        /**
335         * @param context
336         * @param message
337         * @throws Exception
338         */
339        public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
340            /**
341             * A message can be dispatched before the super.send() method returns so -
342             * here the order is switched to avoid problems on the slave with
343             * receiving acks for messages not received yet
344             * copy ensures we don't mess with the correlator and command ids
345             */
346            sendSyncToSlave(message.copy());
347            super.send(producerExchange, message);
348        }
349    
350        /**
351         * @param context
352         * @param ack
353         * @throws Exception
354         */
355        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
356            sendToSlave(ack);
357            super.acknowledge(consumerExchange, ack);
358        }
359    
360        public boolean isFaultTolerantConfiguration() {
361            return true;
362        }
363    
364        protected void sendToSlave(Message message) {
365            if (message.isResponseRequired()) {
366                sendSyncToSlave(message);
367            } else {
368                sendAsyncToSlave(message);
369            }
370        }
371    
372        protected void sendToSlave(MessageAck ack) {
373            if (ack.isResponseRequired()) {
374                sendAsyncToSlave(ack);
375            } else {
376                sendSyncToSlave(ack);
377            }
378        }
379    
380        protected void sendAsyncToSlave(Command command) {
381            try {
382                slave.oneway(command);
383            } catch (Throwable e) {
384                LOG.error("Slave Failed", e);
385                stopProcessing();
386            }
387        }
388    
389        protected void sendSyncToSlave(Command command) {
390            try {
391                Response response = (Response)slave.request(command);
392                if (response.isException()) {
393                    ExceptionResponse er = (ExceptionResponse)response;
394                    LOG.error("Slave Failed", er.getException());
395                }
396            } catch (Throwable e) {
397                LOG.error("Slave Failed", e);
398            }
399        }
400    }