001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.store.amq;
019    
020    import java.io.DataInput;
021    import java.io.DataOutput;
022    import java.io.IOException;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.command.ActiveMQDestination;
025    import org.apache.activemq.command.JournalTopicAck;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.command.MessageAck;
028    import org.apache.activemq.kaha.impl.async.Location;
029    import org.apache.activemq.util.ByteSequence;
030    import org.apache.activemq.wireformat.WireFormat;
031    
032    /**
033     */
034    public class AMQTxOperation {
035    
036        public static final byte ADD_OPERATION_TYPE = 0;
037        public static final byte REMOVE_OPERATION_TYPE = 1;
038        public static final byte ACK_OPERATION_TYPE = 3;
039        private byte operationType;
040        private ActiveMQDestination destination;
041        private Object data;
042        private Location location;
043    
044        public AMQTxOperation() {
045        }
046    
047        public AMQTxOperation(byte operationType, ActiveMQDestination destination, Object data, Location location) {
048            this.operationType = operationType;
049            this.destination = destination;
050            this.data = data;
051            this.location = location;
052    
053        }
054    
055        /**
056         * @return the data
057         */
058        public Object getData() {
059            return this.data;
060        }
061    
062        /**
063         * @param data the data to set
064         */
065        public void setData(Object data) {
066            this.data = data;
067        }
068    
069        /**
070         * @return the location
071         */
072        public Location getLocation() {
073            return this.location;
074        }
075    
076        /**
077         * @param location the location to set
078         */
079        public void setLocation(Location location) {
080            this.location = location;
081        }
082    
083        /**
084         * @return the operationType
085         */
086        public byte getOperationType() {
087            return this.operationType;
088        }
089    
090        /**
091         * @param operationType the operationType to set
092         */
093        public void setOperationType(byte operationType) {
094            this.operationType = operationType;
095        }
096    
097        public boolean replay(AMQPersistenceAdapter adapter, ConnectionContext context) throws IOException {
098            boolean result = false;
099            AMQMessageStore store = (AMQMessageStore)adapter.createMessageStore(destination);
100            if (operationType == ADD_OPERATION_TYPE) {
101                result = store.replayAddMessage(context, (Message)data, location);
102            } else if (operationType == REMOVE_OPERATION_TYPE) {
103                result = store.replayRemoveMessage(context, (MessageAck)data);
104            } else {
105                JournalTopicAck ack = (JournalTopicAck)data;
106                result = ((AMQTopicMessageStore)store).replayAcknowledge(context, ack.getClientId(), ack
107                    .getSubscritionName(), ack.getMessageId());
108            }
109            return result;
110        }
111    
112        public void writeExternal(WireFormat wireFormat, DataOutput dos) throws IOException {
113            location.writeExternal(dos);
114            ByteSequence packet = wireFormat.marshal(getData());
115            dos.writeInt(packet.length);
116            dos.write(packet.data, packet.offset, packet.length);
117            packet = wireFormat.marshal(destination);
118            dos.writeInt(packet.length);
119            dos.write(packet.data, packet.offset, packet.length);
120        }
121    
122        public void readExternal(WireFormat wireFormat, DataInput dis) throws IOException {
123            this.location = new Location();
124            this.location.readExternal(dis);
125            int size = dis.readInt();
126            byte[] data = new byte[size];
127            dis.readFully(data);
128            setData(wireFormat.unmarshal(new ByteSequence(data)));
129            size = dis.readInt();
130            data = new byte[size];
131            dis.readFully(data);
132            this.destination = (ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
133        }
134    }