001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.http;
018
019import java.io.IOException;
020import java.util.Queue;
021import java.util.concurrent.BlockingQueue;
022import java.util.concurrent.TimeUnit;
023
024import org.apache.activemq.transport.TransportSupport;
025import org.apache.activemq.util.ServiceStopper;
026
027/**
028 * A server side HTTP based TransportChannel which processes incoming packets
029 * and adds outgoing packets onto a {@link Queue} so that they can be dispatched
030 * by the HTTP GET requests from the client.
031 *
032 * 
033 */
034public class  BlockingQueueTransport extends TransportSupport {
035    public static final long MAX_TIMEOUT = 30000L;
036
037    private BlockingQueue<Object> queue;
038
039    public BlockingQueueTransport(BlockingQueue<Object> channel) {
040        this.queue = channel;
041    }
042
043    public BlockingQueue<Object> getQueue() {
044        return queue;
045    }
046
047    public void oneway(Object command) throws IOException {
048        try {
049            boolean success = queue.offer(command, MAX_TIMEOUT, TimeUnit.MILLISECONDS);
050            if (!success) {
051                throw new IOException("Fail to add to BlockingQueue. Add timed out after " + MAX_TIMEOUT + "ms: size=" + queue.size());
052            }
053        } catch (InterruptedException e) {
054            throw new IOException("Fail to add to BlockingQueue. Interrupted while waiting for space: size=" + queue.size());
055        }
056    }
057
058
059    public String getRemoteAddress() {
060        return "blockingQueue_" + queue.hashCode();
061    }
062
063    protected void doStart() throws Exception {
064    }
065
066    protected void doStop(ServiceStopper stopper) throws Exception {
067    }
068
069    public int getReceiveCounter() {
070        return 0;
071    }
072}