001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021import java.net.URI;
022import java.security.cert.X509Certificate;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.apache.activemq.command.ShutdownInfo;
030import org.apache.activemq.thread.Task;
031import org.apache.activemq.thread.TaskRunner;
032import org.apache.activemq.thread.TaskRunnerFactory;
033import org.apache.activemq.transport.FutureResponse;
034import org.apache.activemq.transport.ResponseCallback;
035import org.apache.activemq.transport.Transport;
036import org.apache.activemq.transport.TransportDisposedIOException;
037import org.apache.activemq.transport.TransportListener;
038import org.apache.activemq.wireformat.WireFormat;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * A Transport implementation that uses direct method invocations.
044 */
045public class VMTransport implements Transport, Task {
046    protected static final Logger LOG = LoggerFactory.getLogger(VMTransport.class);
047
048    private static final AtomicLong NEXT_ID = new AtomicLong(0);
049
050    // Transport Configuration
051    protected VMTransport peer;
052    protected TransportListener transportListener;
053    protected boolean marshal;
054    protected boolean async = true;
055    protected int asyncQueueDepth = 2000;
056    protected final URI location;
057    protected final long id;
058
059    // Implementation
060    private volatile LinkedBlockingQueue<Object> messageQueue;
061    private volatile TaskRunnerFactory taskRunnerFactory;
062    private volatile TaskRunner taskRunner;
063
064    // Transport State
065    protected final AtomicBoolean started = new AtomicBoolean();
066    protected final AtomicBoolean disposed = new AtomicBoolean();
067
068    private volatile int receiveCounter;
069
070    public VMTransport(URI location) {
071        this.location = location;
072        this.id = NEXT_ID.getAndIncrement();
073    }
074
075    public void setPeer(VMTransport peer) {
076        this.peer = peer;
077    }
078
079    @Override
080    public void oneway(Object command) throws IOException {
081
082        if (disposed.get()) {
083            throw new TransportDisposedIOException("Transport disposed.");
084        }
085
086        if (peer == null) {
087            throw new IOException("Peer not connected.");
088        }
089
090        try {
091
092            if (peer.disposed.get()) {
093                throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
094            }
095
096            if (peer.async) {
097                peer.getMessageQueue().put(command);
098                peer.wakeup();
099                return;
100            }
101
102            if (!peer.started.get()) {
103                LinkedBlockingQueue<Object> pending = peer.getMessageQueue();
104                int sleepTimeMillis;
105                boolean accepted = false;
106                do {
107                    sleepTimeMillis = 0;
108                    // the pending queue is drained on start so we need to ensure we add before
109                    // the drain commences, otherwise we never get the command dispatched!
110                    synchronized (peer.started) {
111                        if (!peer.started.get()) {
112                            accepted = pending.offer(command);
113                            if (!accepted) {
114                                sleepTimeMillis = 500;
115                            }
116                        }
117                    }
118                    // give start thread a chance if we will loop
119                    TimeUnit.MILLISECONDS.sleep(sleepTimeMillis);
120
121                } while (!accepted && !peer.started.get());
122                if (accepted) {
123                    return;
124                }
125            }
126        } catch (InterruptedException e) {
127            Thread.currentThread().interrupt();
128            InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
129            iioe.initCause(e);
130            throw iioe;
131        }
132
133        dispatch(peer, peer.messageQueue, command);
134    }
135
136    public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
137        TransportListener transportListener = transport.getTransportListener();
138        if (transportListener != null) {
139            // Lock here on the target transport's started since we want to wait for its start()
140            // method to finish dispatching out of the queue before we do our own.
141            synchronized (transport.started) {
142
143                // Ensure that no additional commands entered the queue in the small time window
144                // before the start method locks the dispatch lock and the oneway method was in
145                // an put operation.
146                while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
147                    doDispatch(transport, transportListener, pending.poll());
148                }
149
150                // We are now in sync mode and won't enqueue any more commands to the target
151                // transport so lets clean up its resources.
152                transport.messageQueue = null;
153
154                // Don't dispatch if either end was disposed already.
155                if (command != null && !this.disposed.get() && !transport.isDisposed()) {
156                    doDispatch(transport, transportListener, command);
157                }
158            }
159        }
160    }
161
162    public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
163        transport.receiveCounter++;
164        transportListener.onCommand(command);
165    }
166
167    @Override
168    public void start() throws Exception {
169
170        if (transportListener == null) {
171            throw new IOException("TransportListener not set.");
172        }
173
174        // If we are not in async mode we lock the dispatch lock here and then start to
175        // prevent any sync dispatches from occurring until we dispatch the pending messages
176        // to maintain delivery order.  When async this happens automatically so just set
177        // started and wakeup the task runner.
178        if (!async) {
179            synchronized (started) {
180                if (started.compareAndSet(false, true)) {
181                    LinkedBlockingQueue<Object> mq = getMessageQueue();
182                    Object command;
183                    while ((command = mq.poll()) != null && !disposed.get() ) {
184                        receiveCounter++;
185                        doDispatch(this, transportListener, command);
186                    }
187                }
188            }
189        } else {
190            if (started.compareAndSet(false, true)) {
191                wakeup();
192            }
193        }
194    }
195
196    @Override
197    public void stop() throws Exception {
198        // Only need to do this once, all future oneway calls will now
199        // fail as will any asnyc jobs in the task runner.
200        if (disposed.compareAndSet(false, true)) {
201
202            TaskRunner tr = taskRunner;
203            LinkedBlockingQueue<Object> mq = this.messageQueue;
204
205            taskRunner = null;
206            messageQueue = null;
207
208            if (mq != null) {
209                mq.clear();
210            }
211
212            // don't wait for completion
213            if (tr != null) {
214                try {
215                    tr.shutdown(1);
216                } catch(Exception e) {
217                }
218                tr = null;
219            }
220
221            if (peer.transportListener != null) {
222                // let the peer know that we are disconnecting after attempting
223                // to cleanly shutdown the async tasks so that this is the last
224                // command it see's.
225                try {
226                    peer.transportListener.onCommand(new ShutdownInfo());
227                } catch (Exception ignore) {
228                }
229
230                // let any requests pending a response see an exception
231                try {
232                    peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped."));
233                } catch (Exception ignore) {
234                }
235            }
236
237            // shutdown task runner factory
238            if (taskRunnerFactory != null) {
239                taskRunnerFactory.shutdownNow();
240                taskRunnerFactory = null;
241            }
242        }
243    }
244
245    protected void wakeup() {
246        if (async && started.get()) {
247            try {
248                getTaskRunner().wakeup();
249            } catch (InterruptedException e) {
250                Thread.currentThread().interrupt();
251            } catch (TransportDisposedIOException e) {
252            }
253        }
254    }
255
256    /**
257     * @see org.apache.activemq.thread.Task#iterate()
258     */
259    @Override
260    public boolean iterate() {
261
262        final TransportListener tl = transportListener;
263
264        LinkedBlockingQueue<Object> mq;
265        try {
266            mq = getMessageQueue();
267        } catch (TransportDisposedIOException e) {
268            return false;
269        }
270
271        Object command = mq.poll();
272        if (command != null && !disposed.get()) {
273            tl.onCommand(command);
274            return !mq.isEmpty() && !disposed.get();
275        } else {
276            if(disposed.get()) {
277                mq.clear();
278            }
279            return false;
280        }
281    }
282
283    @Override
284    public void setTransportListener(TransportListener commandListener) {
285        this.transportListener = commandListener;
286    }
287
288    public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
289        LinkedBlockingQueue<Object> result = messageQueue;
290        if (result == null) {
291            synchronized (this) {
292                result = messageQueue;
293                if (result == null) {
294                    if (disposed.get()) {
295                        throw new TransportDisposedIOException("The Transport has been disposed");
296                    }
297
298                    messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
299                }
300            }
301        }
302        return result;
303    }
304
305    protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
306        TaskRunner result = taskRunner;
307        if (result == null) {
308            synchronized (this) {
309                result = taskRunner;
310                if (result == null) {
311                    if (disposed.get()) {
312                        throw new TransportDisposedIOException("The Transport has been disposed");
313                    }
314
315                    String name = "ActiveMQ VMTransport: " + toString();
316                    if (taskRunnerFactory == null) {
317                        taskRunnerFactory = new TaskRunnerFactory(name);
318                        taskRunnerFactory.init();
319                    }
320                    taskRunner = result = taskRunnerFactory.createTaskRunner(this, name);
321                }
322            }
323        }
324        return result;
325    }
326
327    @Override
328    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
329        throw new AssertionError("Unsupported Method");
330    }
331
332    @Override
333    public Object request(Object command) throws IOException {
334        throw new AssertionError("Unsupported Method");
335    }
336
337    @Override
338    public Object request(Object command, int timeout) throws IOException {
339        throw new AssertionError("Unsupported Method");
340    }
341
342    @Override
343    public TransportListener getTransportListener() {
344        return transportListener;
345    }
346
347    @Override
348    public <T> T narrow(Class<T> target) {
349        if (target.isAssignableFrom(getClass())) {
350            return target.cast(this);
351        }
352        return null;
353    }
354
355    public boolean isMarshal() {
356        return marshal;
357    }
358
359    public void setMarshal(boolean marshal) {
360        this.marshal = marshal;
361    }
362
363    @Override
364    public String toString() {
365        return location + "#" + id;
366    }
367
368    @Override
369    public String getRemoteAddress() {
370        if (peer != null) {
371            return peer.toString();
372        }
373        return null;
374    }
375
376    /**
377     * @return the async
378     */
379    public boolean isAsync() {
380        return async;
381    }
382
383    /**
384     * @param async the async to set
385     */
386    public void setAsync(boolean async) {
387        this.async = async;
388    }
389
390    /**
391     * @return the asyncQueueDepth
392     */
393    public int getAsyncQueueDepth() {
394        return asyncQueueDepth;
395    }
396
397    /**
398     * @param asyncQueueDepth the asyncQueueDepth to set
399     */
400    public void setAsyncQueueDepth(int asyncQueueDepth) {
401        this.asyncQueueDepth = asyncQueueDepth;
402    }
403
404    @Override
405    public boolean isFaultTolerant() {
406        return false;
407    }
408
409    @Override
410    public boolean isDisposed() {
411        return disposed.get();
412    }
413
414    @Override
415    public boolean isConnected() {
416        return !disposed.get();
417    }
418
419    @Override
420    public void reconnect(URI uri) throws IOException {
421        throw new IOException("Transport reconnect is not supported");
422    }
423
424    @Override
425    public boolean isReconnectSupported() {
426        return false;
427    }
428
429    @Override
430    public boolean isUpdateURIsSupported() {
431        return false;
432    }
433
434    @Override
435    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
436        throw new IOException("URI update feature not supported");
437    }
438
439    @Override
440    public int getReceiveCounter() {
441        return receiveCounter;
442    }
443
444    @Override
445    public X509Certificate[] getPeerCertificates() {
446        return null;
447    }
448
449    @Override
450    public void setPeerCertificates(X509Certificate[] certificates) {
451
452    }
453
454    @Override
455    public WireFormat getWireFormat() {
456        return null;
457    }
458}