001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.message;
018
019import java.nio.ByteBuffer;
020
021import org.apache.qpid.proton.codec.ReadableBuffer;
022import org.apache.qpid.proton.codec.WritableBuffer;
023
024public class AmqpWritableBuffer implements WritableBuffer {
025
026    public final static int DEFAULT_CAPACITY = 4 * 1024;
027
028    byte buffer[];
029    int position;
030
031   /**
032    * Creates a new WritableBuffer with default capacity.
033    */
034   public AmqpWritableBuffer() {
035       this(DEFAULT_CAPACITY);
036   }
037
038    /**
039     * Create a new WritableBuffer with the given capacity.
040     */
041    public AmqpWritableBuffer(int capacity) {
042        this.buffer = new byte[capacity];
043    }
044
045    public byte[] getArray() {
046        return buffer;
047    }
048
049    public int getArrayLength() {
050        return position;
051    }
052
053    @Override
054    public void put(byte b) {
055        int newPosition = position + 1;
056        ensureCapacity(newPosition);
057        buffer[position] = b;
058        position = newPosition;
059    }
060
061    @Override
062    public void putShort(short value) {
063        ensureCapacity(position + 2);
064        buffer[position++] = (byte)(value >>> 8);
065        buffer[position++] = (byte)(value >>> 0);
066    }
067
068    @Override
069    public void putInt(int value) {
070        ensureCapacity(position + 4);
071        buffer[position++] = (byte)(value >>> 24);
072        buffer[position++] = (byte)(value >>> 16);
073        buffer[position++] = (byte)(value >>> 8);
074        buffer[position++] = (byte)(value >>> 0);
075    }
076
077    @Override
078    public void putLong(long value) {
079        ensureCapacity(position + 8);
080        buffer[position++] = (byte)(value >>> 56);
081        buffer[position++] = (byte)(value >>> 48);
082        buffer[position++] = (byte)(value >>> 40);
083        buffer[position++] = (byte)(value >>> 32);
084        buffer[position++] = (byte)(value >>> 24);
085        buffer[position++] = (byte)(value >>> 16);
086        buffer[position++] = (byte)(value >>> 8);
087        buffer[position++] = (byte)(value >>> 0);
088    }
089
090    @Override
091    public void putFloat(float value) {
092        putInt(Float.floatToRawIntBits(value));
093    }
094
095    @Override
096    public void putDouble(double value) {
097        putLong(Double.doubleToRawLongBits(value));
098    }
099
100    @Override
101    public void put(byte[] src, int offset, int length) {
102        if (length == 0) {
103            return;
104        }
105
106        int newPosition = position + length;
107        ensureCapacity(newPosition);
108        System.arraycopy(src, offset, buffer, position, length);
109        position = newPosition;
110    }
111
112    @Override
113    public boolean hasRemaining() {
114        return position < Integer.MAX_VALUE;
115    }
116
117    @Override
118    public int remaining() {
119        return Integer.MAX_VALUE - position;
120    }
121
122    @Override
123    public int position() {
124        return position;
125    }
126
127    @Override
128    public void position(int position) {
129        ensureCapacity(position);
130        this.position = position;
131    }
132
133    @Override
134    public void put(ByteBuffer payload) {
135        int newPosition = position + payload.remaining();
136        ensureCapacity(newPosition);
137        while (payload.hasRemaining()) {
138            buffer[position++] = payload.get();
139        }
140
141        position = newPosition;
142    }
143
144    @Override
145    public int limit() {
146        return Integer.MAX_VALUE;
147    }
148
149    @Override
150    public void put(ReadableBuffer src) {
151        ensureCapacity(position);
152        src.get(this);
153    }
154
155    /**
156     * Ensures the the buffer has at least the minimumCapacity specified.
157     *
158     * @param minimumCapacity
159     *      the minimum capacity needed to meet the next write operation.
160     */
161    private void ensureCapacity(int minimumCapacity) {
162        if (minimumCapacity > buffer.length) {
163            byte newBuffer[] = new byte[Math.max(buffer.length << 1, minimumCapacity)];
164            System.arraycopy(buffer, 0, newBuffer, 0, position);
165            buffer = newBuffer;
166        }
167    }
168}