001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.vm;
018
019 import java.io.IOException;
020 import java.io.InterruptedIOException;
021 import java.net.URI;
022 import java.util.concurrent.BlockingQueue;
023 import java.util.concurrent.LinkedBlockingQueue;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicBoolean;
026 import java.util.concurrent.atomic.AtomicLong;
027
028 import org.apache.activemq.command.ShutdownInfo;
029 import org.apache.activemq.thread.Task;
030 import org.apache.activemq.thread.TaskRunner;
031 import org.apache.activemq.thread.TaskRunnerFactory;
032 import org.apache.activemq.transport.FutureResponse;
033 import org.apache.activemq.transport.ResponseCallback;
034 import org.apache.activemq.transport.Transport;
035 import org.apache.activemq.transport.TransportDisposedIOException;
036 import org.apache.activemq.transport.TransportListener;
037
038 /**
039 * A Transport implementation that uses direct method invocations.
040 */
041 public class VMTransport implements Transport, Task {
042
043 private static final Object DISCONNECT = new Object();
044 private static final AtomicLong NEXT_ID = new AtomicLong(0);
045
046 // Transport Configuration
047 protected VMTransport peer;
048 protected TransportListener transportListener;
049 protected boolean marshal;
050 protected boolean network;
051 protected boolean async = true;
052 protected int asyncQueueDepth = 2000;
053 protected final URI location;
054 protected final long id;
055
056 // Implementation
057 private LinkedBlockingQueue<Object> messageQueue;
058 private TaskRunnerFactory taskRunnerFactory;
059 private TaskRunner taskRunner;
060
061 // Transport State
062 protected final AtomicBoolean started = new AtomicBoolean();
063 protected final AtomicBoolean disposed = new AtomicBoolean();
064
065 private volatile int receiveCounter;
066
067 public VMTransport(URI location) {
068 this.location = location;
069 this.id = NEXT_ID.getAndIncrement();
070 }
071
072 public void setPeer(VMTransport peer) {
073 this.peer = peer;
074 }
075
076 public void oneway(Object command) throws IOException {
077
078 if (disposed.get()) {
079 throw new TransportDisposedIOException("Transport disposed.");
080 }
081
082 if (peer == null) {
083 throw new IOException("Peer not connected.");
084 }
085
086 try {
087
088 if (peer.disposed.get()) {
089 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
090 }
091
092 if (peer.async || !peer.started.get()) {
093 peer.getMessageQueue().put(command);
094 peer.wakeup();
095 return;
096 }
097
098 } catch (InterruptedException e) {
099 InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
100 iioe.initCause(e);
101 throw iioe;
102 }
103
104 dispatch(peer, peer.messageQueue, command);
105 }
106
107 public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
108 TransportListener transportListener = transport.getTransportListener();
109 if (transportListener != null) {
110 // Lock here on the target transport's started since we want to wait for its start()
111 // method to finish dispatching out of the queue before we do our own.
112 synchronized (transport.started) {
113
114 // Ensure that no additional commands entered the queue in the small time window
115 // before the start method locks the dispatch lock and the oneway method was in
116 // an put operation.
117 while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
118 doDispatch(transport, transportListener, pending.poll());
119 }
120
121 // We are now in sync mode and won't enqueue any more commands to the target
122 // transport so lets clean up its resources.
123 transport.messageQueue = null;
124
125 // Don't dispatch if either end was disposed already.
126 if (command != null && !this.disposed.get() && !transport.isDisposed()) {
127 doDispatch(transport, transportListener, command);
128 }
129 }
130 }
131 }
132
133 public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
134 if (command == DISCONNECT) {
135 transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
136 } else {
137 transport.receiveCounter++;
138 transportListener.onCommand(command);
139 }
140 }
141
142 public void start() throws Exception {
143
144 if (transportListener == null) {
145 throw new IOException("TransportListener not set.");
146 }
147
148 // If we are not in async mode we lock the dispatch lock here and then start to
149 // prevent any sync dispatches from occurring until we dispatch the pending messages
150 // to maintain delivery order. When async this happens automatically so just set
151 // started and wakeup the task runner.
152 if (!async) {
153 synchronized (started) {
154 if (started.compareAndSet(false, true)) {
155 LinkedBlockingQueue<Object> mq = getMessageQueue();
156 Object command;
157 while ((command = mq.poll()) != null && !disposed.get() ) {
158 receiveCounter++;
159 doDispatch(this, transportListener, command);
160 }
161 }
162 }
163 } else {
164 if (started.compareAndSet(false, true)) {
165 wakeup();
166 }
167 }
168 }
169
170 public void stop() throws Exception {
171 // Only need to do this once, all future oneway calls will now
172 // fail as will any asnyc jobs in the task runner.
173 if (disposed.compareAndSet(false, true)) {
174
175 TaskRunner tr = taskRunner;
176 LinkedBlockingQueue<Object> mq = this.messageQueue;
177
178 taskRunner = null;
179 messageQueue = null;
180
181 if (mq != null) {
182 mq.clear();
183 }
184
185 // Allow pending deliveries to finish up, but don't wait
186 // forever in case of an stalled onCommand.
187 if (tr != null) {
188 try {
189 tr.shutdown(TimeUnit.SECONDS.toMillis(1));
190 } catch(Exception e) {
191 }
192 taskRunner = null;
193 }
194
195 // let the peer know that we are disconnecting after attempting
196 // to cleanly shutdown the async tasks so that this is the last
197 // command it see's.
198 try {
199 peer.transportListener.onCommand(new ShutdownInfo());
200 } catch (Exception ignore) {
201 }
202
203 // shutdown task runner factory
204 if (taskRunnerFactory != null) {
205 taskRunnerFactory.shutdownNow();
206 taskRunnerFactory = null;
207 }
208 }
209 }
210
211 protected void wakeup() {
212 if (async && started.get()) {
213 try {
214 getTaskRunner().wakeup();
215 } catch (InterruptedException e) {
216 Thread.currentThread().interrupt();
217 } catch (TransportDisposedIOException e) {
218 }
219 }
220 }
221
222 /**
223 * @see org.apache.activemq.thread.Task#iterate()
224 */
225 public boolean iterate() {
226
227 final TransportListener tl = transportListener;
228
229 LinkedBlockingQueue<Object> mq;
230 try {
231 mq = getMessageQueue();
232 } catch (TransportDisposedIOException e) {
233 return false;
234 }
235
236 Object command = mq.poll();
237 if (command != null && !disposed.get()) {
238 if( command == DISCONNECT ) {
239 tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
240 } else {
241 tl.onCommand(command);
242 }
243 return !mq.isEmpty() && !disposed.get();
244 } else {
245 if(disposed.get()) {
246 mq.clear();
247 }
248 return false;
249 }
250 }
251
252 public void setTransportListener(TransportListener commandListener) {
253 this.transportListener = commandListener;
254 }
255
256 public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) {
257 synchronized (this) {
258 if (messageQueue == null) {
259 messageQueue = asyncQueue;
260 }
261 }
262 }
263
264 public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
265 LinkedBlockingQueue<Object> result = messageQueue;
266 if (result == null) {
267 synchronized (this) {
268 result = messageQueue;
269 if (result == null) {
270 if (disposed.get()) {
271 throw new TransportDisposedIOException("The Transport has been disposed");
272 }
273
274 messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
275 }
276 }
277 }
278 return result;
279 }
280
281 protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
282 TaskRunner result = taskRunner;
283 if (result == null) {
284 synchronized (this) {
285 result = taskRunner;
286 if (result == null) {
287 if (disposed.get()) {
288 throw new TransportDisposedIOException("The Transport has been disposed");
289 }
290
291 String name = "ActiveMQ VMTransport: " + toString();
292 if (taskRunnerFactory == null) {
293 taskRunnerFactory = new TaskRunnerFactory(name);
294 taskRunnerFactory.init();
295 }
296 taskRunner = result = taskRunnerFactory.createTaskRunner(this, name);
297 }
298 }
299 }
300 return result;
301 }
302
303 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
304 throw new AssertionError("Unsupported Method");
305 }
306
307 public Object request(Object command) throws IOException {
308 throw new AssertionError("Unsupported Method");
309 }
310
311 public Object request(Object command, int timeout) throws IOException {
312 throw new AssertionError("Unsupported Method");
313 }
314
315 public TransportListener getTransportListener() {
316 return transportListener;
317 }
318
319 public <T> T narrow(Class<T> target) {
320 if (target.isAssignableFrom(getClass())) {
321 return target.cast(this);
322 }
323 return null;
324 }
325
326 public boolean isMarshal() {
327 return marshal;
328 }
329
330 public void setMarshal(boolean marshal) {
331 this.marshal = marshal;
332 }
333
334 public boolean isNetwork() {
335 return network;
336 }
337
338 public void setNetwork(boolean network) {
339 this.network = network;
340 }
341
342 @Override
343 public String toString() {
344 return location + "#" + id;
345 }
346
347 public String getRemoteAddress() {
348 if (peer != null) {
349 return peer.toString();
350 }
351 return null;
352 }
353
354 /**
355 * @return the async
356 */
357 public boolean isAsync() {
358 return async;
359 }
360
361 /**
362 * @param async the async to set
363 */
364 public void setAsync(boolean async) {
365 this.async = async;
366 }
367
368 /**
369 * @return the asyncQueueDepth
370 */
371 public int getAsyncQueueDepth() {
372 return asyncQueueDepth;
373 }
374
375 /**
376 * @param asyncQueueDepth the asyncQueueDepth to set
377 */
378 public void setAsyncQueueDepth(int asyncQueueDepth) {
379 this.asyncQueueDepth = asyncQueueDepth;
380 }
381
382 public boolean isFaultTolerant() {
383 return false;
384 }
385
386 public boolean isDisposed() {
387 return disposed.get();
388 }
389
390 public boolean isConnected() {
391 return !disposed.get();
392 }
393
394 public void reconnect(URI uri) throws IOException {
395 throw new IOException("Transport reconnect is not supported");
396 }
397
398 public boolean isReconnectSupported() {
399 return false;
400 }
401
402 public boolean isUpdateURIsSupported() {
403 return false;
404 }
405
406 public void updateURIs(boolean reblance,URI[] uris) throws IOException {
407 throw new IOException("URI update feature not supported");
408 }
409
410 public int getReceiveCounter() {
411 return receiveCounter;
412 }
413 }