001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.http;
018
019import java.io.IOException;
020import java.security.cert.X509Certificate;
021import java.util.Queue;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.activemq.transport.TransportSupport;
026import org.apache.activemq.util.ServiceStopper;
027import org.apache.activemq.wireformat.WireFormat;
028
029/**
030 * A server side HTTP based TransportChannel which processes incoming packets
031 * and adds outgoing packets onto a {@link Queue} so that they can be dispatched
032 * by the HTTP GET requests from the client.
033 */
034public class BlockingQueueTransport extends TransportSupport {
035
036    public static final long MAX_TIMEOUT = 30000L;
037
038    private BlockingQueue<Object> queue;
039
040    public BlockingQueueTransport(BlockingQueue<Object> channel) {
041        this.queue = channel;
042    }
043
044    public BlockingQueue<Object> getQueue() {
045        return queue;
046    }
047
048    @Override
049    public void oneway(Object command) throws IOException {
050        try {
051            boolean success = queue.offer(command, MAX_TIMEOUT, TimeUnit.MILLISECONDS);
052            if (!success) {
053                throw new IOException("Fail to add to BlockingQueue. Add timed out after " + MAX_TIMEOUT + "ms: size=" + queue.size());
054            }
055        } catch (InterruptedException e) {
056            throw new IOException("Fail to add to BlockingQueue. Interrupted while waiting for space: size=" + queue.size());
057        }
058    }
059
060    @Override
061    public String getRemoteAddress() {
062        return "blockingQueue_" + queue.hashCode();
063    }
064
065    @Override
066    protected void doStart() throws Exception {
067    }
068
069    @Override
070    protected void doStop(ServiceStopper stopper) throws Exception {
071    }
072
073    @Override
074    public int getReceiveCounter() {
075        return 0;
076    }
077
078    @Override
079    public X509Certificate[] getPeerCertificates() {
080        return null;
081    }
082
083    @Override
084    public void setPeerCertificates(X509Certificate[] certificates) {
085    }
086
087    @Override
088    public WireFormat getWireFormat() {
089        return null;
090    }
091}