001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.vm;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.net.URI;
022    import java.util.concurrent.BlockingQueue;
023    import java.util.concurrent.LinkedBlockingQueue;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicLong;
027    
028    import org.apache.activemq.command.ShutdownInfo;
029    import org.apache.activemq.thread.Task;
030    import org.apache.activemq.thread.TaskRunner;
031    import org.apache.activemq.thread.TaskRunnerFactory;
032    import org.apache.activemq.transport.FutureResponse;
033    import org.apache.activemq.transport.ResponseCallback;
034    import org.apache.activemq.transport.Transport;
035    import org.apache.activemq.transport.TransportDisposedIOException;
036    import org.apache.activemq.transport.TransportListener;
037    
038    /**
039     * A Transport implementation that uses direct method invocations.
040     */
041    public class VMTransport implements Transport, Task {
042    
043        private static final Object DISCONNECT = new Object();
044        private static final AtomicLong NEXT_ID = new AtomicLong(0);
045    
046        // Transport Configuration
047        protected VMTransport peer;
048        protected TransportListener transportListener;
049        protected boolean marshal;
050        protected boolean network;
051        protected boolean async = true;
052        protected int asyncQueueDepth = 2000;
053        protected final URI location;
054        protected final long id;
055    
056        // Implementation
057        private LinkedBlockingQueue<Object> messageQueue;
058        private TaskRunnerFactory taskRunnerFactory;
059        private TaskRunner taskRunner;
060    
061        // Transport State
062        protected final AtomicBoolean started = new AtomicBoolean();
063        protected final AtomicBoolean disposed = new AtomicBoolean();
064    
065        private volatile int receiveCounter;
066    
067        public VMTransport(URI location) {
068            this.location = location;
069            this.id = NEXT_ID.getAndIncrement();
070        }
071    
072        public void setPeer(VMTransport peer) {
073            this.peer = peer;
074        }
075    
076        public void oneway(Object command) throws IOException {
077    
078            if (disposed.get()) {
079                throw new TransportDisposedIOException("Transport disposed.");
080            }
081    
082            if (peer == null) {
083                throw new IOException("Peer not connected.");
084            }
085    
086            try {
087    
088                if (peer.disposed.get()) {
089                    throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
090                }
091    
092                if (peer.async || !peer.started.get()) {
093                    peer.getMessageQueue().put(command);
094                    peer.wakeup();
095                    return;
096                }
097    
098            } catch (InterruptedException e) {
099                InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
100                iioe.initCause(e);
101                throw iioe;
102            }
103    
104            dispatch(peer, peer.messageQueue, command);
105        }
106    
107        public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
108            TransportListener transportListener = transport.getTransportListener();
109            if (transportListener != null) {
110                // Lock here on the target transport's started since we want to wait for its start()
111                // method to finish dispatching out of the queue before we do our own.
112                synchronized (transport.started) {
113    
114                    // Ensure that no additional commands entered the queue in the small time window
115                    // before the start method locks the dispatch lock and the oneway method was in
116                    // an put operation.
117                    while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
118                        doDispatch(transport, transportListener, pending.poll());
119                    }
120    
121                    // We are now in sync mode and won't enqueue any more commands to the target
122                    // transport so lets clean up its resources.
123                    transport.messageQueue = null;
124    
125                    // Don't dispatch if either end was disposed already.
126                    if (command != null && !this.disposed.get() && !transport.isDisposed()) {
127                        doDispatch(transport, transportListener, command);
128                    }
129                }
130            }
131        }
132    
133        public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
134            if (command == DISCONNECT) {
135                transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
136            } else {
137                transport.receiveCounter++;
138                transportListener.onCommand(command);
139            }
140        }
141    
142        public void start() throws Exception {
143    
144            if (transportListener == null) {
145                throw new IOException("TransportListener not set.");
146            }
147    
148            // If we are not in async mode we lock the dispatch lock here and then start to
149            // prevent any sync dispatches from occurring until we dispatch the pending messages
150            // to maintain delivery order.  When async this happens automatically so just set
151            // started and wakeup the task runner.
152            if (!async) {
153                synchronized (started) {
154                    if (started.compareAndSet(false, true)) {
155                        LinkedBlockingQueue<Object> mq = getMessageQueue();
156                        Object command;
157                        while ((command = mq.poll()) != null && !disposed.get() ) {
158                            receiveCounter++;
159                            doDispatch(this, transportListener, command);
160                        }
161                    }
162                }
163            } else {
164                if (started.compareAndSet(false, true)) {
165                    wakeup();
166                }
167            }
168        }
169    
170        public void stop() throws Exception {
171            // Only need to do this once, all future oneway calls will now
172            // fail as will any asnyc jobs in the task runner.
173            if (disposed.compareAndSet(false, true)) {
174    
175                TaskRunner tr = taskRunner;
176                LinkedBlockingQueue<Object> mq = this.messageQueue;
177    
178                taskRunner = null;
179                messageQueue = null;
180    
181                if (mq != null) {
182                    mq.clear();
183                }
184    
185                // Allow pending deliveries to finish up, but don't wait
186                // forever in case of an stalled onCommand.
187                if (tr != null) {
188                    try {
189                        tr.shutdown(TimeUnit.SECONDS.toMillis(1));
190                    } catch(Exception e) {
191                    }
192                    taskRunner = null;
193                }
194    
195                // let the peer know that we are disconnecting after attempting
196                // to cleanly shutdown the async tasks so that this is the last
197                // command it see's.
198                try {
199                    peer.transportListener.onCommand(new ShutdownInfo());
200                } catch (Exception ignore) {
201                }
202    
203                // let any requests pending a response see an exception
204                try {
205                    peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped."));
206                } catch (Exception ignore) {
207                }
208    
209                // shutdown task runner factory
210                if (taskRunnerFactory != null) {
211                    taskRunnerFactory.shutdownNow();
212                    taskRunnerFactory = null;
213                }
214            }
215        }
216    
217        protected void wakeup() {
218            if (async && started.get()) {
219                try {
220                    getTaskRunner().wakeup();
221                } catch (InterruptedException e) {
222                    Thread.currentThread().interrupt();
223                } catch (TransportDisposedIOException e) {
224                }
225            }
226        }
227    
228        /**
229         * @see org.apache.activemq.thread.Task#iterate()
230         */
231        public boolean iterate() {
232    
233            final TransportListener tl = transportListener;
234    
235            LinkedBlockingQueue<Object> mq;
236            try {
237                mq = getMessageQueue();
238            } catch (TransportDisposedIOException e) {
239                return false;
240            }
241    
242            Object command = mq.poll();
243            if (command != null && !disposed.get()) {
244                if( command == DISCONNECT ) {
245                    tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
246                } else {
247                    tl.onCommand(command);
248                }
249                return !mq.isEmpty() && !disposed.get();
250            } else {
251                if(disposed.get()) {
252                    mq.clear();
253                }
254                return false;
255            }
256        }
257    
258        public void setTransportListener(TransportListener commandListener) {
259            this.transportListener = commandListener;
260        }
261    
262        public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) {
263            synchronized (this) {
264                if (messageQueue == null) {
265                    messageQueue = asyncQueue;
266                }
267            }
268        }
269    
270        public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
271            LinkedBlockingQueue<Object> result = messageQueue;
272            if (result == null) {
273                synchronized (this) {
274                    result = messageQueue;
275                    if (result == null) {
276                        if (disposed.get()) {
277                            throw new TransportDisposedIOException("The Transport has been disposed");
278                        }
279    
280                        messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
281                    }
282                }
283            }
284            return result;
285        }
286    
287        protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
288            TaskRunner result = taskRunner;
289            if (result == null) {
290                synchronized (this) {
291                    result = taskRunner;
292                    if (result == null) {
293                        if (disposed.get()) {
294                            throw new TransportDisposedIOException("The Transport has been disposed");
295                        }
296    
297                        String name = "ActiveMQ VMTransport: " + toString();
298                        if (taskRunnerFactory == null) {
299                            taskRunnerFactory = new TaskRunnerFactory(name);
300                            taskRunnerFactory.init();
301                        }
302                        taskRunner = result = taskRunnerFactory.createTaskRunner(this, name);
303                    }
304                }
305            }
306            return result;
307        }
308    
309        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
310            throw new AssertionError("Unsupported Method");
311        }
312    
313        public Object request(Object command) throws IOException {
314            throw new AssertionError("Unsupported Method");
315        }
316    
317        public Object request(Object command, int timeout) throws IOException {
318            throw new AssertionError("Unsupported Method");
319        }
320    
321        public TransportListener getTransportListener() {
322            return transportListener;
323        }
324    
325        public <T> T narrow(Class<T> target) {
326            if (target.isAssignableFrom(getClass())) {
327                return target.cast(this);
328            }
329            return null;
330        }
331    
332        public boolean isMarshal() {
333            return marshal;
334        }
335    
336        public void setMarshal(boolean marshal) {
337            this.marshal = marshal;
338        }
339    
340        public boolean isNetwork() {
341            return network;
342        }
343    
344        public void setNetwork(boolean network) {
345            this.network = network;
346        }
347    
348        @Override
349        public String toString() {
350            return location + "#" + id;
351        }
352    
353        public String getRemoteAddress() {
354            if (peer != null) {
355                return peer.toString();
356            }
357            return null;
358        }
359    
360        /**
361         * @return the async
362         */
363        public boolean isAsync() {
364            return async;
365        }
366    
367        /**
368         * @param async the async to set
369         */
370        public void setAsync(boolean async) {
371            this.async = async;
372        }
373    
374        /**
375         * @return the asyncQueueDepth
376         */
377        public int getAsyncQueueDepth() {
378            return asyncQueueDepth;
379        }
380    
381        /**
382         * @param asyncQueueDepth the asyncQueueDepth to set
383         */
384        public void setAsyncQueueDepth(int asyncQueueDepth) {
385            this.asyncQueueDepth = asyncQueueDepth;
386        }
387    
388        public boolean isFaultTolerant() {
389            return false;
390        }
391    
392        public boolean isDisposed() {
393            return disposed.get();
394        }
395    
396        public boolean isConnected() {
397            return !disposed.get();
398        }
399    
400        public void reconnect(URI uri) throws IOException {
401            throw new IOException("Transport reconnect is not supported");
402        }
403    
404        public boolean isReconnectSupported() {
405            return false;
406        }
407    
408        public boolean isUpdateURIsSupported() {
409            return false;
410        }
411    
412        public void updateURIs(boolean reblance,URI[] uris) throws IOException {
413            throw new IOException("URI update feature not supported");
414        }
415    
416        public int getReceiveCounter() {
417            return receiveCounter;
418        }
419    }