001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.ByteArrayOutputStream;
020    import java.io.DataInputStream;
021    import java.io.IOException;
022    
023    import org.apache.activemq.command.Command;
024    import org.apache.activemq.command.LastPartialCommand;
025    import org.apache.activemq.command.PartialCommand;
026    import org.apache.activemq.openwire.OpenWireFormat;
027    import org.apache.activemq.util.ByteArrayInputStream;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * Joins together of partial commands which were split into individual chunks of
033     * data.
034     * 
035     * 
036     */
037    public class CommandJoiner extends TransportFilter {
038        private static final Logger LOG = LoggerFactory.getLogger(CommandJoiner.class);
039    
040        private ByteArrayOutputStream out = new ByteArrayOutputStream();
041        private OpenWireFormat wireFormat;
042    
043        public CommandJoiner(Transport next, OpenWireFormat wireFormat) {
044            super(next);
045            this.wireFormat = wireFormat;
046        }
047    
048        public void onCommand(Object o) {
049            Command command = (Command)o;
050            byte type = command.getDataStructureType();
051            if (type == PartialCommand.DATA_STRUCTURE_TYPE || type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
052                PartialCommand header = (PartialCommand)command;
053                byte[] partialData = header.getData();
054                try {
055                    out.write(partialData);
056                } catch (IOException e) {
057                    getTransportListener().onException(e);
058                }
059                if (type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
060                    try {
061                        byte[] fullData = out.toByteArray();
062                        out.reset();
063                        DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(fullData));
064                        Command completeCommand = (Command)wireFormat.unmarshal(dataIn);
065    
066                        LastPartialCommand lastCommand = (LastPartialCommand)command;
067                        lastCommand.configure(completeCommand);
068    
069                        getTransportListener().onCommand(completeCommand);
070                    } catch (IOException e) {
071                        LOG.warn("Failed to unmarshal partial command: " + command);
072                        getTransportListener().onException(e);
073                    }
074                }
075            } else {
076                getTransportListener().onCommand(command);
077            }
078        }
079    
080        public void stop() throws Exception {
081            super.stop();
082            out = null;
083        }
084    
085        public String toString() {
086            return next.toString();
087        }
088    }