001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp;
018
019import java.io.IOException;
020import java.util.ArrayList;
021
022import org.apache.activemq.broker.BrokerService;
023import org.apache.activemq.command.Command;
024import org.apache.activemq.transport.amqp.protocol.AmqpConnection;
025
026/**
027 * Used to assign the best implementation of a AmqpProtocolConverter to the
028 * AmqpTransport based on the AmqpHeader that the client sends us.
029 */
030public class AmqpProtocolDiscriminator implements AmqpProtocolConverter {
031
032    public static final int DEFAULT_PREFETCH = 1000;
033
034    private final AmqpTransport transport;
035    private final BrokerService brokerService;
036
037    interface Discriminator {
038        boolean matches(AmqpHeader header);
039
040        AmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService);
041    }
042
043    static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
044    static {
045        DISCRIMINATORS.add(new Discriminator() {
046
047            @Override
048            public AmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) {
049                return new AmqpConnection(transport, brokerService);
050            }
051
052            @Override
053            public boolean matches(AmqpHeader header) {
054                switch (header.getProtocolId()) {
055                    case 0:
056                    case 3:
057                        if (header.getMajor() == 1 && header.getMinor() == 0 && header.getRevision() == 0) {
058                            return true;
059                        }
060                }
061                return false;
062            }
063        });
064    }
065
066    final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
067
068    public AmqpProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) {
069        this.transport = transport;
070        this.brokerService = brokerService;
071    }
072
073    @Override
074    public void onAMQPData(Object command) throws Exception {
075        if (command.getClass() == AmqpHeader.class) {
076            AmqpHeader header = (AmqpHeader) command;
077
078            Discriminator match = null;
079            for (Discriminator discriminator : DISCRIMINATORS) {
080                if (discriminator.matches(header)) {
081                    match = discriminator;
082                }
083            }
084
085            // Lets use first in the list if none are a good match.
086            if (match == null) {
087                match = DISCRIMINATORS.get(0);
088            }
089
090            AmqpProtocolConverter next = match.create(transport, brokerService);
091            transport.setProtocolConverter(next);
092            for (Command send : pendingCommands) {
093                next.onActiveMQCommand(send);
094            }
095            pendingCommands.clear();
096            next.onAMQPData(command);
097        } else {
098            throw new IllegalStateException();
099        }
100    }
101
102    @Override
103    public void onAMQPException(IOException error) {
104        transport.sendToActiveMQ(error);
105    }
106
107    @Override
108    public void onActiveMQCommand(Command command) throws Exception {
109        pendingCommands.add(command);
110    }
111
112    @Override
113    public void updateTracer() {
114    }
115
116    @Override
117    public long keepAlive() {
118        return 0;
119    }
120}