001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.stomp;
019    
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    import java.net.Socket;
026    import java.net.UnknownHostException;
027    import java.util.HashMap;
028    
029    public class StompConnection {
030    
031        public static final long RECEIVE_TIMEOUT = 10000;
032    
033        private Socket stompSocket;
034        private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035        private String version = Stomp.DEFAULT_VERSION;
036    
037        public void open(String host, int port) throws IOException, UnknownHostException {
038            open(new Socket(host, port));
039        }
040    
041        public void open(Socket socket) {
042            stompSocket = socket;
043        }
044    
045        public void close() throws IOException {
046            if (stompSocket != null) {
047                stompSocket.close();
048                stompSocket = null;
049            }
050        }
051    
052        public void sendFrame(String data) throws Exception {
053            byte[] bytes = data.getBytes("UTF-8");
054            OutputStream outputStream = stompSocket.getOutputStream();
055            outputStream.write(bytes);
056            outputStream.flush();
057        }
058    
059        public void sendFrame(String frame, byte[] data) throws Exception {
060            byte[] bytes = frame.getBytes("UTF-8");
061            OutputStream outputStream = stompSocket.getOutputStream();
062            outputStream.write(bytes);
063            outputStream.write(data);
064            outputStream.flush();
065        }
066    
067        public StompFrame receive() throws Exception {
068            return receive(RECEIVE_TIMEOUT);
069        }
070    
071        public StompFrame receive(long timeOut) throws Exception {
072            stompSocket.setSoTimeout((int)timeOut);
073            InputStream is = stompSocket.getInputStream();
074            StompWireFormat wf = new StompWireFormat();
075            wf.setStompVersion(version);
076            DataInputStream dis = new DataInputStream(is);
077            return (StompFrame)wf.unmarshal(dis);
078        }
079    
080        public String receiveFrame() throws Exception {
081            return receiveFrame(RECEIVE_TIMEOUT);
082        }
083    
084        public String receiveFrame(long timeOut) throws Exception {
085            stompSocket.setSoTimeout((int)timeOut);
086            InputStream is = stompSocket.getInputStream();
087            int c = 0;
088            for (;;) {
089                c = is.read();
090                if (c < 0) {
091                    throw new IOException("socket closed.");
092                } else if (c == 0) {
093                    c = is.read();
094                    if (c == '\n') {
095                        // end of frame
096                        return stringFromBuffer(inputBuffer);
097                    } else {
098                        inputBuffer.write(0);
099                        inputBuffer.write(c);
100                    }
101                } else {
102                    inputBuffer.write(c);
103                }
104            }
105        }
106    
107        private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
108            byte[] ba = inputBuffer.toByteArray();
109            inputBuffer.reset();
110            return new String(ba, "UTF-8");
111        }
112    
113        public Socket getStompSocket() {
114            return stompSocket;
115        }
116    
117        public void setStompSocket(Socket stompSocket) {
118            this.stompSocket = stompSocket;
119        }
120    
121        public void connect(String username, String password) throws Exception {
122            connect(username, password, null);
123        }
124    
125        public void connect(String username, String password, String client) throws Exception {
126            HashMap<String, String> headers = new HashMap<String, String>();
127            headers.put("login", username);
128            headers.put("passcode", password);
129            if (client != null) {
130                headers.put("client-id", client);
131            }
132            connect(headers);
133        }
134    
135        public void connect(HashMap<String, String> headers) throws Exception {
136            StompFrame frame = new StompFrame("CONNECT", headers);
137            sendFrame(frame.format());
138    
139            StompFrame connect = receive();
140            if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
141                throw new Exception ("Not connected: " + connect.getBody());
142            }
143        }
144    
145        public void disconnect() throws Exception {
146            disconnect(null);
147        }
148    
149        public void disconnect(String receiptId) throws Exception {
150            StompFrame frame = new StompFrame("DISCONNECT");
151            if (receiptId != null && !receiptId.isEmpty()) {
152                frame.getHeaders().put(Stomp.Headers.RECEIPT_REQUESTED, receiptId);
153            }
154            sendFrame(frame.format());
155        }
156    
157        public void send(String destination, String message) throws Exception {
158            send(destination, message, null, null);
159        }
160    
161        public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
162            if (headers == null) {
163                headers = new HashMap<String, String>();
164            }
165            headers.put("destination", destination);
166            if (transaction != null) {
167                headers.put("transaction", transaction);
168            }
169            StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
170            sendFrame(frame.format());
171        }
172    
173        public void subscribe(String destination) throws Exception {
174            subscribe(destination, null, null);
175        }
176    
177        public void subscribe(String destination, String ack) throws Exception {
178            subscribe(destination, ack, new HashMap<String, String>());
179        }
180    
181        public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
182            if (headers == null) {
183                headers = new HashMap<String, String>();
184            }
185            headers.put("destination", destination);
186            if (ack != null) {
187                headers.put("ack", ack);
188            }
189            StompFrame frame = new StompFrame("SUBSCRIBE", headers);
190            sendFrame(frame.format());
191        }
192    
193        public void unsubscribe(String destination) throws Exception {
194            unsubscribe(destination, null);
195        }
196    
197        public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
198            if (headers == null) {
199                headers = new HashMap<String, String>();
200            }
201            headers.put("destination", destination);
202            StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
203            sendFrame(frame.format());
204        }
205    
206        public void begin(String transaction) throws Exception {
207            HashMap<String, String> headers = new HashMap<String, String>();
208            headers.put("transaction", transaction);
209            StompFrame frame = new StompFrame("BEGIN", headers);
210            sendFrame(frame.format());
211        }
212    
213        public void abort(String transaction) throws Exception {
214            HashMap<String, String> headers = new HashMap<String, String>();
215            headers.put("transaction", transaction);
216            StompFrame frame = new StompFrame("ABORT", headers);
217            sendFrame(frame.format());
218        }
219    
220        public void commit(String transaction) throws Exception {
221            HashMap<String, String> headers = new HashMap<String, String>();
222            headers.put("transaction", transaction);
223            StompFrame frame = new StompFrame("COMMIT", headers);
224            sendFrame(frame.format());
225        }
226    
227        public void ack(StompFrame frame) throws Exception {
228            ack(frame.getHeaders().get("message-id"), null);
229        }
230    
231        public void ack(StompFrame frame, String transaction) throws Exception {
232            ack(frame.getHeaders().get("message-id"), transaction);
233        }
234    
235        public void ack(String messageId) throws Exception {
236            ack(messageId, null);
237        }
238    
239        public void ack(String messageId, String transaction) throws Exception {
240            HashMap<String, String> headers = new HashMap<String, String>();
241            headers.put("message-id", messageId);
242            if (transaction != null)
243                headers.put("transaction", transaction);
244            StompFrame frame = new StompFrame("ACK", headers);
245            sendFrame(frame.format());
246        }
247    
248        public void keepAlive() throws Exception {
249            OutputStream outputStream = stompSocket.getOutputStream();
250            outputStream.write('\n');
251            outputStream.flush();
252        }
253    
254        protected String appendHeaders(HashMap<String, Object> headers) {
255            StringBuilder result = new StringBuilder();
256            for (String key : headers.keySet()) {
257                result.append(key + ":" + headers.get(key) + "\n");
258            }
259            result.append("\n");
260            return result.toString();
261        }
262    
263        public String getVersion() {
264            return version;
265        }
266    
267        public void setVersion(String version) {
268            this.version = version;
269        }
270    }