001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.nio;
018    
019    import java.io.IOException;
020    import java.nio.channels.SelectionKey;
021    import java.nio.channels.Selector;
022    import java.util.Iterator;
023    import java.util.Set;
024    import java.util.concurrent.ConcurrentLinkedQueue;
025    import java.util.concurrent.atomic.AtomicInteger;
026    
027    public class SelectorWorker implements Runnable {
028    
029        private static final AtomicInteger NEXT_ID = new AtomicInteger();
030    
031        final SelectorManager manager;
032        final Selector selector;
033        final int id = NEXT_ID.getAndIncrement();
034        private final int maxChannelsPerWorker;
035    
036        final AtomicInteger retainCounter = new AtomicInteger(1);
037        private final ConcurrentLinkedQueue<Runnable> ioTasks = new ConcurrentLinkedQueue<Runnable>();
038           
039        public SelectorWorker(SelectorManager manager) throws IOException {
040            this.manager = manager;
041            selector = Selector.open();
042            maxChannelsPerWorker = manager.getMaxChannelsPerWorker();
043            manager.getSelectorExecutor().execute(this);
044        }
045    
046        void retain() {
047            if (retainCounter.incrementAndGet() == maxChannelsPerWorker) {
048                manager.onWorkerFullEvent(this);
049            }
050        }
051    
052        void release() {
053            int use = retainCounter.decrementAndGet();
054            if (use == 0) {
055                manager.onWorkerEmptyEvent(this);
056            } else if (use == maxChannelsPerWorker - 1) {
057                manager.onWorkerNotFullEvent(this);
058            }
059        }
060        
061        boolean isReleased() {
062            return retainCounter.get()==0;
063        }
064    
065    
066        public void addIoTask(Runnable work) {
067            ioTasks.add(work);
068            selector.wakeup();
069        }
070        
071        private void processIoTasks() {
072            Runnable task; 
073            while( (task= ioTasks.poll()) !=null ) {
074                try {
075                    task.run();
076                } catch (Throwable e) {
077                    e.printStackTrace();
078                }
079            }
080        }
081    
082        
083    
084        public void run() {
085    
086            String origName = Thread.currentThread().getName();
087            try {
088                Thread.currentThread().setName("Selector Worker: " + id);
089                while (!isReleased()) {
090                    
091                    processIoTasks();
092                    
093                    int count = selector.select(10);
094                    
095                    if (count == 0) {
096                        continue;
097                    }
098    
099                    // Get a java.util.Set containing the SelectionKey objects
100                    // for all channels that are ready for I/O.
101                    Set keys = selector.selectedKeys();
102    
103                    for (Iterator i = keys.iterator(); i.hasNext();) {
104                        final SelectionKey key = (SelectionKey)i.next();
105                        i.remove();
106    
107                        final SelectorSelection s = (SelectorSelection)key.attachment();
108                        try {
109                            if( key.isValid() ) {
110                                key.interestOps(0);
111                            }
112    
113                            // Kick off another thread to find newly selected keys
114                            // while we process the
115                            // currently selected keys
116                            manager.getChannelExecutor().execute(new Runnable() {
117                                public void run() {
118                                    try {
119                                        s.onSelect();
120                                        s.enable();
121                                    } catch (Throwable e) {
122                                        s.onError(e);
123                                    }
124                                }
125                            });
126    
127                        } catch (Throwable e) {
128                            s.onError(e);
129                        }
130    
131                    }
132    
133                }
134            } catch (Throwable e) {                 
135                e.printStackTrace();
136                // Notify all the selections that the error occurred.
137                Set keys = selector.keys();
138                for (Iterator i = keys.iterator(); i.hasNext();) {
139                    SelectionKey key = (SelectionKey)i.next();
140                    SelectorSelection s = (SelectorSelection)key.attachment();
141                    s.onError(e);
142                }
143            } finally {
144                try {
145                    manager.onWorkerEmptyEvent(this);
146                    selector.close();
147                } catch (IOException ignore) {
148                    ignore.printStackTrace();
149                }
150                Thread.currentThread().setName(origName);
151            }
152        }
153    
154    }