001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.mqtt;
018    
019    import java.io.IOException;
020    
021    import javax.jms.JMSException;
022    import org.apache.activemq.transport.tcp.TcpTransport;
023    import org.fusesource.hawtbuf.DataByteArrayInputStream;
024    import org.fusesource.hawtbuf.DataByteArrayOutputStream;
025    import org.fusesource.mqtt.codec.*;
026    
027    public class MQTTCodec {
028    
029        TcpTransport transport;
030    
031        DataByteArrayOutputStream currentCommand = new DataByteArrayOutputStream();
032        boolean processedHeader = false;
033        String action;
034        byte header;
035        int contentLength = -1;
036        int previousByte = -1;
037        int payLoadRead = 0;
038    
039        public MQTTCodec(TcpTransport transport) {
040            this.transport = transport;
041        }
042    
043        public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
044            int i = 0;
045            byte b;
046            while (i++ < readSize) {
047                b = input.readByte();
048                // skip repeating nulls
049                if (!processedHeader && b == 0) {
050                    previousByte = 0;
051                    continue;
052                }
053    
054                if (!processedHeader) {
055                    i += processHeader(b, input);
056                    if (contentLength == 0) {
057                        processCommand();
058                    }
059    
060                } else {
061    
062                    if (contentLength == -1) {
063                        // end of command reached, unmarshal
064                        if (b == 0) {
065                            processCommand();
066                        } else {
067                            currentCommand.write(b);
068                        }
069                    } else {
070                        // read desired content length
071                        if (payLoadRead == contentLength) {
072                            processCommand();
073                            i += processHeader(b, input);
074                        } else {
075                            currentCommand.write(b);
076                            payLoadRead++;
077                        }
078                    }
079                }
080    
081                previousByte = b;
082            }
083            if (processedHeader && payLoadRead == contentLength) {
084                processCommand();
085            }
086        }
087    
088        /**
089         * sets the content length
090         *
091         * @return number of bytes read
092         */
093        private int processHeader(byte header, DataByteArrayInputStream input) {
094            this.header = header;
095            byte digit;
096            int multiplier = 1;
097            int read = 0;
098            int length = 0;
099            do {
100                digit = input.readByte();
101                length += (digit & 0x7F) * multiplier;
102                multiplier <<= 7;
103                read++;
104            } while ((digit & 0x80) != 0);
105    
106            contentLength = length;
107            processedHeader = true;
108            return read;
109        }
110    
111    
112        private void processCommand() throws Exception {
113            MQTTFrame frame = new MQTTFrame(currentCommand.toBuffer().deepCopy()).header(header);
114            transport.doConsume(frame);
115            processedHeader = false;
116            currentCommand.reset();
117            contentLength = -1;
118            payLoadRead = 0;
119        }
120    
121        public static String commandType(byte header) throws IOException, JMSException {
122    
123            byte messageType = (byte) ((header & 0xF0) >>> 4);
124            switch (messageType) {
125                case PINGREQ.TYPE: {
126                    return "PINGREQ";
127                }
128                case CONNECT.TYPE: {
129                    return "CONNECT";
130                }
131                case DISCONNECT.TYPE: {
132                    return "DISCONNECT";
133                }
134                case SUBSCRIBE.TYPE: {
135                    return "SUBSCRIBE";
136                }
137                case UNSUBSCRIBE.TYPE: {
138                    return "UNSUBSCRIBE";
139                }
140                case PUBLISH.TYPE: {
141                    return "PUBLISH";
142                }
143                case PUBACK.TYPE: {
144                    return "PUBACK";
145                }
146                case PUBREC.TYPE: {
147                    return "PUBREC";
148                }
149                case PUBREL.TYPE: {
150                    return "PUBREL";
151                }
152                case PUBCOMP.TYPE: {
153                    return "PUBCOMP";
154                }
155                default:
156                    return "UNKNOWN";
157            }
158    
159        }
160    
161    }