001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import org.apache.activemq.broker.BrokerPluginSupport;
020    import org.apache.activemq.broker.ProducerBrokerExchange;
021    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
022    import org.apache.activemq.command.ActiveMQDestination;
023    import org.apache.activemq.command.ActiveMQMessage;
024    import org.apache.activemq.command.Message;
025    import org.slf4j.Logger;
026    import org.slf4j.LoggerFactory;
027    
028    /**
029     * A Broker interceptor which updates a JMS Client's timestamp on the message
030     * with a broker timestamp. Useful when the clocks on client machines are known
031     * to not be correct and you can only trust the time set on the broker machines.
032     *
033     * Enabling this plugin will break JMS compliance since the timestamp that the
034     * producer sees on the messages after a send() will be different from the
035     * timestamp the consumer will observe when he receives the message. This plugin
036     * is not enabled in the default ActiveMQ configuration.
037     *
038     * 2 new attributes have been added which will allow the administrator some override control
039     * over the expiration time for incoming messages:
040     *
041     * Attribute 'zeroExpirationOverride' can be used to apply an expiration
042     * time to incoming messages with no expiration defined (messages that would never expire)
043     *
044     * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
045     *
046     * @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
047     *
048     *
049     */
050    public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
051        private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class);
052        /**
053        * variable which (when non-zero) is used to override
054        * the expiration date for messages that arrive with
055        * no expiration date set (in Milliseconds).
056        */
057        long zeroExpirationOverride = 0;
058    
059        /**
060        * variable which (when non-zero) is used to limit
061        * the expiration date (in Milliseconds).
062        */
063        long ttlCeiling = 0;
064    
065        /**
066         * If true, the plugin will not update timestamp to past values
067         * False by default
068         */
069        boolean futureOnly = false;
070    
071    
072        /**
073         * if true, update timestamp even if message has passed through a network
074         * default false
075         */
076        boolean processNetworkMessages = false;
077    
078        /**
079        * setter method for zeroExpirationOverride
080        */
081        public void setZeroExpirationOverride(long ttl)
082        {
083            this.zeroExpirationOverride = ttl;
084        }
085    
086        /**
087        * setter method for ttlCeiling
088        */
089        public void setTtlCeiling(long ttlCeiling)
090        {
091            this.ttlCeiling = ttlCeiling;
092        }
093    
094        public void setFutureOnly(boolean futureOnly) {
095            this.futureOnly = futureOnly;
096        }
097    
098        public void setProcessNetworkMessages(Boolean processNetworkMessages) {
099            this.processNetworkMessages = processNetworkMessages;
100        }
101    
102        @Override
103        public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
104    
105            if (message.getTimestamp() > 0 && !isDestinationDLQ(message) &&
106               (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) {
107                // timestamp not been disabled and has not passed through a network or processNetworkMessages=true
108    
109                long oldExpiration = message.getExpiration();
110                long newTimeStamp = System.currentTimeMillis();
111                long timeToLive = zeroExpirationOverride;
112                long oldTimestamp = message.getTimestamp();
113                if (oldExpiration > 0) {
114                    timeToLive = oldExpiration - oldTimestamp;
115                }
116                if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) {
117                    timeToLive = ttlCeiling;
118                }
119                long expiration = timeToLive + newTimeStamp;
120                // In the scenario that the Broker is behind the clients we never want to set the
121                // Timestamp and Expiration in the past
122                if(!futureOnly || (expiration > oldExpiration)) {
123                    if (timeToLive > 0 && expiration > 0) {
124                        message.setExpiration(expiration);
125                    }
126                    message.setTimestamp(newTimeStamp);
127                    if (LOG.isDebugEnabled()) {
128                        LOG.debug("Set message " + message.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
129                    }
130                }
131            }
132            super.send(producerExchange, message);
133        }
134    
135        private boolean isDestinationDLQ(Message message) {
136            DeadLetterStrategy deadLetterStrategy;
137            Message tmp;
138    
139            if (message != null && message.getRegionDestination() != null) {
140                deadLetterStrategy = message.getRegionDestination().getDeadLetterStrategy();
141                if (deadLetterStrategy != null) {
142                    // Cheap copy, since we only need two fields
143                    tmp = new ActiveMQMessage();
144                    tmp.setDestination(message.getOriginalDestination());
145                    tmp.setRegionDestination(message.getRegionDestination());
146    
147                    // Determine if we are headed for a DLQ
148                    ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(tmp, null);
149                    if (deadLetterDestination.equals(message.getDestination())) {
150                        return true;
151                    }
152                }
153            }
154            return false;
155        }
156    }