001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.ByteArrayInputStream;
020 import java.util.Arrays;
021 import java.util.Collections;
022 import java.util.HashMap;
023 import java.util.HashSet;
024 import java.util.Map;
025
026 import org.apache.activemq.transport.tcp.TcpTransport;
027 import org.apache.activemq.util.ByteArrayOutputStream;
028 import org.apache.activemq.util.DataByteArrayInputStream;
029
030 public class StompCodec {
031
032 final static byte[] crlfcrlf = new byte[]{'\r','\n','\r','\n'};
033 TcpTransport transport;
034
035 ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
036 boolean processedHeaders = false;
037 String action;
038 HashMap<String, String> headers;
039 int contentLength = -1;
040 int readLength = 0;
041 int previousByte = -1;
042 boolean awaitingCommandStart = true;
043 String version = Stomp.DEFAULT_VERSION;
044
045 public StompCodec(TcpTransport transport) {
046 this.transport = transport;
047 }
048
049 public void parse(ByteArrayInputStream input, int readSize) throws Exception {
050 int i = 0;
051 int b;
052 while(i++ < readSize) {
053 b = input.read();
054 // skip repeating nulls
055 if (!processedHeaders && previousByte == 0 && b == 0) {
056 continue;
057 }
058
059 if (!processedHeaders) {
060
061 // skip heart beat commands.
062 if (awaitingCommandStart && b == '\n') {
063 continue;
064 } else {
065 awaitingCommandStart = false; // non-newline indicates next frame.
066 }
067
068 currentCommand.write(b);
069 // end of headers section, parse action and header
070 if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
071 StompWireFormat wf = (StompWireFormat) transport.getWireFormat();
072 DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
073 action = wf.parseAction(data);
074 headers = wf.parseHeaders(data);
075 try {
076 String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
077 if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
078 contentLength = wf.parseContentLength(contentLengthHeader);
079 } else {
080 contentLength = -1;
081 }
082 } catch (ProtocolException ignore) {}
083 processedHeaders = true;
084 currentCommand.reset();
085 }
086
087 } else {
088
089 if (contentLength == -1) {
090 // end of command reached, unmarshal
091 if (b == 0) {
092 processCommand();
093 } else {
094 currentCommand.write(b);
095 }
096 } else {
097 // read desired content length
098 if (readLength++ == contentLength) {
099 processCommand();
100 readLength = 0;
101 } else {
102 currentCommand.write(b);
103 }
104 }
105 }
106
107 previousByte = b;
108 }
109 }
110
111 protected void processCommand() throws Exception {
112 StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
113 transport.doConsume(frame);
114 processedHeaders = false;
115 awaitingCommandStart = true;
116 currentCommand.reset();
117 contentLength = -1;
118 }
119
120 public static String detectVersion(Map<String, String> headers) throws ProtocolException {
121 String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
122
123 if (accepts == null) {
124 accepts = Stomp.DEFAULT_VERSION;
125 }
126 HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.trim().split(Stomp.COMMA)));
127 acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
128 if (acceptsVersions.isEmpty()) {
129 throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
130 Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
131 } else {
132 return Collections.max(acceptsVersions);
133 }
134 }
135 }