001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021import java.net.Socket;
022import java.util.concurrent.CountDownLatch;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import org.apache.activemq.command.Command;
027import org.apache.activemq.command.WireFormatInfo;
028import org.apache.activemq.openwire.OpenWireFormat;
029import org.apache.activemq.util.IOExceptionSupport;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Negotiates the wire format with a new connection
035 */
036public class WireFormatNegotiator extends TransportFilter {
037
038    private static final Logger LOG = LoggerFactory.getLogger(WireFormatNegotiator.class);
039
040    private OpenWireFormat wireFormat;
041    private final int minimumVersion;
042    private long negotiateTimeout = 15000L;
043
044    private final AtomicBoolean firstStart = new AtomicBoolean(true);
045    private final CountDownLatch readyCountDownLatch = new CountDownLatch(1);
046    private final CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
047
048    /**
049     * Negotiator
050     * 
051     * @param next
052     */
053    public WireFormatNegotiator(Transport next, OpenWireFormat wireFormat, int minimumVersion) {
054        super(next);
055        this.wireFormat = wireFormat;
056        if (minimumVersion <= 0) {
057            minimumVersion = 1;
058        }
059        this.minimumVersion = minimumVersion;
060        
061        // Setup the initial negociation timeout to be the same as the inital max inactivity delay specified on the wireformat
062        // Does not make sense for us to take longer.
063        try {
064            if( wireFormat.getPreferedWireFormatInfo() !=null ) {
065                setNegotiateTimeout(wireFormat.getPreferedWireFormatInfo().getMaxInactivityDurationInitalDelay());
066            }
067        } catch (IOException e) {
068        }
069    }
070
071    public void start() throws Exception {
072        super.start();
073        if (firstStart.compareAndSet(true, false)) {
074            sendWireFormat();
075        }
076    }
077
078    public void sendWireFormat() throws IOException {
079        try {
080            WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
081            if (LOG.isDebugEnabled()) {
082                LOG.debug("Sending: " + info);
083            }
084            sendWireFormat(info);
085        } finally {
086            wireInfoSentDownLatch.countDown();
087        }
088    }
089
090    public void stop() throws Exception {
091        super.stop();
092        readyCountDownLatch.countDown();
093    }
094
095    public void oneway(Object command) throws IOException {
096        boolean wasInterrupted = Thread.interrupted();
097        try {
098            if (readyCountDownLatch.getCount() > 0 && !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
099                throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
100            }
101        } catch (InterruptedException e) {
102            InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted waiting for wire format negotiation");
103            interruptedIOException.initCause(e);
104            try {
105                onException(interruptedIOException);
106            } finally {
107                Thread.currentThread().interrupt();
108                wasInterrupted = false;
109            }
110            throw interruptedIOException;
111        }  finally {
112            if (wasInterrupted) {
113                Thread.currentThread().interrupt();
114            }
115        }
116        super.oneway(command);
117    }
118
119    public void onCommand(Object o) {
120        Command command = (Command)o;
121        if (command.isWireFormatInfo()) {
122            WireFormatInfo info = (WireFormatInfo)command;
123            negociate(info);
124        }
125        getTransportListener().onCommand(command);
126    }
127
128    public void negociate(WireFormatInfo info) {
129        if (LOG.isDebugEnabled()) {
130            LOG.debug("Received WireFormat: " + info);
131        }
132
133        try {
134            wireInfoSentDownLatch.await();
135
136            if (LOG.isDebugEnabled()) {
137                LOG.debug(this + " before negotiation: " + wireFormat);
138            }
139            if (!info.isValid()) {
140                onException(new IOException("Remote wire format magic is invalid"));
141            } else if (info.getVersion() < minimumVersion) {
142                onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
143            }
144
145            wireFormat.renegotiateWireFormat(info);
146            Socket socket = next.narrow(Socket.class);
147            if (socket != null) {
148                socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
149            }
150
151            if (LOG.isDebugEnabled()) {
152                LOG.debug(this + " after negotiation: " + wireFormat);
153            }
154
155        } catch (IOException e) {
156            onException(e);
157        } catch (InterruptedException e) {
158            Thread.currentThread().interrupt();
159            onException((IOException)new InterruptedIOException().initCause(e));
160        } catch (Exception e) {
161            onException(IOExceptionSupport.create(e));
162        }
163        readyCountDownLatch.countDown();
164        onWireFormatNegotiated(info);
165    }
166
167    public void onException(IOException error) {
168        readyCountDownLatch.countDown();
169        /*
170         * try { super.oneway(new ExceptionResponse(error)); } catch
171         * (IOException e) { // ignore as we are already throwing an exception }
172         */
173        super.onException(error);
174    }
175
176    public String toString() {
177        return next.toString();
178    }
179
180    protected void sendWireFormat(WireFormatInfo info) throws IOException {
181        next.oneway(info);
182    }
183
184    protected void onWireFormatNegotiated(WireFormatInfo info) {
185    }
186
187    public long getNegotiateTimeout() {
188        return negotiateTimeout;
189    }
190
191    public void setNegotiateTimeout(long negotiateTimeout) {
192        this.negotiateTimeout = negotiateTimeout;
193    }
194}