001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import org.apache.activemq.transport.tcp.TcpTransport;
020 import org.apache.activemq.util.ByteArrayOutputStream;
021 import org.apache.activemq.util.DataByteArrayInputStream;
022
023 import java.io.ByteArrayInputStream;
024 import java.util.Arrays;
025 import java.util.Collections;
026 import java.util.HashMap;
027 import java.util.HashSet;
028 import java.util.Map;
029
030 public class StompCodec {
031
032 final static byte[] crlfcrlf = new byte[]{'\r','\n','\r','\n'};
033 TcpTransport transport;
034
035 ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
036 boolean processedHeaders = false;
037 String action;
038 HashMap<String, String> headers;
039 int contentLength = -1;
040 int readLength = 0;
041 int previousByte = -1;
042 String version = Stomp.DEFAULT_VERSION;
043
044 public StompCodec(TcpTransport transport) {
045 this.transport = transport;
046 }
047
048 public void parse(ByteArrayInputStream input, int readSize) throws Exception {
049 int i = 0;
050 int b;
051 while(i++ < readSize) {
052 b = input.read();
053 // skip repeating nulls
054 if (!processedHeaders && previousByte == 0 && b == 0) {
055 continue;
056 }
057
058 if (!processedHeaders) {
059 currentCommand.write(b);
060 // end of headers section, parse action and header
061 if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
062 StompWireFormat wf = (StompWireFormat) transport.getWireFormat();
063 DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
064 action = wf.parseAction(data);
065 headers = wf.parseHeaders(data);
066 try {
067 if (action.equals(Stomp.Commands.CONNECT) || action.equals(Stomp.Commands.STOMP)) {
068 wf.setStompVersion(detectVersion(headers));
069 }
070 String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
071 if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
072 contentLength = wf.parseContentLength(contentLengthHeader);
073 } else {
074 contentLength = -1;
075 }
076 } catch (ProtocolException ignore) {}
077 processedHeaders = true;
078 currentCommand.reset();
079 }
080 } else {
081
082 if (contentLength == -1) {
083 // end of command reached, unmarshal
084 if (b == 0) {
085 processCommand();
086 } else {
087 currentCommand.write(b);
088 }
089 } else {
090 // read desired content length
091 if (readLength++ == contentLength) {
092 processCommand();
093 readLength = 0;
094 } else {
095 currentCommand.write(b);
096 }
097 }
098 }
099
100 previousByte = b;
101 }
102 }
103
104 protected void processCommand() throws Exception {
105 StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
106 transport.doConsume(frame);
107 processedHeaders = false;
108 currentCommand.reset();
109 contentLength = -1;
110 }
111
112 public static String detectVersion(Map<String, String> headers) throws ProtocolException {
113 String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
114
115 if (accepts == null) {
116 accepts = Stomp.DEFAULT_VERSION;
117 }
118 HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.trim().split(Stomp.COMMA)));
119 acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
120 if (acceptsVersions.isEmpty()) {
121 throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
122 Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
123 } else {
124 return Collections.max(acceptsVersions);
125 }
126 }
127 }