001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.partition;
018    
019    import org.apache.activemq.broker.*;
020    import org.apache.activemq.command.*;
021    import org.apache.activemq.partition.dto.Partitioning;
022    import org.apache.activemq.partition.dto.Target;
023    import org.apache.activemq.state.ConsumerState;
024    import org.apache.activemq.state.SessionState;
025    import org.apache.activemq.transport.Transport;
026    import org.apache.activemq.util.LRUCache;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    import java.net.InetSocketAddress;
031    import java.net.Socket;
032    import java.net.SocketAddress;
033    import java.util.*;
034    import java.util.concurrent.ConcurrentHashMap;
035    
036    /**
037     * A BrokerFilter which partitions client connections over a cluster of brokers.
038     *
039     * It can use a client identifier like client id, authenticated user name, source ip
040     * address or even destination being used by the connection to figure out which
041     * is the best broker in the cluster that the connection should be using and then
042     * redirects failover clients to that broker.
043     */
044    public class PartitionBroker extends BrokerFilter {
045    
046        protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);
047        protected final PartitionBrokerPlugin plugin;
048        protected boolean reloadConfigOnPoll = true;
049    
050        public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) {
051            super(broker);
052            this.plugin = plugin;
053        }
054    
055        @Override
056        public void start() throws Exception {
057            super.start();
058            getExecutor().execute(new Runnable() {
059                @Override
060                public void run() {
061                    Thread.currentThread().setName("Partition Monitor");
062                    onMonitorStart();
063                    try {
064                        runPartitionMonitor();
065                    } catch (Exception e) {
066                        onMonitorStop();
067                    }
068                }
069            });
070        }
071    
072        protected void onMonitorStart() {
073        }
074        protected void onMonitorStop() {
075        }
076    
077        protected void runPartitionMonitor() {
078            while( !isStopped() ) {
079                try {
080                    monitorWait();
081                } catch (InterruptedException e) {
082                    break;
083                }
084    
085                if(reloadConfigOnPoll) {
086                    try {
087                        reloadConfiguration();
088                    } catch (Exception e) {
089                        continue;
090                    }
091                }
092    
093                for( ConnectionMonitor monitor: monitors.values()) {
094                    checkTarget(monitor);
095                }
096            }
097        }
098    
099        protected void monitorWait() throws InterruptedException {
100            synchronized (this) {
101                this.wait(1000);
102            }
103        }
104    
105        protected void monitorWakeup()  {
106            synchronized (this) {
107                this.notifyAll();
108            }
109        }
110    
111        protected void reloadConfiguration() throws Exception {
112        }
113    
114        protected void checkTarget(ConnectionMonitor monitor) {
115    
116            // can we find a preferred target for the connection?
117            Target targetDTO = pickBestBroker(monitor);
118            if( targetDTO == null || targetDTO.ids==null) {
119                LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId());
120                return;
121            }
122    
123            // Are we one the the targets?
124            if( targetDTO.ids.contains(getBrokerName()) ) {
125                LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId());
126                return;
127            }
128    
129            // Then we need to move the connection over.
130            String connectionString = getConnectionString(targetDTO.ids);
131            if( connectionString==null ) {
132                LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
133                return;
134            }
135    
136            LOG.info("Redirecting connection to: " + connectionString);
137            TransportConnection connection = (TransportConnection)monitor.context.getConnection();
138            ConnectionControl cc = new ConnectionControl();
139            cc.setConnectedBrokers(connectionString);
140            cc.setRebalanceConnection(true);
141            connection.dispatchAsync(cc);
142        }
143    
144        protected String getConnectionString(HashSet<String> ids) {
145            StringBuilder rc = new StringBuilder();
146            for (String id : ids) {
147                String url = plugin.getBrokerURL(this, id);
148                if( url!=null ) {
149                    if( rc.length()!=0 ) {
150                        rc.append(',');
151                    }
152                    rc.append(url);
153                }
154            }
155            if( rc.length()==0 )
156                return null;
157            return rc.toString();
158        }
159    
160        static private class Score {
161            int value;
162        }
163    
164        protected Target pickBestBroker(ConnectionMonitor monitor) {
165    
166            if( getConfig() ==null )
167                return null;
168    
169            if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) {
170                TransportConnection connection = (TransportConnection)monitor.context.getConnection();
171                Transport transport = connection.getTransport();
172                Socket socket = transport.narrow(Socket.class);
173                if( socket !=null ) {
174                    SocketAddress address = socket.getRemoteSocketAddress();
175                    if( address instanceof InetSocketAddress) {
176                        String ip = ((InetSocketAddress) address).getAddress().getHostAddress();
177                        Target targetDTO = getConfig().bySourceIp.get(ip);
178                        if( targetDTO!=null ) {
179                            return targetDTO;
180                        }
181                    }
182                }
183            }
184    
185            if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) {
186                String userName = monitor.context.getUserName();
187                if( userName !=null ) {
188                    Target targetDTO = getConfig().byUserName.get(userName);
189                    if( targetDTO!=null ) {
190                        return targetDTO;
191                    }
192                }
193            }
194    
195            if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) {
196                String clientId = monitor.context.getClientId();
197                if( clientId!=null ) {
198                    Target targetDTO = getConfig().byClientId.get(clientId);
199                    if( targetDTO!=null ) {
200                        return targetDTO;
201                    }
202                }
203            }
204    
205            if(
206                 (getConfig().byQueue !=null && !getConfig().byQueue.isEmpty())
207              || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
208              ) {
209    
210                // Collect the destinations the connection is consuming from...
211                HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>();
212                for (SessionState session : monitor.context.getConnectionState().getSessionStates()) {
213                    for (ConsumerState consumer : session.getConsumerStates()) {
214                        ActiveMQDestination destination = consumer.getInfo().getDestination();
215                        if( destination.isComposite() ) {
216                            dests.addAll(Arrays.asList(destination.getCompositeDestinations()));
217                        } else {
218                            dests.addAll(Collections.singletonList(destination));
219                        }
220                    }
221                }
222    
223                // Group them by the partitioning target for the destinations and score them..
224                HashMap<Target, Score> targetScores = new HashMap<Target, Score>();
225                for (ActiveMQDestination dest : dests) {
226                    Target target = getTarget(dest);
227                    if( target!=null ) {
228                        Score score = targetScores.get(target);
229                        if( score == null ) {
230                            score = new Score();
231                            targetScores.put(target, score);
232                        }
233                        score.value++;
234                    }
235                }
236    
237                // The target with largest score wins..
238                if( !targetScores.isEmpty() ) {
239                    Target bestTarget = null;
240                    int bestScore=0;
241                    for (Map.Entry<Target, Score> entry : targetScores.entrySet()) {
242                        if( entry.getValue().value > bestScore ) {
243                            bestTarget = entry.getKey();
244                        }
245                    }
246                    return bestTarget;
247                }
248    
249                // If we get here is because there were no consumers, or the destinations for those
250                // consumers did not have an assigned destination..  So partition based on producer
251                // usage.
252                Target best = monitor.findBestProducerTarget(this);
253                if( best!=null ) {
254                    return best;
255                }
256            }
257            return null;
258        }
259    
260        protected Target getTarget(ActiveMQDestination dest) {
261            Partitioning config = getConfig();
262            if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) {
263                return config.byQueue.get(dest.getPhysicalName());
264            } else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) {
265                return config.byTopic.get(dest.getPhysicalName());
266            }
267            return null;
268        }
269    
270        protected final ConcurrentHashMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
271    
272        @Override
273        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
274            if( info.isFaultTolerant() ) {
275                ConnectionMonitor monitor = new ConnectionMonitor(context);
276                monitors.put(info.getConnectionId(), monitor);
277                super.addConnection(context, info);
278                checkTarget(monitor);
279            } else {
280                super.addConnection(context, info);
281            }
282        }
283    
284        @Override
285        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
286            super.removeConnection(context, info, error);
287            if( info.isFaultTolerant() ) {
288                monitors.remove(info.getConnectionId());
289            }
290        }
291    
292        @Override
293        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
294            ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId());
295            if( monitor!=null ) {
296                monitor.onSend(producerExchange, messageSend);
297            }
298        }
299    
300        protected Partitioning getConfig() {
301            return plugin.getConfig();
302        }
303    
304    
305        static class Traffic {
306            long messages;
307            long bytes;
308        }
309    
310        static class ConnectionMonitor {
311    
312            final ConnectionContext context;
313            LRUCache<ActiveMQDestination, Traffic> trafficPerDestination =  new LRUCache<ActiveMQDestination, Traffic>();
314    
315            public ConnectionMonitor(ConnectionContext context) {
316                this.context = context;
317            }
318    
319            synchronized public Target findBestProducerTarget(PartitionBroker broker) {
320                Target best = null;
321                long bestSize = 0 ;
322                for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
323                    Traffic t = entry.getValue();
324                    // Once we get enough messages...
325                    if( t.messages < broker.plugin.getMinTransferCount()) {
326                        continue;
327                    }
328                    if( t.bytes > bestSize) {
329                        bestSize = t.bytes;
330                        Target target = broker.getTarget(entry.getKey());
331                        if( target!=null ) {
332                            best = target;
333                        }
334                    }
335                }
336                return best;
337            }
338    
339            synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) {
340                ActiveMQDestination dest = message.getDestination();
341                Traffic traffic = trafficPerDestination.get(dest);
342                if( traffic == null ) {
343                    traffic = new Traffic();
344                    trafficPerDestination.put(dest, traffic);
345                }
346                traffic.messages += 1;
347                traffic.bytes += message.getSize();
348            }
349    
350    
351        }
352    
353    }