001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.nio;
018    
019    import java.io.IOException;
020    import java.nio.channels.SocketChannel;
021    import java.util.LinkedList;
022    import java.util.concurrent.Executor;
023    import java.util.concurrent.ExecutorService;
024    import java.util.concurrent.SynchronousQueue;
025    import java.util.concurrent.ThreadFactory;
026    import java.util.concurrent.ThreadPoolExecutor;
027    import java.util.concurrent.TimeUnit;
028    
029    /**
030     * The SelectorManager will manage one Selector and the thread that checks the
031     * selector.
032     *
033     * We may need to consider running more than one thread to check the selector if
034     * servicing the selector takes too long.
035     */
036    public final class SelectorManager {
037    
038        public static final SelectorManager SINGLETON = new SelectorManager();
039    
040        private Executor selectorExecutor = createDefaultExecutor();
041        private Executor channelExecutor = selectorExecutor;
042        private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
043        private int maxChannelsPerWorker = 1024;
044    
045        protected ExecutorService createDefaultExecutor() {
046            ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
047    
048                private long i = 0;
049    
050                public Thread newThread(Runnable runnable) {
051                    this.i++;
052                    final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i);
053                    return t;
054                }
055            });
056    
057            return rc;
058        }
059    
060        public static SelectorManager getInstance() {
061            return SINGLETON;
062        }
063    
064        public interface Listener {
065            void onSelect(SelectorSelection selector);
066            void onError(SelectorSelection selection, Throwable error);
067        }
068    
069        public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
070            throws IOException {
071    
072            SelectorSelection selection = null;
073            while( selection == null ) {
074                if (freeWorkers.size() > 0) {
075                    SelectorWorker worker = freeWorkers.getFirst();
076                    if( worker.isReleased() ) {
077                        freeWorkers.remove(worker);
078                    } else {
079                        worker.retain();
080                        selection = new SelectorSelection(worker, socketChannel, listener);
081                    }
082                } else {
083                    // Worker starts /w retain count of 1
084                    SelectorWorker worker = new SelectorWorker(this);
085                    freeWorkers.addFirst(worker);
086                    selection = new SelectorSelection(worker, socketChannel, listener);
087                }
088            }
089    
090            return selection;
091        }
092    
093        synchronized void onWorkerFullEvent(SelectorWorker worker) {
094            freeWorkers.remove(worker);
095        }
096    
097        public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
098            freeWorkers.remove(worker);
099        }
100    
101        public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
102            freeWorkers.addFirst(worker);
103        }
104    
105        public Executor getChannelExecutor() {
106            return channelExecutor;
107        }
108    
109        public void setChannelExecutor(Executor channelExecutor) {
110            this.channelExecutor = channelExecutor;
111        }
112    
113        public int getMaxChannelsPerWorker() {
114            return maxChannelsPerWorker;
115        }
116    
117        public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
118            this.maxChannelsPerWorker = maxChannelsPerWorker;
119        }
120    
121        public Executor getSelectorExecutor() {
122            return selectorExecutor;
123        }
124    
125        public void setSelectorExecutor(Executor selectorExecutor) {
126            this.selectorExecutor = selectorExecutor;
127        }
128    }