001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.openwire;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.lang.reflect.Method;
023import java.util.HashMap;
024import java.util.Map;
025
026import org.apache.activemq.command.CommandTypes;
027import org.apache.activemq.command.DataStructure;
028import org.apache.activemq.command.WireFormatInfo;
029import org.apache.activemq.util.ByteSequence;
030import org.apache.activemq.util.ByteSequenceData;
031import org.apache.activemq.util.DataByteArrayInputStream;
032import org.apache.activemq.util.DataByteArrayOutputStream;
033import org.apache.activemq.util.IOExceptionSupport;
034import org.apache.activemq.wireformat.WireFormat;
035
036/**
037 *
038 *
039 */
040public final class OpenWireFormat implements WireFormat {
041
042    public static final int DEFAULT_STORE_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
043    public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
044    public static final int DEFAULT_LEGACY_VERSION = CommandTypes.PROTOCOL_LEGACY_STORE_VERSION;
045    public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
046
047    static final byte NULL_TYPE = CommandTypes.NULL;
048    private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
049    private static final int MARSHAL_CACHE_FREE_SPACE = 100;
050
051    private DataStreamMarshaller dataMarshallers[];
052    private int version;
053    private boolean stackTraceEnabled;
054    private boolean tcpNoDelayEnabled;
055    private boolean cacheEnabled;
056    private boolean tightEncodingEnabled;
057    private boolean sizePrefixDisabled;
058    private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
059
060    // The following fields are used for value caching
061    private short nextMarshallCacheIndex;
062    private short nextMarshallCacheEvictionIndex;
063    private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
064    private DataStructure marshallCache[] = null;
065    private DataStructure unmarshallCache[] = null;
066    private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
067    private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
068    private WireFormatInfo preferedWireFormatInfo;
069
070    public OpenWireFormat() {
071        this(DEFAULT_STORE_VERSION);
072    }
073
074    public OpenWireFormat(int i) {
075        setVersion(i);
076    }
077
078    @Override
079    public int hashCode() {
080        return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
081               ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
082               ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
083               ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
084    }
085
086    public OpenWireFormat copy() {
087        OpenWireFormat answer = new OpenWireFormat(version);
088        answer.stackTraceEnabled = stackTraceEnabled;
089        answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
090        answer.cacheEnabled = cacheEnabled;
091        answer.tightEncodingEnabled = tightEncodingEnabled;
092        answer.sizePrefixDisabled = sizePrefixDisabled;
093        answer.preferedWireFormatInfo = preferedWireFormatInfo;
094        return answer;
095    }
096
097    @Override
098    public boolean equals(Object object) {
099        if (object == null) {
100            return false;
101        }
102        OpenWireFormat o = (OpenWireFormat)object;
103        return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
104               && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
105               && o.sizePrefixDisabled == sizePrefixDisabled;
106    }
107
108
109    @Override
110    public String toString() {
111        return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
112               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled +  ", maxFrameSize=" + maxFrameSize + "}";
113        // return "OpenWireFormat{id="+id+",
114        // tightEncodingEnabled="+tightEncodingEnabled+"}";
115    }
116
117    @Override
118    public int getVersion() {
119        return version;
120    }
121
122    @Override
123    public synchronized ByteSequence marshal(Object command) throws IOException {
124
125        if (cacheEnabled) {
126            runMarshallCacheEvictionSweep();
127        }
128
129        ByteSequence sequence = null;
130        int size = 1;
131        if (command != null) {
132
133            DataStructure c = (DataStructure)command;
134            byte type = c.getDataStructureType();
135            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
136            if (dsm == null) {
137                throw new IOException("Unknown data type: " + type);
138            }
139            if (tightEncodingEnabled) {
140
141                BooleanStream bs = new BooleanStream();
142                size += dsm.tightMarshal1(this, c, bs);
143                size += bs.marshalledSize();
144
145                bytesOut.restart(size);
146                if (!sizePrefixDisabled) {
147                    bytesOut.writeInt(size);
148                }
149                bytesOut.writeByte(type);
150                bs.marshal(bytesOut);
151                dsm.tightMarshal2(this, c, bytesOut, bs);
152                sequence = bytesOut.toByteSequence();
153
154            } else {
155                bytesOut.restart();
156                if (!sizePrefixDisabled) {
157                    bytesOut.writeInt(0); // we don't know the final size
158                    // yet but write this here for
159                    // now.
160                }
161                bytesOut.writeByte(type);
162                dsm.looseMarshal(this, c, bytesOut);
163                sequence = bytesOut.toByteSequence();
164
165                if (!sizePrefixDisabled) {
166                    size = sequence.getLength() - 4;
167                    int pos = sequence.offset;
168                    ByteSequenceData.writeIntBig(sequence, size);
169                    sequence.offset = pos;
170                }
171            }
172
173        } else {
174            bytesOut.restart(5);
175            bytesOut.writeInt(size);
176            bytesOut.writeByte(NULL_TYPE);
177            sequence = bytesOut.toByteSequence();
178        }
179
180        return sequence;
181    }
182
183    @Override
184    public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
185        bytesIn.restart(sequence);
186        // DataInputStream dis = new DataInputStream(new
187        // ByteArrayInputStream(sequence));
188
189        if (!sizePrefixDisabled) {
190            int size = bytesIn.readInt();
191            if (sequence.getLength() - 4 != size) {
192                // throw new IOException("Packet size does not match marshaled
193                // size");
194            }
195
196            if (size > maxFrameSize) {
197                throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
198            }
199        }
200
201        Object command = doUnmarshal(bytesIn);
202        // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
203        // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
204        // }
205        return command;
206    }
207
208    @Override
209    public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
210
211        if (cacheEnabled) {
212            runMarshallCacheEvictionSweep();
213        }
214
215        int size = 1;
216        if (o != null) {
217
218            DataStructure c = (DataStructure)o;
219            byte type = c.getDataStructureType();
220            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
221            if (dsm == null) {
222                throw new IOException("Unknown data type: " + type);
223            }
224            if (tightEncodingEnabled) {
225                BooleanStream bs = new BooleanStream();
226                size += dsm.tightMarshal1(this, c, bs);
227                size += bs.marshalledSize();
228
229                if (!sizePrefixDisabled) {
230                    dataOut.writeInt(size);
231                }
232
233                dataOut.writeByte(type);
234                bs.marshal(dataOut);
235                dsm.tightMarshal2(this, c, dataOut, bs);
236
237            } else {
238                DataOutput looseOut = dataOut;
239
240                if (!sizePrefixDisabled) {
241                    bytesOut.restart();
242                    looseOut = bytesOut;
243                }
244
245                looseOut.writeByte(type);
246                dsm.looseMarshal(this, c, looseOut);
247
248                if (!sizePrefixDisabled) {
249                    ByteSequence sequence = bytesOut.toByteSequence();
250                    dataOut.writeInt(sequence.getLength());
251                    dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
252                }
253
254            }
255
256        } else {
257            if (!sizePrefixDisabled) {
258                dataOut.writeInt(size);
259            }
260            dataOut.writeByte(NULL_TYPE);
261        }
262    }
263
264    @Override
265    public Object unmarshal(DataInput dis) throws IOException {
266        DataInput dataIn = dis;
267        if (!sizePrefixDisabled) {
268            int size = dis.readInt();
269            if (size > maxFrameSize) {
270                throw IOExceptionSupport.createFrameSizeException(size, maxFrameSize);
271            }
272            // int size = dis.readInt();
273            // byte[] data = new byte[size];
274            // dis.readFully(data);
275            // bytesIn.restart(data);
276            // dataIn = bytesIn;
277        }
278        return doUnmarshal(dataIn);
279    }
280
281    /**
282     * Used by NIO or AIO transports
283     */
284    public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
285        int size = 1;
286        if (o != null) {
287            DataStructure c = (DataStructure)o;
288            byte type = c.getDataStructureType();
289            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
290            if (dsm == null) {
291                throw new IOException("Unknown data type: " + type);
292            }
293
294            size += dsm.tightMarshal1(this, c, bs);
295            size += bs.marshalledSize();
296        }
297        return size;
298    }
299
300    /**
301     * Used by NIO or AIO transports; note that the size is not written as part
302     * of this method.
303     */
304    public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
305        if (cacheEnabled) {
306            runMarshallCacheEvictionSweep();
307        }
308
309        if (o != null) {
310            DataStructure c = (DataStructure)o;
311            byte type = c.getDataStructureType();
312            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
313            if (dsm == null) {
314                throw new IOException("Unknown data type: " + type);
315            }
316            ds.writeByte(type);
317            bs.marshal(ds);
318            dsm.tightMarshal2(this, c, ds, bs);
319        }
320    }
321
322    /**
323     * Allows you to dynamically switch the version of the openwire protocol
324     * being used.
325     *
326     * @param version
327     */
328    @Override
329    public void setVersion(int version) {
330        String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
331        Class mfClass;
332        try {
333            mfClass = Class.forName(mfName, false, getClass().getClassLoader());
334        } catch (ClassNotFoundException e) {
335            throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
336                                                                         + ", could not load " + mfName)
337                .initCause(e);
338        }
339        try {
340            Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
341            dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
342        } catch (Throwable e) {
343            throw (IllegalArgumentException)new IllegalArgumentException(
344                                                                         "Invalid version: "
345                                                                             + version
346                                                                             + ", "
347                                                                             + mfName
348                                                                             + " does not properly implement the createMarshallerMap method.")
349                .initCause(e);
350        }
351        this.version = version;
352    }
353
354    public Object doUnmarshal(DataInput dis) throws IOException {
355        byte dataType = dis.readByte();
356        if (dataType != NULL_TYPE) {
357            DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
358            if (dsm == null) {
359                throw new IOException("Unknown data type: " + dataType);
360            }
361            Object data = dsm.createObject();
362            if (this.tightEncodingEnabled) {
363                BooleanStream bs = new BooleanStream();
364                bs.unmarshal(dis);
365                dsm.tightUnmarshal(this, data, dis, bs);
366            } else {
367                dsm.looseUnmarshal(this, data, dis);
368            }
369            return data;
370        } else {
371            return null;
372        }
373    }
374
375    // public void debug(String msg) {
376    // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
377    // System.out.println(t+": "+msg);
378    // }
379    public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
380        bs.writeBoolean(o != null);
381        if (o == null) {
382            return 0;
383        }
384
385        if (o.isMarshallAware()) {
386            // MarshallAware ma = (MarshallAware)o;
387            ByteSequence sequence = null;
388            // sequence=ma.getCachedMarshalledForm(this);
389            bs.writeBoolean(sequence != null);
390            if (sequence != null) {
391                return 1 + sequence.getLength();
392            }
393        }
394
395        byte type = o.getDataStructureType();
396        DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
397        if (dsm == null) {
398            throw new IOException("Unknown data type: " + type);
399        }
400        return 1 + dsm.tightMarshal1(this, o, bs);
401    }
402
403    public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
404        throws IOException {
405        if (!bs.readBoolean()) {
406            return;
407        }
408
409        byte type = o.getDataStructureType();
410        ds.writeByte(type);
411
412        if (o.isMarshallAware() && bs.readBoolean()) {
413
414            // We should not be doing any caching
415            throw new IOException("Corrupted stream");
416            // MarshallAware ma = (MarshallAware) o;
417            // ByteSequence sequence=ma.getCachedMarshalledForm(this);
418            // ds.write(sequence.getData(), sequence.getOffset(),
419            // sequence.getLength());
420
421        } else {
422
423            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
424            if (dsm == null) {
425                throw new IOException("Unknown data type: " + type);
426            }
427            dsm.tightMarshal2(this, o, ds, bs);
428
429        }
430    }
431
432    public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
433        if (bs.readBoolean()) {
434
435            byte dataType = dis.readByte();
436            DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
437            if (dsm == null) {
438                throw new IOException("Unknown data type: " + dataType);
439            }
440            DataStructure data = dsm.createObject();
441
442            if (data.isMarshallAware() && bs.readBoolean()) {
443
444                dis.readInt();
445                dis.readByte();
446
447                BooleanStream bs2 = new BooleanStream();
448                bs2.unmarshal(dis);
449                dsm.tightUnmarshal(this, data, dis, bs2);
450
451                // TODO: extract the sequence from the dis and associate it.
452                // MarshallAware ma = (MarshallAware)data
453                // ma.setCachedMarshalledForm(this, sequence);
454
455            } else {
456                dsm.tightUnmarshal(this, data, dis, bs);
457            }
458
459            return data;
460        } else {
461            return null;
462        }
463    }
464
465    public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
466        if (dis.readBoolean()) {
467
468            byte dataType = dis.readByte();
469            DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
470            if (dsm == null) {
471                throw new IOException("Unknown data type: " + dataType);
472            }
473            DataStructure data = dsm.createObject();
474            dsm.looseUnmarshal(this, data, dis);
475            return data;
476
477        } else {
478            return null;
479        }
480    }
481
482    public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
483        dataOut.writeBoolean(o != null);
484        if (o != null) {
485            byte type = o.getDataStructureType();
486            dataOut.writeByte(type);
487            DataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
488            if (dsm == null) {
489                throw new IOException("Unknown data type: " + type);
490            }
491            dsm.looseMarshal(this, o, dataOut);
492        }
493    }
494
495    public void runMarshallCacheEvictionSweep() {
496        // Do we need to start evicting??
497        while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
498
499            marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
500            marshallCache[nextMarshallCacheEvictionIndex] = null;
501
502            nextMarshallCacheEvictionIndex++;
503            if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
504                nextMarshallCacheEvictionIndex = 0;
505            }
506
507        }
508    }
509
510    public Short getMarshallCacheIndex(DataStructure o) {
511        return marshallCacheMap.get(o);
512    }
513
514    public Short addToMarshallCache(DataStructure o) {
515        short i = nextMarshallCacheIndex++;
516        if (nextMarshallCacheIndex >= marshallCache.length) {
517            nextMarshallCacheIndex = 0;
518        }
519
520        // We can only cache that item if there is space left.
521        if (marshallCacheMap.size() < marshallCache.length) {
522            marshallCache[i] = o;
523            Short index = new Short(i);
524            marshallCacheMap.put(o, index);
525            return index;
526        } else {
527            // Use -1 to indicate that the value was not cached due to cache
528            // being full.
529            return new Short((short)-1);
530        }
531    }
532
533    public void setInUnmarshallCache(short index, DataStructure o) {
534
535        // There was no space left in the cache, so we can't
536        // put this in the cache.
537        if (index == -1) {
538            return;
539        }
540
541        unmarshallCache[index] = o;
542    }
543
544    public DataStructure getFromUnmarshallCache(short index) {
545        return unmarshallCache[index];
546    }
547
548    public void setStackTraceEnabled(boolean b) {
549        stackTraceEnabled = b;
550    }
551
552    public boolean isStackTraceEnabled() {
553        return stackTraceEnabled;
554    }
555
556    public boolean isTcpNoDelayEnabled() {
557        return tcpNoDelayEnabled;
558    }
559
560    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
561        this.tcpNoDelayEnabled = tcpNoDelayEnabled;
562    }
563
564    public boolean isCacheEnabled() {
565        return cacheEnabled;
566    }
567
568    public void setCacheEnabled(boolean cacheEnabled) {
569        if(cacheEnabled){
570            marshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
571            unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
572        }
573        this.cacheEnabled = cacheEnabled;
574    }
575
576    public boolean isTightEncodingEnabled() {
577        return tightEncodingEnabled;
578    }
579
580    public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
581        this.tightEncodingEnabled = tightEncodingEnabled;
582    }
583
584    public boolean isSizePrefixDisabled() {
585        return sizePrefixDisabled;
586    }
587
588    public void setSizePrefixDisabled(boolean prefixPacketSize) {
589        this.sizePrefixDisabled = prefixPacketSize;
590    }
591
592    public void setPreferedWireFormatInfo(WireFormatInfo info) {
593        this.preferedWireFormatInfo = info;
594    }
595
596    public WireFormatInfo getPreferedWireFormatInfo() {
597        return preferedWireFormatInfo;
598    }
599
600    public long getMaxFrameSize() {
601        return maxFrameSize;
602    }
603
604    public void setMaxFrameSize(long maxFrameSize) {
605        this.maxFrameSize = maxFrameSize;
606    }
607
608    public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
609
610        if (preferedWireFormatInfo == null) {
611            throw new IllegalStateException("Wireformat cannot not be renegotiated.");
612        }
613
614        this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
615        info.setVersion(this.getVersion());
616
617        this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
618        info.setMaxFrameSize(this.getMaxFrameSize());
619
620        this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
621        info.setStackTraceEnabled(this.stackTraceEnabled);
622
623        this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
624        info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
625
626        this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
627        info.setCacheEnabled(this.cacheEnabled);
628
629        this.tightEncodingEnabled = info.isTightEncodingEnabled()
630                                    && preferedWireFormatInfo.isTightEncodingEnabled();
631        info.setTightEncodingEnabled(this.tightEncodingEnabled);
632
633        this.sizePrefixDisabled = info.isSizePrefixDisabled()
634                                  && preferedWireFormatInfo.isSizePrefixDisabled();
635        info.setSizePrefixDisabled(this.sizePrefixDisabled);
636
637        if (cacheEnabled) {
638
639            int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
640            info.setCacheSize(size);
641
642            if (size == 0) {
643                size = MARSHAL_CACHE_SIZE;
644            }
645
646            marshallCache = new DataStructure[size];
647            unmarshallCache = new DataStructure[size];
648            nextMarshallCacheIndex = 0;
649            nextMarshallCacheEvictionIndex = 0;
650            marshallCacheMap = new HashMap<DataStructure, Short>();
651        } else {
652            marshallCache = null;
653            unmarshallCache = null;
654            nextMarshallCacheIndex = 0;
655            nextMarshallCacheEvictionIndex = 0;
656            marshallCacheMap = null;
657        }
658
659    }
660
661    protected int min(int version1, int version2) {
662        if (version1 < version2 && version1 > 0 || version2 <= 0) {
663            return version1;
664        }
665        return version2;
666    }
667
668    protected long min(long version1, long version2) {
669        if (version1 < version2 && version1 > 0 || version2 <= 0) {
670            return version1;
671        }
672        return version2;
673    }
674}