001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.openwire;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.lang.reflect.Method;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.command.CommandTypes;
027import org.apache.activemq.command.DataStructure;
028import org.apache.activemq.command.WireFormatInfo;
029import org.apache.activemq.util.ByteSequence;
030import org.apache.activemq.util.ByteSequenceData;
031import org.apache.activemq.util.DataByteArrayInputStream;
032import org.apache.activemq.util.DataByteArrayOutputStream;
033import org.apache.activemq.wireformat.WireFormat;
034
035/**
036 *
037 *
038 */
039public final class OpenWireFormat implements WireFormat {
040
041    public static final int DEFAULT_STORE_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
042    public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
043    public static final int DEFAULT_LEGACY_VERSION = CommandTypes.PROTOCOL_LEGACY_STORE_VERSION;
044    public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
045
046    static final byte NULL_TYPE = CommandTypes.NULL;
047    private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
048    private static final int MARSHAL_CACHE_FREE_SPACE = 100;
049
050    private DataStreamMarshaller dataMarshallers[];
051    private int version;
052    private boolean stackTraceEnabled;
053    private boolean tcpNoDelayEnabled;
054    private boolean cacheEnabled;
055    private boolean tightEncodingEnabled;
056    private boolean sizePrefixDisabled;
057    private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
058
059    // The following fields are used for value caching
060    private short nextMarshallCacheIndex;
061    private short nextMarshallCacheEvictionIndex;
062    private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
063    private DataStructure marshallCache[] = null;
064    private DataStructure unmarshallCache[] = null;
065    private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
066    private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
067    private WireFormatInfo preferedWireFormatInfo;
068
069    public OpenWireFormat() {
070        this(DEFAULT_STORE_VERSION);
071    }
072
073    public OpenWireFormat(int i) {
074        setVersion(i);
075    }
076
077    @Override
078    public int hashCode() {
079        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
080               ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
081               ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
082               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
083    }
084
085    public OpenWireFormat copy() {
086        OpenWireFormat answer = new OpenWireFormat(version);
087        answer.stackTraceEnabled = stackTraceEnabled;
088        answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
089        answer.cacheEnabled = cacheEnabled;
090        answer.tightEncodingEnabled = tightEncodingEnabled;
091        answer.sizePrefixDisabled = sizePrefixDisabled;
092        answer.preferedWireFormatInfo = preferedWireFormatInfo;
093        return answer;
094    }
095
096    @Override
097    public boolean equals(Object object) {
098        if (object == null) {
099            return false;
100        }
101        OpenWireFormat o = (OpenWireFormat)object;
102        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
103               && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
104               && o.sizePrefixDisabled == sizePrefixDisabled;
105    }
106
107
108    @Override
109    public String toString() {
110        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
111               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled +  ", maxFrameSize=" + maxFrameSize + "}";
112        // return "OpenWireFormat{id="+id+",
113        // tightEncodingEnabled="+tightEncodingEnabled+"}";
114    }
115
116    @Override
117    public int getVersion() {
118        return version;
119    }
120
121    @Override
122    public synchronized ByteSequence marshal(Object command) throws IOException {
123
124        if (cacheEnabled) {
125            runMarshallCacheEvictionSweep();
126        }
127
128        ByteSequence sequence = null;
129        int size = 1;
130        if (command != null) {
131
132            DataStructure c = (DataStructure)command;
133            byte type = c.getDataStructureType();
134            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
135            if (dsm == null) {
136                throw new IOException("Unknown data type: " + type);
137            }
138            if (tightEncodingEnabled) {
139
140                BooleanStream bs = new BooleanStream();
141                size += dsm.tightMarshal1(this, c, bs);
142                size += bs.marshalledSize();
143
144                bytesOut.restart(size);
145                if (!sizePrefixDisabled) {
146                    bytesOut.writeInt(size);
147                }
148                bytesOut.writeByte(type);
149                bs.marshal(bytesOut);
150                dsm.tightMarshal2(this, c, bytesOut, bs);
151                sequence = bytesOut.toByteSequence();
152
153            } else {
154                bytesOut.restart();
155                if (!sizePrefixDisabled) {
156                    bytesOut.writeInt(0); // we don't know the final size
157                    // yet but write this here for
158                    // now.
159                }
160                bytesOut.writeByte(type);
161                dsm.looseMarshal(this, c, bytesOut);
162                sequence = bytesOut.toByteSequence();
163
164                if (!sizePrefixDisabled) {
165                    size = sequence.getLength() - 4;
166                    int pos = sequence.offset;
167                    ByteSequenceData.writeIntBig(sequence, size);
168                    sequence.offset = pos;
169                }
170            }
171
172        } else {
173            bytesOut.restart(5);
174            bytesOut.writeInt(size);
175            bytesOut.writeByte(NULL_TYPE);
176            sequence = bytesOut.toByteSequence();
177        }
178
179        return sequence;
180    }
181
182    @Override
183    public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
184        bytesIn.restart(sequence);
185        // DataInputStream dis = new DataInputStream(new
186        // ByteArrayInputStream(sequence));
187
188        if (!sizePrefixDisabled) {
189            int size = bytesIn.readInt();
190            if (sequence.getLength() - 4 != size) {
191                // throw new IOException("Packet size does not match marshaled
192                // size");
193            }
194
195            if (size > maxFrameSize) {
196                throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
197            }
198        }
199
200        Object command = doUnmarshal(bytesIn);
201        // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
202        // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
203        // }
204        return command;
205    }
206
207    @Override
208    public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
209
210        if (cacheEnabled) {
211            runMarshallCacheEvictionSweep();
212        }
213
214        int size = 1;
215        if (o != null) {
216
217            DataStructure c = (DataStructure)o;
218            byte type = c.getDataStructureType();
219            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
220            if (dsm == null) {
221                throw new IOException("Unknown data type: " + type);
222            }
223            if (tightEncodingEnabled) {
224                BooleanStream bs = new BooleanStream();
225                size += dsm.tightMarshal1(this, c, bs);
226                size += bs.marshalledSize();
227
228                if (!sizePrefixDisabled) {
229                    dataOut.writeInt(size);
230                }
231
232                dataOut.writeByte(type);
233                bs.marshal(dataOut);
234                dsm.tightMarshal2(this, c, dataOut, bs);
235
236            } else {
237                DataOutput looseOut = dataOut;
238
239                if (!sizePrefixDisabled) {
240                    bytesOut.restart();
241                    looseOut = bytesOut;
242                }
243
244                looseOut.writeByte(type);
245                dsm.looseMarshal(this, c, looseOut);
246
247                if (!sizePrefixDisabled) {
248                    ByteSequence sequence = bytesOut.toByteSequence();
249                    dataOut.writeInt(sequence.getLength());
250                    dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
251                }
252
253            }
254
255        } else {
256            if (!sizePrefixDisabled) {
257                dataOut.writeInt(size);
258            }
259            dataOut.writeByte(NULL_TYPE);
260        }
261    }
262
263    @Override
264    public Object unmarshal(DataInput dis) throws IOException {
265        DataInput dataIn = dis;
266        if (!sizePrefixDisabled) {
267            int size = dis.readInt();
268            if (size > maxFrameSize) {
269                throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
270            }
271            // int size = dis.readInt();
272            // byte[] data = new byte[size];
273            // dis.readFully(data);
274            // bytesIn.restart(data);
275            // dataIn = bytesIn;
276        }
277        return doUnmarshal(dataIn);
278    }
279
280    /**
281     * Used by NIO or AIO transports
282     */
283    public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
284        int size = 1;
285        if (o != null) {
286            DataStructure c = (DataStructure)o;
287            byte type = c.getDataStructureType();
288            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
289            if (dsm == null) {
290                throw new IOException("Unknown data type: " + type);
291            }
292
293            size += dsm.tightMarshal1(this, c, bs);
294            size += bs.marshalledSize();
295        }
296        return size;
297    }
298
299    /**
300     * Used by NIO or AIO transports; note that the size is not written as part
301     * of this method.
302     */
303    public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
304        if (cacheEnabled) {
305            runMarshallCacheEvictionSweep();
306        }
307
308        if (o != null) {
309            DataStructure c = (DataStructure)o;
310            byte type = c.getDataStructureType();
311            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
312            if (dsm == null) {
313                throw new IOException("Unknown data type: " + type);
314            }
315            ds.writeByte(type);
316            bs.marshal(ds);
317            dsm.tightMarshal2(this, c, ds, bs);
318        }
319    }
320
321    /**
322     * Allows you to dynamically switch the version of the openwire protocol
323     * being used.
324     *
325     * @param version
326     */
327    @Override
328    public void setVersion(int version) {
329        String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
330        Class mfClass;
331        try {
332            mfClass = Class.forName(mfName, false, getClass().getClassLoader());
333        } catch (ClassNotFoundException e) {
334            throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
335                                                                         + ", could not load " + mfName)
336                .initCause(e);
337        }
338        try {
339            Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
340            dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
341        } catch (Throwable e) {
342            throw (IllegalArgumentException)new IllegalArgumentException(
343                                                                         "Invalid version: "
344                                                                             + version
345                                                                             + ", "
346                                                                             + mfName
347                                                                             + " does not properly implement the createMarshallerMap method.")
348                .initCause(e);
349        }
350        this.version = version;
351    }
352
353    public Object doUnmarshal(DataInput dis) throws IOException {
354        byte dataType = dis.readByte();
355        if (dataType != NULL_TYPE) {
356            DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
357            if (dsm == null) {
358                throw new IOException("Unknown data type: " + dataType);
359            }
360            Object data = dsm.createObject();
361            if (this.tightEncodingEnabled) {
362                BooleanStream bs = new BooleanStream();
363                bs.unmarshal(dis);
364                dsm.tightUnmarshal(this, data, dis, bs);
365            } else {
366                dsm.looseUnmarshal(this, data, dis);
367            }
368            return data;
369        } else {
370            return null;
371        }
372    }
373
374    // public void debug(String msg) {
375    // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
376    // System.out.println(t+": "+msg);
377    // }
378    public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
379        bs.writeBoolean(o != null);
380        if (o == null) {
381            return 0;
382        }
383
384        if (o.isMarshallAware()) {
385            // MarshallAware ma = (MarshallAware)o;
386            ByteSequence sequence = null;
387            // sequence=ma.getCachedMarshalledForm(this);
388            bs.writeBoolean(sequence != null);
389            if (sequence != null) {
390                return 1 + sequence.getLength();
391            }
392        }
393
394        byte type = o.getDataStructureType();
395        DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
396        if (dsm == null) {
397            throw new IOException("Unknown data type: " + type);
398        }
399        return 1 + dsm.tightMarshal1(this, o, bs);
400    }
401
402    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
403        throws IOException {
404        if (!bs.readBoolean()) {
405            return;
406        }
407
408        byte type = o.getDataStructureType();
409        ds.writeByte(type);
410
411        if (o.isMarshallAware() && bs.readBoolean()) {
412
413            // We should not be doing any caching
414            throw new IOException("Corrupted stream");
415            // MarshallAware ma = (MarshallAware) o;
416            // ByteSequence sequence=ma.getCachedMarshalledForm(this);
417            // ds.write(sequence.getData(), sequence.getOffset(),
418            // sequence.getLength());
419
420        } else {
421
422            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
423            if (dsm == null) {
424                throw new IOException("Unknown data type: " + type);
425            }
426            dsm.tightMarshal2(this, o, ds, bs);
427
428        }
429    }
430
431    public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
432        if (bs.readBoolean()) {
433
434            byte dataType = dis.readByte();
435            DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
436            if (dsm == null) {
437                throw new IOException("Unknown data type: " + dataType);
438            }
439            DataStructure data = dsm.createObject();
440
441            if (data.isMarshallAware() && bs.readBoolean()) {
442
443                dis.readInt();
444                dis.readByte();
445
446                BooleanStream bs2 = new BooleanStream();
447                bs2.unmarshal(dis);
448                dsm.tightUnmarshal(this, data, dis, bs2);
449
450                // TODO: extract the sequence from the dis and associate it.
451                // MarshallAware ma = (MarshallAware)data
452                // ma.setCachedMarshalledForm(this, sequence);
453
454            } else {
455                dsm.tightUnmarshal(this, data, dis, bs);
456            }
457
458            return data;
459        } else {
460            return null;
461        }
462    }
463
464    public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
465        if (dis.readBoolean()) {
466
467            byte dataType = dis.readByte();
468            DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
469            if (dsm == null) {
470                throw new IOException("Unknown data type: " + dataType);
471            }
472            DataStructure data = dsm.createObject();
473            dsm.looseUnmarshal(this, data, dis);
474            return data;
475
476        } else {
477            return null;
478        }
479    }
480
481    public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
482        dataOut.writeBoolean(o != null);
483        if (o != null) {
484            byte type = o.getDataStructureType();
485            dataOut.writeByte(type);
486            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
487            if (dsm == null) {
488                throw new IOException("Unknown data type: " + type);
489            }
490            dsm.looseMarshal(this, o, dataOut);
491        }
492    }
493
494    public void runMarshallCacheEvictionSweep() {
495        // Do we need to start evicting??
496        while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
497
498            marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
499            marshallCache[nextMarshallCacheEvictionIndex] = null;
500
501            nextMarshallCacheEvictionIndex++;
502            if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
503                nextMarshallCacheEvictionIndex = 0;
504            }
505
506        }
507    }
508
509    public Short getMarshallCacheIndex(DataStructure o) {
510        return marshallCacheMap.get(o);
511    }
512
513    public Short addToMarshallCache(DataStructure o) {
514        short i = nextMarshallCacheIndex++;
515        if (nextMarshallCacheIndex >= marshallCache.length) {
516            nextMarshallCacheIndex = 0;
517        }
518
519        // We can only cache that item if there is space left.
520        if (marshallCacheMap.size() < marshallCache.length) {
521            marshallCache[i] = o;
522            Short index = new Short(i);
523            marshallCacheMap.put(o, index);
524            return index;
525        } else {
526            // Use -1 to indicate that the value was not cached due to cache
527            // being full.
528            return new Short((short)-1);
529        }
530    }
531
532    public void setInUnmarshallCache(short index, DataStructure o) {
533
534        // There was no space left in the cache, so we can't
535        // put this in the cache.
536        if (index == -1) {
537            return;
538        }
539
540        unmarshallCache[index] = o;
541    }
542
543    public DataStructure getFromUnmarshallCache(short index) {
544        return unmarshallCache[index];
545    }
546
547    public void setStackTraceEnabled(boolean b) {
548        stackTraceEnabled = b;
549    }
550
551    public boolean isStackTraceEnabled() {
552        return stackTraceEnabled;
553    }
554
555    public boolean isTcpNoDelayEnabled() {
556        return tcpNoDelayEnabled;
557    }
558
559    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
560        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
561    }
562
563    public boolean isCacheEnabled() {
564        return cacheEnabled;
565    }
566
567    public void setCacheEnabled(boolean cacheEnabled) {
568        if(cacheEnabled){
569            marshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
570            unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
571        }
572        this.cacheEnabled = cacheEnabled;
573    }
574
575    public boolean isTightEncodingEnabled() {
576        return tightEncodingEnabled;
577    }
578
579    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
580        this.tightEncodingEnabled = tightEncodingEnabled;
581    }
582
583    public boolean isSizePrefixDisabled() {
584        return sizePrefixDisabled;
585    }
586
587    public void setSizePrefixDisabled(boolean prefixPacketSize) {
588        this.sizePrefixDisabled = prefixPacketSize;
589    }
590
591    public void setPreferedWireFormatInfo(WireFormatInfo info) {
592        this.preferedWireFormatInfo = info;
593    }
594
595    public WireFormatInfo getPreferedWireFormatInfo() {
596        return preferedWireFormatInfo;
597    }
598
599    public long getMaxFrameSize() {
600        return maxFrameSize;
601    }
602
603    public void setMaxFrameSize(long maxFrameSize) {
604        this.maxFrameSize = maxFrameSize;
605    }
606
607    public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
608
609        if (preferedWireFormatInfo == null) {
610            throw new IllegalStateException("Wireformat cannot not be renegotiated.");
611        }
612
613        this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
614        info.setVersion(this.getVersion());
615
616        this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
617        info.setMaxFrameSize(this.getMaxFrameSize());
618
619        this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
620        info.setStackTraceEnabled(this.stackTraceEnabled);
621
622        this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
623        info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
624
625        this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
626        info.setCacheEnabled(this.cacheEnabled);
627
628        this.tightEncodingEnabled = info.isTightEncodingEnabled()
629                                    && preferedWireFormatInfo.isTightEncodingEnabled();
630        info.setTightEncodingEnabled(this.tightEncodingEnabled);
631
632        this.sizePrefixDisabled = info.isSizePrefixDisabled()
633                                  && preferedWireFormatInfo.isSizePrefixDisabled();
634        info.setSizePrefixDisabled(this.sizePrefixDisabled);
635
636        if (cacheEnabled) {
637
638            int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
639            info.setCacheSize(size);
640
641            if (size == 0) {
642                size = MARSHAL_CACHE_SIZE;
643            }
644
645            marshallCache = new DataStructure[size];
646            unmarshallCache = new DataStructure[size];
647            nextMarshallCacheIndex = 0;
648            nextMarshallCacheEvictionIndex = 0;
649            marshallCacheMap = new HashMap<DataStructure, Short>();
650        } else {
651            marshallCache = null;
652            unmarshallCache = null;
653            nextMarshallCacheIndex = 0;
654            nextMarshallCacheEvictionIndex = 0;
655            marshallCacheMap = null;
656        }
657
658    }
659
660    protected int min(int version1, int version2) {
661        if (version1 < version2 && version1 > 0 || version2 <= 0) {
662            return version1;
663        }
664        return version2;
665    }
666
667    protected long min(long version1, long version2) {
668        if (version1 < version2 && version1 > 0 || version2 <= 0) {
669            return version1;
670        }
671        return version2;
672    }
673}