001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.net.Socket;
022    import java.util.concurrent.CountDownLatch;
023    import java.util.concurrent.TimeUnit;
024    import java.util.concurrent.atomic.AtomicBoolean;
025    
026    import org.apache.activemq.command.Command;
027    import org.apache.activemq.command.WireFormatInfo;
028    import org.apache.activemq.openwire.OpenWireFormat;
029    import org.apache.activemq.util.IOExceptionSupport;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * Negotiates the wire format with a new connection
035     */
036    public class WireFormatNegotiator extends TransportFilter {
037    
038        private static final Logger LOG = LoggerFactory.getLogger(WireFormatNegotiator.class);
039    
040        private OpenWireFormat wireFormat;
041        private final int minimumVersion;
042        private long negotiateTimeout = 15000L;
043    
044        private final AtomicBoolean firstStart = new AtomicBoolean(true);
045        private final CountDownLatch readyCountDownLatch = new CountDownLatch(1);
046        private final CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
047    
048        /**
049         * Negotiator
050         * 
051         * @param next
052         */
053        public WireFormatNegotiator(Transport next, OpenWireFormat wireFormat, int minimumVersion) {
054            super(next);
055            this.wireFormat = wireFormat;
056            if (minimumVersion <= 0) {
057                minimumVersion = 1;
058            }
059            this.minimumVersion = minimumVersion;
060            
061            // Setup the initial negociation timeout to be the same as the inital max inactivity delay specified on the wireformat
062            // Does not make sense for us to take longer.
063            try {
064                if( wireFormat.getPreferedWireFormatInfo() !=null ) {
065                    setNegotiateTimeout(wireFormat.getPreferedWireFormatInfo().getMaxInactivityDurationInitalDelay());
066                }
067            } catch (IOException e) {
068            }
069        }
070    
071        public void start() throws Exception {
072            super.start();
073            if (firstStart.compareAndSet(true, false)) {
074                sendWireFormat();
075            }
076        }
077    
078        public void sendWireFormat() throws IOException {
079            try {
080                WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
081                if (LOG.isDebugEnabled()) {
082                    LOG.debug("Sending: " + info);
083                }
084                sendWireFormat(info);
085            } finally {
086                wireInfoSentDownLatch.countDown();
087            }
088        }
089    
090        public void stop() throws Exception {
091            super.stop();
092            readyCountDownLatch.countDown();
093        }
094    
095        public void oneway(Object command) throws IOException {
096            try {
097                if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
098                    throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
099                }
100            } catch (InterruptedException e) {
101                Thread.currentThread().interrupt();
102                throw new InterruptedIOException();
103            }
104            super.oneway(command);
105        }
106    
107        public void onCommand(Object o) {
108            Command command = (Command)o;
109            if (command.isWireFormatInfo()) {
110                WireFormatInfo info = (WireFormatInfo)command;
111                negociate(info);
112            }
113            getTransportListener().onCommand(command);
114        }
115    
116        public void negociate(WireFormatInfo info) {
117            if (LOG.isDebugEnabled()) {
118                LOG.debug("Received WireFormat: " + info);
119            }
120    
121            try {
122                wireInfoSentDownLatch.await();
123    
124                if (LOG.isDebugEnabled()) {
125                    LOG.debug(this + " before negotiation: " + wireFormat);
126                }
127                if (!info.isValid()) {
128                    onException(new IOException("Remote wire format magic is invalid"));
129                } else if (info.getVersion() < minimumVersion) {
130                    onException(new IOException("Remote wire format (" + info.getVersion() + ") is lower the minimum version required (" + minimumVersion + ")"));
131                }
132    
133                wireFormat.renegotiateWireFormat(info);
134                Socket socket = next.narrow(Socket.class);
135                if (socket != null) {
136                    socket.setTcpNoDelay(wireFormat.isTcpNoDelayEnabled());
137                }
138    
139                if (LOG.isDebugEnabled()) {
140                    LOG.debug(this + " after negotiation: " + wireFormat);
141                }
142    
143            } catch (IOException e) {
144                onException(e);
145            } catch (InterruptedException e) {
146                onException((IOException)new InterruptedIOException().initCause(e));
147            } catch (Exception e) {
148                onException(IOExceptionSupport.create(e));
149            }
150            readyCountDownLatch.countDown();
151            onWireFormatNegotiated(info);
152        }
153    
154        public void onException(IOException error) {
155            readyCountDownLatch.countDown();
156            /*
157             * try { super.oneway(new ExceptionResponse(error)); } catch
158             * (IOException e) { // ignore as we are already throwing an exception }
159             */
160            super.onException(error);
161        }
162    
163        public String toString() {
164            return next.toString();
165        }
166    
167        protected void sendWireFormat(WireFormatInfo info) throws IOException {
168            next.oneway(info);
169        }
170    
171        protected void onWireFormatNegotiated(WireFormatInfo info) {
172        }
173    
174        public long getNegotiateTimeout() {
175            return negotiateTimeout;
176        }
177    
178        public void setNegotiateTimeout(long negotiateTimeout) {
179            this.negotiateTimeout = negotiateTimeout;
180        }
181    }