001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import org.apache.activemq.broker.BrokerPluginSupport;
020    import org.apache.activemq.broker.ProducerBrokerExchange;
021    import org.apache.activemq.broker.region.Destination;
022    import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.ActiveMQMessage;
025    import org.apache.activemq.command.Message;
026    import org.slf4j.Logger;
027    import org.slf4j.LoggerFactory;
028    
029    /**
030     * A Broker interceptor which updates a JMS Client's timestamp on the message
031     * with a broker timestamp. Useful when the clocks on client machines are known
032     * to not be correct and you can only trust the time set on the broker machines.
033     *
034     * Enabling this plugin will break JMS compliance since the timestamp that the
035     * producer sees on the messages after a send() will be different from the
036     * timestamp the consumer will observe when he receives the message. This plugin
037     * is not enabled in the default ActiveMQ configuration.
038     *
039     * 2 new attributes have been added which will allow the administrator some override control
040     * over the expiration time for incoming messages:
041     *
042     * Attribute 'zeroExpirationOverride' can be used to apply an expiration
043     * time to incoming messages with no expiration defined (messages that would never expire)
044     *
045     * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
046     *
047     * @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
048     *
049     *
050     */
051    public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
052        private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class);
053        /**
054        * variable which (when non-zero) is used to override
055        * the expiration date for messages that arrive with
056        * no expiration date set (in Milliseconds).
057        */
058        long zeroExpirationOverride = 0;
059    
060        /**
061        * variable which (when non-zero) is used to limit
062        * the expiration date (in Milliseconds).
063        */
064        long ttlCeiling = 0;
065    
066        /**
067         * If true, the plugin will not update timestamp to past values
068         * False by default
069         */
070        boolean futureOnly = false;
071    
072    
073        /**
074         * if true, update timestamp even if message has passed through a network
075         * default false
076         */
077        boolean processNetworkMessages = false;
078    
079        /**
080        * setter method for zeroExpirationOverride
081        */
082        public void setZeroExpirationOverride(long ttl)
083        {
084            this.zeroExpirationOverride = ttl;
085        }
086    
087        /**
088        * setter method for ttlCeiling
089        */
090        public void setTtlCeiling(long ttlCeiling)
091        {
092            this.ttlCeiling = ttlCeiling;
093        }
094    
095        public void setFutureOnly(boolean futureOnly) {
096            this.futureOnly = futureOnly;
097        }
098    
099        public void setProcessNetworkMessages(Boolean processNetworkMessages) {
100            this.processNetworkMessages = processNetworkMessages;
101        }
102    
103        @Override
104        public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
105    
106            if (message.getTimestamp() > 0 && !isDestinationDLQ(message) &&
107               (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) {
108                // timestamp not been disabled and has not passed through a network or processNetworkMessages=true
109    
110                long oldExpiration = message.getExpiration();
111                long newTimeStamp = System.currentTimeMillis();
112                long timeToLive = zeroExpirationOverride;
113                long oldTimestamp = message.getTimestamp();
114                if (oldExpiration > 0) {
115                    timeToLive = oldExpiration - oldTimestamp;
116                }
117                if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) {
118                    timeToLive = ttlCeiling;
119                }
120                long expiration = timeToLive + newTimeStamp;
121                // In the scenario that the Broker is behind the clients we never want to set the
122                // Timestamp and Expiration in the past
123                if(!futureOnly || (expiration > oldExpiration)) {
124                    if (timeToLive > 0 && expiration > 0) {
125                        message.setExpiration(expiration);
126                    }
127                    message.setTimestamp(newTimeStamp);
128                    if (LOG.isDebugEnabled()) {
129                        LOG.debug("Set message " + message.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
130                    }
131                }
132            }
133            super.send(producerExchange, message);
134        }
135    
136        private boolean isDestinationDLQ(Message message) {
137            DeadLetterStrategy deadLetterStrategy;
138            Message tmp;
139    
140            Destination regionDestination = (Destination) message.getRegionDestination();
141            if (message != null && regionDestination != null) {
142                deadLetterStrategy = regionDestination.getDeadLetterStrategy();
143                if (deadLetterStrategy != null) {
144                    // Cheap copy, since we only need two fields
145                    tmp = new ActiveMQMessage();
146                    tmp.setDestination(message.getOriginalDestination());
147                    tmp.setRegionDestination(regionDestination);
148    
149                    // Determine if we are headed for a DLQ
150                    ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(tmp, null);
151                    if (deadLetterDestination.equals(message.getDestination())) {
152                        return true;
153                    }
154                }
155            }
156            return false;
157        }
158    }