001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.amqp;
018    
019    import org.apache.activemq.command.Command;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    
024    /**
025     * Used to assign the best implementation of a AmqpProtocolConverter to the
026     * AmqpTransport based on the AmqpHeader that the client sends us.
027     */
028    public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
029    
030        final private AmqpTransport transport;
031    
032        interface Discriminator {
033            boolean matches(AmqpHeader header);
034            IAmqpProtocolConverter create(AmqpTransport transport);
035        }
036    
037        static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>();
038        static {
039            DISCRIMINATORS.add(new Discriminator(){
040    
041                @Override
042                public IAmqpProtocolConverter create(AmqpTransport transport) {
043                    return new AmqpProtocolConverter(transport);
044                }
045    
046                @Override
047                public boolean matches(AmqpHeader header) {
048                    switch( header.getProtocolId() ) {
049                        case 0:
050                        case 3:
051                            if( header.getMajor() == 1 && header.getMinor()==0 && header.getRevision()==0 )
052                                return true;
053                    }
054                    return false;
055                }
056            });
057    
058        }
059    
060        static final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
061    
062        public AMQPProtocolDiscriminator(AmqpTransport transport) {
063            this.transport = transport;
064        }
065    
066        @Override
067        public void onAMQPData(Object command) throws Exception {
068            if (command.getClass() == AmqpHeader.class) {
069                AmqpHeader header = (AmqpHeader) command;
070    
071                Discriminator match = null;
072                for (Discriminator discriminator : DISCRIMINATORS) {
073                    if( discriminator.matches(header) ) {
074                        match = discriminator;
075                    }
076                }
077                // Lets use first in the list if none are a good match.
078                if( match == null ) {
079                    match = DISCRIMINATORS.get(0);
080                }
081                IAmqpProtocolConverter next = match.create(transport);
082                transport.setProtocolConverter(next);
083                for (Command send : pendingCommands) {
084                    next.onActiveMQCommand(send);
085                }
086                pendingCommands.clear();
087                next.onAMQPData(command);
088            } else {
089                throw new IllegalStateException();
090            }
091        }
092    
093        @Override
094        public void onAMQPException(IOException error) {
095        }
096    
097        @Override
098        public void onActiveMQCommand(Command command) throws Exception {
099            pendingCommands.add(command);
100        }
101    
102        @Override
103        public void updateTracer() {
104        }
105    }