001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.xmpp;
018    
019    import java.io.DataInput;
020    import java.io.DataInputStream;
021    import java.io.DataOutput;
022    import java.io.DataOutputStream;
023    import java.io.IOException;
024    
025    import org.apache.activemq.util.ByteArrayInputStream;
026    import org.apache.activemq.util.ByteArrayOutputStream;
027    import org.apache.activemq.util.ByteSequence;
028    import org.apache.activemq.wireformat.WireFormat;
029    
030    /**
031     * A wire format which uses XMPP format of messages
032     *
033     * 
034     */
035    public class XmppWireFormat implements WireFormat {
036        
037        private int version = 1;
038    
039        public WireFormat copy() {
040            return new XmppWireFormat();
041        }
042    
043        /*
044        public Packet readPacket(DataInput in) throws IOException {
045            return null;
046        }
047    
048        public Packet readPacket(int firstByte, DataInput in) throws IOException {
049            return null;
050        }
051    
052        public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
053            switch (packet.getPacketType()) {
054                case Packet.ACTIVEMQ_MESSAGE:
055                    writeMessage((ActiveMQMessage) packet, "", out);
056                    break;
057    
058                case Packet.ACTIVEMQ_TEXT_MESSAGE:
059                    writeTextMessage((ActiveMQTextMessage) packet, out);
060                    break;
061    
062                case Packet.ACTIVEMQ_BYTES_MESSAGE:
063                    writeBytesMessage((ActiveMQBytesMessage) packet, out);
064                    break;
065    
066                case Packet.ACTIVEMQ_OBJECT_MESSAGE:
067                    writeObjectMessage((ActiveMQObjectMessage) packet, out);
068                    break;
069    
070                case Packet.ACTIVEMQ_MAP_MESSAGE:
071                case Packet.ACTIVEMQ_STREAM_MESSAGE:
072    
073    
074                case Packet.ACTIVEMQ_BROKER_INFO:
075                case Packet.ACTIVEMQ_CONNECTION_INFO:
076                case Packet.ACTIVEMQ_MSG_ACK:
077                case Packet.CONSUMER_INFO:
078                case Packet.DURABLE_UNSUBSCRIBE:
079                case Packet.INT_RESPONSE_RECEIPT_INFO:
080                case Packet.PRODUCER_INFO:
081                case Packet.RECEIPT_INFO:
082                case Packet.RESPONSE_RECEIPT_INFO:
083                case Packet.SESSION_INFO:
084                case Packet.TRANSACTION_INFO:
085                case Packet.XA_TRANSACTION_INFO:
086                default:
087                    log.warn("Ignoring message type: " + packet.getPacketType() + " packet: " + packet);
088            }
089            return null;
090        }
091    */
092    
093    //    /**
094    //     * Can this wireformat process packets of this version
095    //     * @param version the version number to test
096    //     * @return true if can accept the version
097    //     */
098    //    public boolean canProcessWireFormatVersion(int version){
099    //        return true;
100    //    }
101    //
102    //    /**
103    //     * @return the current version of this wire format
104    //     */
105    //    public int getCurrentWireFormatVersion(){
106    //        return 1;
107    //    }
108    //
109    //    // Implementation methods
110    //    //-------------------------------------------------------------------------
111    //    protected void writeObjectMessage(ActiveMQObjectMessage message, DataOutput out) throws JMSException, IOException {
112    //        Serializable object = message.getObject();
113    //        String text = (object != null) ? object.toString() : "";
114    //        writeMessage(message, text, out);
115    //    }
116    //
117    //    protected void writeTextMessage(ActiveMQTextMessage message, DataOutput out) throws JMSException, IOException {
118    //        writeMessage(message, message.getText(), out);
119    //    }
120    //
121    //    protected void writeBytesMessage(ActiveMQBytesMessage message, DataOutput out) throws IOException {
122    //        ByteArray data = message.getBodyAsBytes();
123    //        String text = encodeBinary(data.getBuf(),data.getOffset(),data.getLength());
124    //        writeMessage(message, text, out);
125    //    }
126    //
127    //    protected void writeMessage(ActiveMQMessage message, String body, DataOutput out) throws IOException {
128    //        String type = getXmppType(message);
129    //
130    //        StringBuffer buffer = new StringBuffer("<");
131    //        buffer.append(type);
132    //        buffer.append(" to='");
133    //        buffer.append(message.getJMSDestination().toString());
134    //        buffer.append("' from='");
135    //        buffer.append(message.getJMSReplyTo().toString());
136    //        String messageID = message.getJMSMessageID();
137    //        if (messageID != null) {
138    //            buffer.append("' id='");
139    //            buffer.append(messageID);
140    //        }
141    //
142    //        HashMap properties = message.getProperties();
143    //        if (properties != null) {
144    //            for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
145    //                Map.Entry entry = (Map.Entry) iter.next();
146    //                Object key = entry.getKey();
147    //                Object value = entry.getValue();
148    //                if (value != null) {
149    //                    buffer.append("' ");
150    //                    buffer.append(key.toString());
151    //                    buffer.append("='");
152    //                    buffer.append(value.toString());
153    //                }
154    //            }
155    //        }
156    //
157    //        buffer.append("'>");
158    //
159    //        String id = message.getJMSCorrelationID();
160    //        if (id != null) {
161    //            buffer.append("<thread>");
162    //            buffer.append(id);
163    //            buffer.append("</thread>");
164    //        }
165    //        buffer.append(body);
166    //        buffer.append("</");
167    //        buffer.append(type);
168    //        buffer.append(">");
169    //
170    //        out.write(buffer.toString().getBytes());
171    //    }
172    //
173    //    protected String encodeBinary(byte[] data,int offset,int length) {
174    //        // TODO
175    //        throw new RuntimeException("Not implemented yet!");
176    //    }
177    //
178    //    protected String getXmppType(ActiveMQMessage message) {
179    //        String type = message.getJMSType();
180    //        if (type == null) {
181    //            type = "message";
182    //        }
183    //        return type;
184    //    }
185    
186    
187        public ByteSequence marshal(Object command) throws IOException {
188            ByteArrayOutputStream baos = new ByteArrayOutputStream();
189            DataOutputStream dos = new DataOutputStream(baos);
190            marshal(command, dos);
191            dos.close();
192            return baos.toByteSequence();
193        }
194    
195        public Object unmarshal(ByteSequence packet) throws IOException {
196            ByteArrayInputStream stream = new ByteArrayInputStream(packet);
197            DataInputStream dis = new DataInputStream(stream);
198            return unmarshal(dis);
199        }
200    
201        public void marshal(Object object, DataOutput dataOutput) throws IOException {
202            /** TODO */
203        }
204    
205        public Object unmarshal(DataInput dataInput) throws IOException {
206            return null;  /** TODO */
207        }
208    
209    
210        public int getVersion() {
211            return version;
212        }
213    
214        public void setVersion(int version) {
215            this.version = version;
216        }
217        
218            public boolean inReceive() {
219                    // TODO Implement for inactivity monitor
220                    return false;
221            }
222    }