001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp; 018 019import java.io.IOException; 020import java.util.ArrayList; 021 022import org.apache.activemq.broker.BrokerService; 023import org.apache.activemq.command.Command; 024import org.apache.activemq.transport.amqp.protocol.AmqpConnection; 025 026/** 027 * Used to assign the best implementation of a AmqpProtocolConverter to the 028 * AmqpTransport based on the AmqpHeader that the client sends us. 029 */ 030public class AmqpProtocolDiscriminator implements AmqpProtocolConverter { 031 032 public static final int DEFAULT_PREFETCH = 1000; 033 034 private final AmqpTransport transport; 035 private final BrokerService brokerService; 036 037 interface Discriminator { 038 boolean matches(AmqpHeader header); 039 040 AmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService); 041 } 042 043 static final private ArrayList<Discriminator> DISCRIMINATORS = new ArrayList<Discriminator>(); 044 static { 045 DISCRIMINATORS.add(new Discriminator() { 046 047 @Override 048 public AmqpProtocolConverter create(AmqpTransport transport, BrokerService brokerService) { 049 return new AmqpConnection(transport, brokerService); 050 } 051 052 @Override 053 public boolean matches(AmqpHeader header) { 054 switch (header.getProtocolId()) { 055 case 0: 056 case 3: 057 if (header.getMajor() == 1 && header.getMinor() == 0 && header.getRevision() == 0) { 058 return true; 059 } 060 } 061 return false; 062 } 063 }); 064 } 065 066 final private ArrayList<Command> pendingCommands = new ArrayList<Command>(); 067 068 public AmqpProtocolDiscriminator(AmqpTransport transport, BrokerService brokerService) { 069 this.transport = transport; 070 this.brokerService = brokerService; 071 } 072 073 @Override 074 public void onAMQPData(Object command) throws Exception { 075 if (command.getClass() == AmqpHeader.class) { 076 AmqpHeader header = (AmqpHeader) command; 077 078 Discriminator match = null; 079 for (Discriminator discriminator : DISCRIMINATORS) { 080 if (discriminator.matches(header)) { 081 match = discriminator; 082 } 083 } 084 085 // Lets use first in the list if none are a good match. 086 if (match == null) { 087 match = DISCRIMINATORS.get(0); 088 } 089 090 AmqpProtocolConverter next = match.create(transport, brokerService); 091 transport.setProtocolConverter(next); 092 for (Command send : pendingCommands) { 093 next.onActiveMQCommand(send); 094 } 095 pendingCommands.clear(); 096 next.onAMQPData(command); 097 } else { 098 throw new IllegalStateException(); 099 } 100 } 101 102 @Override 103 public void onAMQPException(IOException error) { 104 transport.sendToActiveMQ(error); 105 } 106 107 @Override 108 public void onActiveMQCommand(Command command) throws Exception { 109 pendingCommands.add(command); 110 } 111 112 @Override 113 public void updateTracer() { 114 } 115 116 @Override 117 public long keepAlive() { 118 return 0; 119 } 120}