001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.vm;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.net.URI;
022    import java.util.concurrent.BlockingQueue;
023    import java.util.concurrent.LinkedBlockingQueue;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicLong;
027    
028    import org.apache.activemq.command.ShutdownInfo;
029    import org.apache.activemq.thread.Task;
030    import org.apache.activemq.thread.TaskRunner;
031    import org.apache.activemq.thread.TaskRunnerFactory;
032    import org.apache.activemq.transport.FutureResponse;
033    import org.apache.activemq.transport.ResponseCallback;
034    import org.apache.activemq.transport.Transport;
035    import org.apache.activemq.transport.TransportDisposedIOException;
036    import org.apache.activemq.transport.TransportListener;
037    
038    /**
039     * A Transport implementation that uses direct method invocations.
040     */
041    public class VMTransport implements Transport, Task {
042    
043        private static final Object DISCONNECT = new Object();
044        private static final AtomicLong NEXT_ID = new AtomicLong(0);
045    
046        // Transport Configuration
047        protected VMTransport peer;
048        protected TransportListener transportListener;
049        protected boolean marshal;
050        protected boolean network;
051        protected boolean async = true;
052        protected int asyncQueueDepth = 2000;
053        protected final URI location;
054        protected final long id;
055    
056        // Implementation
057        private LinkedBlockingQueue<Object> messageQueue;
058        private TaskRunnerFactory taskRunnerFactory;
059        private TaskRunner taskRunner;
060    
061        // Transport State
062        protected final AtomicBoolean started = new AtomicBoolean();
063        protected final AtomicBoolean disposed = new AtomicBoolean();
064    
065        private volatile int receiveCounter;
066    
067        public VMTransport(URI location) {
068            this.location = location;
069            this.id = NEXT_ID.getAndIncrement();
070        }
071    
072        public void setPeer(VMTransport peer) {
073            this.peer = peer;
074        }
075    
076        public void oneway(Object command) throws IOException {
077    
078            if (disposed.get()) {
079                throw new TransportDisposedIOException("Transport disposed.");
080            }
081    
082            if (peer == null) {
083                throw new IOException("Peer not connected.");
084            }
085    
086            try {
087    
088                if (peer.disposed.get()) {
089                    throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
090                }
091    
092                if (peer.async || !peer.started.get()) {
093                    peer.getMessageQueue().put(command);
094                    peer.wakeup();
095                    return;
096                }
097    
098            } catch (InterruptedException e) {
099                InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
100                iioe.initCause(e);
101                throw iioe;
102            }
103    
104            dispatch(peer, peer.messageQueue, command);
105        }
106    
107        public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
108            TransportListener transportListener = transport.getTransportListener();
109            if (transportListener != null) {
110                // Lock here on the target transport's started since we want to wait for its start()
111                // method to finish dispatching out of the queue before we do our own.
112                synchronized (transport.started) {
113    
114                    // Ensure that no additional commands entered the queue in the small time window
115                    // before the start method locks the dispatch lock and the oneway method was in
116                    // an put operation.
117                    while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
118                        doDispatch(transport, transportListener, pending.poll());
119                    }
120    
121                    // We are now in sync mode and won't enqueue any more commands to the target
122                    // transport so lets clean up its resources.
123                    transport.messageQueue = null;
124    
125                    // Don't dispatch if either end was disposed already.
126                    if (command != null && !this.disposed.get() && !transport.isDisposed()) {
127                        doDispatch(transport, transportListener, command);
128                    }
129                }
130            }
131        }
132    
133        public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
134            if (command == DISCONNECT) {
135                transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
136            } else {
137                transport.receiveCounter++;
138                transportListener.onCommand(command);
139            }
140        }
141    
142        public void start() throws Exception {
143    
144            if (transportListener == null) {
145                throw new IOException("TransportListener not set.");
146            }
147    
148            // If we are not in async mode we lock the dispatch lock here and then start to
149            // prevent any sync dispatches from occurring until we dispatch the pending messages
150            // to maintain delivery order.  When async this happens automatically so just set
151            // started and wakeup the task runner.
152            if (!async) {
153                synchronized (started) {
154                    if (started.compareAndSet(false, true)) {
155                        LinkedBlockingQueue<Object> mq = getMessageQueue();
156                        Object command;
157                        while ((command = mq.poll()) != null && !disposed.get() ) {
158                            receiveCounter++;
159                            doDispatch(this, transportListener, command);
160                        }
161                    }
162                }
163            } else {
164                if (started.compareAndSet(false, true)) {
165                    wakeup();
166                }
167            }
168        }
169    
170        public void stop() throws Exception {
171            // Only need to do this once, all future oneway calls will now
172            // fail as will any asnyc jobs in the task runner.
173            if (disposed.compareAndSet(false, true)) {
174    
175                TaskRunner tr = taskRunner;
176                LinkedBlockingQueue<Object> mq = this.messageQueue;
177    
178                taskRunner = null;
179                messageQueue = null;
180    
181                if (mq != null) {
182                    mq.clear();
183                }
184    
185                // Allow pending deliveries to finish up, but don't wait
186                // forever in case of an stalled onCommand.
187                if (tr != null) {
188                    try {
189                        tr.shutdown(TimeUnit.SECONDS.toMillis(1));
190                    } catch(Exception e) {
191                    }
192                    taskRunner = null;
193                }
194    
195                // let the peer know that we are disconnecting after attempting
196                // to cleanly shutdown the async tasks so that this is the last
197                // command it see's.
198                try {
199                    peer.transportListener.onCommand(new ShutdownInfo());
200                } catch (Exception ignore) {
201                }
202    
203                // shutdown task runner factory
204                if (taskRunnerFactory != null) {
205                    taskRunnerFactory.shutdownNow();
206                    taskRunnerFactory = null;
207                }
208            }
209        }
210    
211        protected void wakeup() {
212            if (async && started.get()) {
213                try {
214                    getTaskRunner().wakeup();
215                } catch (InterruptedException e) {
216                    Thread.currentThread().interrupt();
217                } catch (TransportDisposedIOException e) {
218                }
219            }
220        }
221    
222        /**
223         * @see org.apache.activemq.thread.Task#iterate()
224         */
225        public boolean iterate() {
226    
227            final TransportListener tl = transportListener;
228    
229            LinkedBlockingQueue<Object> mq;
230            try {
231                mq = getMessageQueue();
232            } catch (TransportDisposedIOException e) {
233                return false;
234            }
235    
236            Object command = mq.poll();
237            if (command != null && !disposed.get()) {
238                if( command == DISCONNECT ) {
239                    tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
240                } else {
241                    tl.onCommand(command);
242                }
243                return !mq.isEmpty() && !disposed.get();
244            } else {
245                if(disposed.get()) {
246                    mq.clear();
247                }
248                return false;
249            }
250        }
251    
252        public void setTransportListener(TransportListener commandListener) {
253            this.transportListener = commandListener;
254        }
255    
256        public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) {
257            synchronized (this) {
258                if (messageQueue == null) {
259                    messageQueue = asyncQueue;
260                }
261            }
262        }
263    
264        public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
265            LinkedBlockingQueue<Object> result = messageQueue;
266            if (result == null) {
267                synchronized (this) {
268                    result = messageQueue;
269                    if (result == null) {
270                        if (disposed.get()) {
271                            throw new TransportDisposedIOException("The Transport has been disposed");
272                        }
273    
274                        messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
275                    }
276                }
277            }
278            return result;
279        }
280    
281        protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
282            TaskRunner result = taskRunner;
283            if (result == null) {
284                synchronized (this) {
285                    result = taskRunner;
286                    if (result == null) {
287                        if (disposed.get()) {
288                            throw new TransportDisposedIOException("The Transport has been disposed");
289                        }
290    
291                        String name = "ActiveMQ VMTransport: " + toString();
292                        if (taskRunnerFactory == null) {
293                            taskRunnerFactory = new TaskRunnerFactory(name);
294                            taskRunnerFactory.init();
295                        }
296                        taskRunner = result = taskRunnerFactory.createTaskRunner(this, name);
297                    }
298                }
299            }
300            return result;
301        }
302    
303        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
304            throw new AssertionError("Unsupported Method");
305        }
306    
307        public Object request(Object command) throws IOException {
308            throw new AssertionError("Unsupported Method");
309        }
310    
311        public Object request(Object command, int timeout) throws IOException {
312            throw new AssertionError("Unsupported Method");
313        }
314    
315        public TransportListener getTransportListener() {
316            return transportListener;
317        }
318    
319        public <T> T narrow(Class<T> target) {
320            if (target.isAssignableFrom(getClass())) {
321                return target.cast(this);
322            }
323            return null;
324        }
325    
326        public boolean isMarshal() {
327            return marshal;
328        }
329    
330        public void setMarshal(boolean marshal) {
331            this.marshal = marshal;
332        }
333    
334        public boolean isNetwork() {
335            return network;
336        }
337    
338        public void setNetwork(boolean network) {
339            this.network = network;
340        }
341    
342        @Override
343        public String toString() {
344            return location + "#" + id;
345        }
346    
347        public String getRemoteAddress() {
348            if (peer != null) {
349                return peer.toString();
350            }
351            return null;
352        }
353    
354        /**
355         * @return the async
356         */
357        public boolean isAsync() {
358            return async;
359        }
360    
361        /**
362         * @param async the async to set
363         */
364        public void setAsync(boolean async) {
365            this.async = async;
366        }
367    
368        /**
369         * @return the asyncQueueDepth
370         */
371        public int getAsyncQueueDepth() {
372            return asyncQueueDepth;
373        }
374    
375        /**
376         * @param asyncQueueDepth the asyncQueueDepth to set
377         */
378        public void setAsyncQueueDepth(int asyncQueueDepth) {
379            this.asyncQueueDepth = asyncQueueDepth;
380        }
381    
382        public boolean isFaultTolerant() {
383            return false;
384        }
385    
386        public boolean isDisposed() {
387            return disposed.get();
388        }
389    
390        public boolean isConnected() {
391            return !disposed.get();
392        }
393    
394        public void reconnect(URI uri) throws IOException {
395            throw new IOException("Transport reconnect is not supported");
396        }
397    
398        public boolean isReconnectSupported() {
399            return false;
400        }
401    
402        public boolean isUpdateURIsSupported() {
403            return false;
404        }
405    
406        public void updateURIs(boolean reblance,URI[] uris) throws IOException {
407            throw new IOException("URI update feature not supported");
408        }
409    
410        public int getReceiveCounter() {
411            return receiveCounter;
412        }
413    }