001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport.stomp;
019
020 import java.io.ByteArrayOutputStream;
021 import java.io.DataInputStream;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.OutputStream;
025 import java.net.Socket;
026 import java.net.UnknownHostException;
027 import java.util.HashMap;
028
029 public class StompConnection {
030
031 public static final long RECEIVE_TIMEOUT = 10000;
032
033 private Socket stompSocket;
034 private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035 private String version = Stomp.DEFAULT_VERSION;
036
037 public void open(String host, int port) throws IOException, UnknownHostException {
038 open(new Socket(host, port));
039 }
040
041 public void open(Socket socket) {
042 stompSocket = socket;
043 }
044
045 public void close() throws IOException {
046 if (stompSocket != null) {
047 stompSocket.close();
048 stompSocket = null;
049 }
050 }
051
052 public void sendFrame(String data) throws Exception {
053 byte[] bytes = data.getBytes("UTF-8");
054 OutputStream outputStream = stompSocket.getOutputStream();
055 outputStream.write(bytes);
056 outputStream.flush();
057 }
058
059 public void sendFrame(String frame, byte[] data) throws Exception {
060 byte[] bytes = frame.getBytes("UTF-8");
061 OutputStream outputStream = stompSocket.getOutputStream();
062 outputStream.write(bytes);
063 outputStream.write(data);
064 outputStream.flush();
065 }
066
067 public StompFrame receive() throws Exception {
068 return receive(RECEIVE_TIMEOUT);
069 }
070
071 public StompFrame receive(long timeOut) throws Exception {
072 stompSocket.setSoTimeout((int)timeOut);
073 InputStream is = stompSocket.getInputStream();
074 StompWireFormat wf = new StompWireFormat();
075 wf.setStompVersion(version);
076 DataInputStream dis = new DataInputStream(is);
077 return (StompFrame)wf.unmarshal(dis);
078 }
079
080 public String receiveFrame() throws Exception {
081 return receiveFrame(RECEIVE_TIMEOUT);
082 }
083
084 public String receiveFrame(long timeOut) throws Exception {
085 stompSocket.setSoTimeout((int)timeOut);
086 InputStream is = stompSocket.getInputStream();
087 int c = 0;
088 for (;;) {
089 c = is.read();
090 if (c < 0) {
091 throw new IOException("socket closed.");
092 } else if (c == 0) {
093 c = is.read();
094 if (c == '\n') {
095 // end of frame
096 return stringFromBuffer(inputBuffer);
097 } else {
098 inputBuffer.write(0);
099 inputBuffer.write(c);
100 }
101 } else {
102 inputBuffer.write(c);
103 }
104 }
105 }
106
107 private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
108 byte[] ba = inputBuffer.toByteArray();
109 inputBuffer.reset();
110 return new String(ba, "UTF-8");
111 }
112
113 public Socket getStompSocket() {
114 return stompSocket;
115 }
116
117 public void setStompSocket(Socket stompSocket) {
118 this.stompSocket = stompSocket;
119 }
120
121 public void connect(String username, String password) throws Exception {
122 connect(username, password, null);
123 }
124
125 public void connect(String username, String password, String client) throws Exception {
126 HashMap<String, String> headers = new HashMap<String, String>();
127 headers.put("login", username);
128 headers.put("passcode", password);
129 if (client != null) {
130 headers.put("client-id", client);
131 }
132 StompFrame frame = new StompFrame("CONNECT", headers);
133 sendFrame(frame.format());
134
135 StompFrame connect = receive();
136 if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
137 throw new Exception ("Not connected: " + connect.getBody());
138 }
139 }
140
141 public void disconnect() throws Exception {
142 disconnect(null);
143 }
144
145 public void disconnect(String receiptId) throws Exception {
146 StompFrame frame = new StompFrame("DISCONNECT");
147 if (receiptId != null && !receiptId.isEmpty()) {
148 frame.getHeaders().put(Stomp.Headers.RECEIPT_REQUESTED, receiptId);
149 }
150 sendFrame(frame.format());
151 }
152
153 public void send(String destination, String message) throws Exception {
154 send(destination, message, null, null);
155 }
156
157 public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
158 if (headers == null) {
159 headers = new HashMap<String, String>();
160 }
161 headers.put("destination", destination);
162 if (transaction != null) {
163 headers.put("transaction", transaction);
164 }
165 StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
166 sendFrame(frame.format());
167 }
168
169 public void subscribe(String destination) throws Exception {
170 subscribe(destination, null, null);
171 }
172
173 public void subscribe(String destination, String ack) throws Exception {
174 subscribe(destination, ack, new HashMap<String, String>());
175 }
176
177 public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
178 if (headers == null) {
179 headers = new HashMap<String, String>();
180 }
181 headers.put("destination", destination);
182 if (ack != null) {
183 headers.put("ack", ack);
184 }
185 StompFrame frame = new StompFrame("SUBSCRIBE", headers);
186 sendFrame(frame.format());
187 }
188
189 public void unsubscribe(String destination) throws Exception {
190 unsubscribe(destination, null);
191 }
192
193 public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
194 if (headers == null) {
195 headers = new HashMap<String, String>();
196 }
197 headers.put("destination", destination);
198 StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
199 sendFrame(frame.format());
200 }
201
202 public void begin(String transaction) throws Exception {
203 HashMap<String, String> headers = new HashMap<String, String>();
204 headers.put("transaction", transaction);
205 StompFrame frame = new StompFrame("BEGIN", headers);
206 sendFrame(frame.format());
207 }
208
209 public void abort(String transaction) throws Exception {
210 HashMap<String, String> headers = new HashMap<String, String>();
211 headers.put("transaction", transaction);
212 StompFrame frame = new StompFrame("ABORT", headers);
213 sendFrame(frame.format());
214 }
215
216 public void commit(String transaction) throws Exception {
217 HashMap<String, String> headers = new HashMap<String, String>();
218 headers.put("transaction", transaction);
219 StompFrame frame = new StompFrame("COMMIT", headers);
220 sendFrame(frame.format());
221 }
222
223 public void ack(StompFrame frame) throws Exception {
224 ack(frame.getHeaders().get("message-id"), null);
225 }
226
227 public void ack(StompFrame frame, String transaction) throws Exception {
228 ack(frame.getHeaders().get("message-id"), transaction);
229 }
230
231 public void ack(String messageId) throws Exception {
232 ack(messageId, null);
233 }
234
235 public void ack(String messageId, String transaction) throws Exception {
236 HashMap<String, String> headers = new HashMap<String, String>();
237 headers.put("message-id", messageId);
238 if (transaction != null)
239 headers.put("transaction", transaction);
240 StompFrame frame = new StompFrame("ACK", headers);
241 sendFrame(frame.format());
242 }
243
244 protected String appendHeaders(HashMap<String, Object> headers) {
245 StringBuilder result = new StringBuilder();
246 for (String key : headers.keySet()) {
247 result.append(key + ":" + headers.get(key) + "\n");
248 }
249 result.append("\n");
250 return result.toString();
251 }
252
253 public String getVersion() {
254 return version;
255 }
256
257 public void setVersion(String version) {
258 this.version = version;
259 }
260 }