001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.amqp;
018    
019    import org.apache.activemq.util.ByteArrayInputStream;
020    import org.apache.activemq.util.ByteArrayOutputStream;
021    import org.apache.activemq.util.ByteSequence;
022    import org.apache.activemq.wireformat.WireFormat;
023    import org.fusesource.hawtbuf.Buffer;
024    
025    import java.io.*;
026    
027    /**
028     */
029    public class AmqpWireFormat implements WireFormat {
030    
031    
032        private int version = 1;
033        private long maxFrameSize = 1024*1024*100;
034    
035        public ByteSequence marshal(Object command) throws IOException {
036            ByteArrayOutputStream baos = new ByteArrayOutputStream();
037            DataOutputStream dos = new DataOutputStream(baos);
038            marshal(command, dos);
039            dos.close();
040            return baos.toByteSequence();
041        }
042    
043        public Object unmarshal(ByteSequence packet) throws IOException {
044            ByteArrayInputStream stream = new ByteArrayInputStream(packet);
045            DataInputStream dis = new DataInputStream(stream);
046            return unmarshal(dis);
047        }
048    
049        public void marshal(Object command, DataOutput dataOut) throws IOException {
050            Buffer frame = (Buffer) command;
051            frame.writeTo(dataOut);
052        }
053    
054        boolean magicRead = false;
055        public Object unmarshal(DataInput dataIn) throws IOException {
056            if( !magicRead ) {
057                Buffer magic = new Buffer(8);
058                magic.readFrom(dataIn);
059                magicRead = true;
060                return new AmqpHeader(magic);
061            } else {
062                int size = dataIn.readInt();
063                if( size > maxFrameSize) {
064                    throw new AmqpProtocolException("Frame size exceeded max frame length.");
065                }
066                Buffer frame = new Buffer(size);
067                frame.bigEndianEditor().writeInt(size);
068                frame.readFrom(dataIn);
069                frame.clear();
070                return frame;
071            }
072        }
073    
074        /**
075         */
076        public void setVersion(int version) {
077            this.version = version;
078        }
079    
080        /**
081         * @return the version of the wire format
082         */
083        public int getVersion() {
084            return this.version;
085        }
086    
087    
088        public long getMaxFrameSize() {
089            return maxFrameSize;
090        }
091    
092        public void setMaxFrameSize(long maxFrameSize) {
093            this.maxFrameSize = maxFrameSize;
094        }
095    }