001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.ByteArrayInputStream;
020    import java.util.Arrays;
021    import java.util.Collections;
022    import java.util.HashMap;
023    import java.util.HashSet;
024    import java.util.Map;
025    
026    import org.apache.activemq.transport.tcp.TcpTransport;
027    import org.apache.activemq.util.ByteArrayOutputStream;
028    import org.apache.activemq.util.DataByteArrayInputStream;
029    
030    public class StompCodec {
031    
032        final static byte[] crlfcrlf = new byte[]{'\r','\n','\r','\n'};
033        TcpTransport transport;
034    
035        ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
036        boolean processedHeaders = false;
037        String action;
038        HashMap<String, String> headers;
039        int contentLength = -1;
040        int readLength = 0;
041        int previousByte = -1;
042        boolean awaitingCommandStart = true;
043        String version = Stomp.DEFAULT_VERSION;
044    
045        public StompCodec(TcpTransport transport) {
046            this.transport = transport;
047        }
048    
049        public void parse(ByteArrayInputStream input, int readSize) throws Exception {
050           int i = 0;
051           int b;
052           while(i++ < readSize) {
053               b = input.read();
054               // skip repeating nulls
055               if (!processedHeaders && previousByte == 0 && b == 0) {
056                   continue;
057               }
058    
059               if (!processedHeaders) {
060    
061                   // skip heart beat commands.
062                   if (awaitingCommandStart && b == '\n') {
063                       continue;
064                   } else {
065                       awaitingCommandStart = false;   // non-newline indicates next frame.
066                   }
067    
068                   currentCommand.write(b);
069                   // end of headers section, parse action and header
070                   if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
071                       StompWireFormat wf = (StompWireFormat) transport.getWireFormat();
072                       DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
073                       action = wf.parseAction(data);
074                       headers = wf.parseHeaders(data);
075                       try {
076                           String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
077                           if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
078                               contentLength = wf.parseContentLength(contentLengthHeader);
079                           } else {
080                               contentLength = -1;
081                           }
082                       } catch (ProtocolException ignore) {}
083                       processedHeaders = true;
084                       currentCommand.reset();
085                   }
086    
087               } else {
088    
089                   if (contentLength == -1) {
090                       // end of command reached, unmarshal
091                       if (b == 0) {
092                           processCommand();
093                       } else {
094                           currentCommand.write(b);
095                       }
096                   } else {
097                       // read desired content length
098                       if (readLength++ == contentLength) {
099                           processCommand();
100                           readLength = 0;
101                       } else {
102                           currentCommand.write(b);
103                       }
104                   }
105               }
106    
107               previousByte = b;
108           }
109        }
110    
111        protected void processCommand() throws Exception {
112            StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
113            transport.doConsume(frame);
114            processedHeaders = false;
115            awaitingCommandStart = true;
116            currentCommand.reset();
117            contentLength = -1;
118        }
119    
120        public static String detectVersion(Map<String, String> headers) throws ProtocolException {
121            String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
122    
123            if (accepts == null) {
124                accepts = Stomp.DEFAULT_VERSION;
125            }
126            HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.trim().split(Stomp.COMMA)));
127            acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
128            if (acceptsVersions.isEmpty()) {
129                throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
130                        Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
131            } else {
132                return Collections.max(acceptsVersions);
133            }
134        }
135    }