001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.stomp;
019    
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    import java.net.Socket;
026    import java.net.UnknownHostException;
027    import java.util.HashMap;
028    
029    public class StompConnection {
030    
031        public static final long RECEIVE_TIMEOUT = 10000;
032    
033        private Socket stompSocket;
034        private ByteArrayOutputStream inputBuffer = new ByteArrayOutputStream();
035        private String version = Stomp.DEFAULT_VERSION;
036    
037        public void open(String host, int port) throws IOException, UnknownHostException {
038            open(new Socket(host, port));
039        }
040    
041        public void open(Socket socket) {
042            stompSocket = socket;
043        }
044    
045        public void close() throws IOException {
046            if (stompSocket != null) {
047                stompSocket.close();
048                stompSocket = null;
049            }
050        }
051    
052        public void sendFrame(String data) throws Exception {
053            byte[] bytes = data.getBytes("UTF-8");
054            OutputStream outputStream = stompSocket.getOutputStream();
055            outputStream.write(bytes);
056            outputStream.flush();
057        }
058    
059        public void sendFrame(String frame, byte[] data) throws Exception {
060            byte[] bytes = frame.getBytes("UTF-8");
061            OutputStream outputStream = stompSocket.getOutputStream();
062            outputStream.write(bytes);
063            outputStream.write(data);
064            outputStream.flush();
065        }
066    
067        public StompFrame receive() throws Exception {
068            return receive(RECEIVE_TIMEOUT);
069        }
070    
071        public StompFrame receive(long timeOut) throws Exception {
072            stompSocket.setSoTimeout((int)timeOut);
073            InputStream is = stompSocket.getInputStream();
074            StompWireFormat wf = new StompWireFormat();
075            wf.setStompVersion(version);
076            DataInputStream dis = new DataInputStream(is);
077            return (StompFrame)wf.unmarshal(dis);
078        }
079    
080        public String receiveFrame() throws Exception {
081            return receiveFrame(RECEIVE_TIMEOUT);
082        }
083    
084        public String receiveFrame(long timeOut) throws Exception {
085            stompSocket.setSoTimeout((int)timeOut);
086            InputStream is = stompSocket.getInputStream();
087            int c = 0;
088            for (;;) {
089                c = is.read();
090                if (c < 0) {
091                    throw new IOException("socket closed.");
092                } else if (c == 0) {
093                    c = is.read();
094                    if (c == '\n') {
095                        // end of frame
096                        return stringFromBuffer(inputBuffer);
097                    } else {
098                        inputBuffer.write(0);
099                        inputBuffer.write(c);
100                    }
101                } else {
102                    inputBuffer.write(c);
103                }
104            }
105        }
106    
107        private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
108            byte[] ba = inputBuffer.toByteArray();
109            inputBuffer.reset();
110            return new String(ba, "UTF-8");
111        }
112    
113        public Socket getStompSocket() {
114            return stompSocket;
115        }
116    
117        public void setStompSocket(Socket stompSocket) {
118            this.stompSocket = stompSocket;
119        }
120    
121        public void connect(String username, String password) throws Exception {
122            connect(username, password, null);
123        }
124    
125        public void connect(String username, String password, String client) throws Exception {
126            HashMap<String, String> headers = new HashMap<String, String>();
127            headers.put("login", username);
128            headers.put("passcode", password);
129            if (client != null) {
130                headers.put("client-id", client);
131            }
132            StompFrame frame = new StompFrame("CONNECT", headers);
133            sendFrame(frame.format());
134    
135            StompFrame connect = receive();
136            if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
137                throw new Exception ("Not connected: " + connect.getBody());
138            }
139        }
140    
141        public void disconnect() throws Exception {
142            disconnect(null);
143        }
144    
145        public void disconnect(String receiptId) throws Exception {
146            StompFrame frame = new StompFrame("DISCONNECT");
147            if (receiptId != null && !receiptId.isEmpty()) {
148                frame.getHeaders().put(Stomp.Headers.RECEIPT_REQUESTED, receiptId);
149            }
150            sendFrame(frame.format());
151        }
152    
153        public void send(String destination, String message) throws Exception {
154            send(destination, message, null, null);
155        }
156    
157        public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
158            if (headers == null) {
159                headers = new HashMap<String, String>();
160            }
161            headers.put("destination", destination);
162            if (transaction != null) {
163                headers.put("transaction", transaction);
164            }
165            StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
166            sendFrame(frame.format());
167        }
168    
169        public void subscribe(String destination) throws Exception {
170            subscribe(destination, null, null);
171        }
172    
173        public void subscribe(String destination, String ack) throws Exception {
174            subscribe(destination, ack, new HashMap<String, String>());
175        }
176    
177        public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
178            if (headers == null) {
179                headers = new HashMap<String, String>();
180            }
181            headers.put("destination", destination);
182            if (ack != null) {
183                headers.put("ack", ack);
184            }
185            StompFrame frame = new StompFrame("SUBSCRIBE", headers);
186            sendFrame(frame.format());
187        }
188    
189        public void unsubscribe(String destination) throws Exception {
190            unsubscribe(destination, null);
191        }
192    
193        public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
194            if (headers == null) {
195                headers = new HashMap<String, String>();
196            }
197            headers.put("destination", destination);
198            StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
199            sendFrame(frame.format());
200        }
201    
202        public void begin(String transaction) throws Exception {
203            HashMap<String, String> headers = new HashMap<String, String>();
204            headers.put("transaction", transaction);
205            StompFrame frame = new StompFrame("BEGIN", headers);
206            sendFrame(frame.format());
207        }
208    
209        public void abort(String transaction) throws Exception {
210            HashMap<String, String> headers = new HashMap<String, String>();
211            headers.put("transaction", transaction);
212            StompFrame frame = new StompFrame("ABORT", headers);
213            sendFrame(frame.format());
214        }
215    
216        public void commit(String transaction) throws Exception {
217            HashMap<String, String> headers = new HashMap<String, String>();
218            headers.put("transaction", transaction);
219            StompFrame frame = new StompFrame("COMMIT", headers);
220            sendFrame(frame.format());
221        }
222    
223        public void ack(StompFrame frame) throws Exception {
224            ack(frame.getHeaders().get("message-id"), null);
225        }
226    
227        public void ack(StompFrame frame, String transaction) throws Exception {
228            ack(frame.getHeaders().get("message-id"), transaction);
229        }
230    
231        public void ack(String messageId) throws Exception {
232            ack(messageId, null);
233        }
234    
235        public void ack(String messageId, String transaction) throws Exception {
236            HashMap<String, String> headers = new HashMap<String, String>();
237            headers.put("message-id", messageId);
238            if (transaction != null)
239                headers.put("transaction", transaction);
240            StompFrame frame = new StompFrame("ACK", headers);
241            sendFrame(frame.format());
242        }
243    
244        protected String appendHeaders(HashMap<String, Object> headers) {
245            StringBuilder result = new StringBuilder();
246            for (String key : headers.keySet()) {
247                result.append(key + ":" + headers.get(key) + "\n");
248            }
249            result.append("\n");
250            return result.toString();
251        }
252    
253        public String getVersion() {
254            return version;
255        }
256    
257        public void setVersion(String version) {
258            this.version = version;
259        }
260    }