001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.nio;
018    
019    import java.nio.channels.CancelledKeyException;
020    import java.nio.channels.ClosedChannelException;
021    import java.nio.channels.SelectionKey;
022    import java.nio.channels.SocketChannel;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    
025    import org.apache.activemq.transport.nio.SelectorManager.Listener;
026    
027    /**
028     * @author chirino
029     */
030    public final class SelectorSelection {
031    
032        private final SelectorWorker worker;
033        private final Listener listener;
034        private int interest;
035        private SelectionKey key;
036        private AtomicBoolean closed = new AtomicBoolean();
037    
038        public SelectorSelection(final SelectorWorker worker, final SocketChannel socketChannel, Listener listener) throws ClosedChannelException {
039            this.worker = worker;
040            this.listener = listener;
041            worker.addIoTask(new Runnable() {
042                public void run() {
043                    try {
044                        SelectorSelection.this.key = socketChannel.register(worker.selector, 0, SelectorSelection.this);
045                    } catch (Exception e) {
046                        e.printStackTrace();
047                    }
048                }
049            });
050        }
051    
052        public void setInterestOps(int ops) {
053            interest = ops;
054        }
055    
056        public void enable() {
057            worker.addIoTask(new Runnable() {
058                public void run() {
059                    try {
060                        key.interestOps(interest);
061                    } catch (CancelledKeyException e) {
062                    }
063                }
064            });        
065        }
066    
067        public void disable() {
068            worker.addIoTask(new Runnable() {
069                public void run() {
070                    try {
071                        key.interestOps(0);
072                    } catch (CancelledKeyException e) {
073                    }
074                }
075            });        
076        }
077    
078        public void close() {
079            // guard against multiple closes.
080            if( closed.compareAndSet(false, true) ) {
081                worker.addIoTask(new Runnable() {
082                    public void run() {
083                        try {
084                            key.cancel();
085                        } catch (CancelledKeyException e) {
086                        }
087                        worker.release();
088                    }
089                });        
090            }
091        }
092    
093        public void onSelect() {
094            listener.onSelect(this);
095        }
096    
097        public void onError(Throwable e) {
098            listener.onError(this, e);
099        }
100    
101    }