001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.stomp;
019
020import java.io.ByteArrayOutputStream;
021import java.io.DataInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.net.Socket;
026import java.net.UnknownHostException;
027import java.util.HashMap;
028
029public class StompConnection {
030
031    public static final long RECEIVE_TIMEOUT = 10000;
032
033    private Socket stompSocket;
034    private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035    private String version = Stomp.DEFAULT_VERSION;
036
037    public void open(String host, int port) throws IOException, UnknownHostException {
038        open(new Socket(host, port));
039    }
040
041    public void open(Socket socket) {
042        stompSocket = socket;
043    }
044
045    public void close() throws IOException {
046        if (stompSocket != null) {
047            stompSocket.close();
048            stompSocket = null;
049        }
050    }
051
052    public void sendFrame(String data) throws Exception {
053        byte[] bytes = data.getBytes("UTF-8");
054        OutputStream outputStream = stompSocket.getOutputStream();
055        outputStream.write(bytes);
056        outputStream.flush();
057    }
058
059    public void sendFrame(String frame, byte[] data) throws Exception {
060        byte[] bytes = frame.getBytes("UTF-8");
061        OutputStream outputStream = stompSocket.getOutputStream();
062        outputStream.write(bytes);
063        outputStream.write(data);
064        outputStream.flush();
065    }
066
067    public StompFrame receive() throws Exception {
068        return receive(RECEIVE_TIMEOUT);
069    }
070
071    public StompFrame receive(long timeOut) throws Exception {
072        stompSocket.setSoTimeout((int)timeOut);
073        InputStream is = stompSocket.getInputStream();
074        StompWireFormat wf = new StompWireFormat();
075        wf.setStompVersion(version);
076        DataInputStream dis = new DataInputStream(is);
077        return (StompFrame)wf.unmarshal(dis);
078    }
079
080    public String receiveFrame() throws Exception {
081        return receiveFrame(RECEIVE_TIMEOUT);
082    }
083
084    public String receiveFrame(long timeOut) throws Exception {
085        stompSocket.setSoTimeout((int)timeOut);
086        InputStream is = stompSocket.getInputStream();
087        int c = 0;
088        for (;;) {
089            c = is.read();
090            if (c < 0) {
091                throw new IOException("socket closed.");
092            } else if (c == 0) {
093                c = is.read();
094                if (c == '\n') {
095                    // end of frame
096                    return stringFromBuffer(inputBuffer);
097                } else {
098                    inputBuffer.write(0);
099                    inputBuffer.write(c);
100                }
101            } else {
102                inputBuffer.write(c);
103            }
104        }
105    }
106
107    private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
108        byte[] ba = inputBuffer.toByteArray();
109        inputBuffer.reset();
110        return new String(ba, "UTF-8");
111    }
112
113    public Socket getStompSocket() {
114        return stompSocket;
115    }
116
117    public void setStompSocket(Socket stompSocket) {
118        this.stompSocket = stompSocket;
119    }
120
121    public void connect(String username, String password) throws Exception {
122        connect(username, password, null);
123    }
124
125    public void connect(String username, String password, String client) throws Exception {
126        HashMap<String, String> headers = new HashMap<String, String>();
127        headers.put("login", username);
128        headers.put("passcode", password);
129        if (client != null) {
130            headers.put("client-id", client);
131        }
132        connect(headers);
133    }
134
135    public void connect(HashMap<String, String> headers) throws Exception {
136        StompFrame frame = new StompFrame("CONNECT", headers);
137        sendFrame(frame.format());
138
139        StompFrame connect = receive();
140        if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
141            throw new Exception ("Not connected: " + connect.getBody());
142        }
143    }
144
145    public void disconnect() throws Exception {
146        disconnect(null);
147    }
148
149    public void disconnect(String receiptId) throws Exception {
150        StompFrame frame = new StompFrame("DISCONNECT");
151        if (receiptId != null && !receiptId.isEmpty()) {
152            frame.getHeaders().put(Stomp.Headers.RECEIPT_REQUESTED, receiptId);
153        }
154        sendFrame(frame.format());
155    }
156
157    public void send(String destination, String message) throws Exception {
158        send(destination, message, null, null);
159    }
160
161    public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
162        if (headers == null) {
163            headers = new HashMap<String, String>();
164        }
165        headers.put("destination", destination);
166        if (transaction != null) {
167            headers.put("transaction", transaction);
168        }
169        StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
170        sendFrame(frame.format());
171    }
172
173    public void subscribe(String destination) throws Exception {
174        subscribe(destination, null, null);
175    }
176
177    public void subscribe(String destination, String ack) throws Exception {
178        subscribe(destination, ack, new HashMap<String, String>());
179    }
180
181    public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
182        if (headers == null) {
183            headers = new HashMap<String, String>();
184        }
185        headers.put("destination", destination);
186        if (ack != null) {
187            headers.put("ack", ack);
188        }
189        StompFrame frame = new StompFrame("SUBSCRIBE", headers);
190        sendFrame(frame.format());
191    }
192
193    public void unsubscribe(String destination) throws Exception {
194        unsubscribe(destination, null);
195    }
196
197    public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
198        if (headers == null) {
199            headers = new HashMap<String, String>();
200        }
201        headers.put("destination", destination);
202        StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
203        sendFrame(frame.format());
204    }
205
206    public void begin(String transaction) throws Exception {
207        HashMap<String, String> headers = new HashMap<String, String>();
208        headers.put("transaction", transaction);
209        StompFrame frame = new StompFrame("BEGIN", headers);
210        sendFrame(frame.format());
211    }
212
213    public void abort(String transaction) throws Exception {
214        HashMap<String, String> headers = new HashMap<String, String>();
215        headers.put("transaction", transaction);
216        StompFrame frame = new StompFrame("ABORT", headers);
217        sendFrame(frame.format());
218    }
219
220    public void commit(String transaction) throws Exception {
221        HashMap<String, String> headers = new HashMap<String, String>();
222        headers.put("transaction", transaction);
223        StompFrame frame = new StompFrame("COMMIT", headers);
224        sendFrame(frame.format());
225    }
226
227    public void ack(StompFrame frame) throws Exception {
228        ack(frame.getHeaders().get("message-id"), null);
229    }
230
231    public void ack(StompFrame frame, String transaction) throws Exception {
232        ack(frame.getHeaders().get("message-id"), transaction);
233    }
234
235    public void ack(String messageId) throws Exception {
236        ack(messageId, null);
237    }
238
239    public void ack(String messageId, String transaction) throws Exception {
240        HashMap<String, String> headers = new HashMap<String, String>();
241        headers.put("message-id", messageId);
242        if (transaction != null)
243            headers.put("transaction", transaction);
244        StompFrame frame = new StompFrame("ACK", headers);
245        sendFrame(frame.format());
246    }
247
248    public void keepAlive() throws Exception {
249        OutputStream outputStream = stompSocket.getOutputStream();
250        outputStream.write('\n');
251        outputStream.flush();
252    }
253
254    protected String appendHeaders(HashMap<String, Object> headers) {
255        StringBuilder result = new StringBuilder();
256        for (String key : headers.keySet()) {
257            result.append(key + ":" + headers.get(key) + "\n");
258        }
259        result.append("\n");
260        return result.toString();
261    }
262
263    public String getVersion() {
264        return version;
265    }
266
267    public void setVersion(String version) {
268        this.version = version;
269    }
270}