001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import org.apache.activemq.broker.BrokerPluginSupport;
020import org.apache.activemq.broker.ProducerBrokerExchange;
021import org.apache.activemq.broker.region.Destination;
022import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
023import org.apache.activemq.command.ActiveMQDestination;
024import org.apache.activemq.command.ActiveMQMessage;
025import org.apache.activemq.command.Message;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * A Broker interceptor which updates a JMS Client's timestamp on the message
031 * with a broker timestamp. Useful when the clocks on client machines are known
032 * to not be correct and you can only trust the time set on the broker machines.
033 *
034 * Enabling this plugin will break JMS compliance since the timestamp that the
035 * producer sees on the messages after as send() will be different from the
036 * timestamp the consumer will observe when he receives the message. This plugin
037 * is not enabled in the default ActiveMQ configuration.
038 *
039 * 2 new attributes have been added which will allow the administrator some override control
040 * over the expiration time for incoming messages:
041 *
042 * Attribute 'zeroExpirationOverride' can be used to apply an expiration
043 * time to incoming messages with no expiration defined (messages that would never expire)
044 *
045 * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
046 *
047 * @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
048 *
049 *
050 */
051public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
052    private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class);
053    /**
054    * variable which (when non-zero) is used to override
055    * the expiration date for messages that arrive with
056    * no expiration date set (in Milliseconds).
057    */
058    long zeroExpirationOverride = 0;
059
060    /**
061    * variable which (when non-zero) is used to limit
062    * the expiration date (in Milliseconds).
063    */
064    long ttlCeiling = 0;
065
066    /**
067     * If true, the plugin will not update timestamp to past values
068     * False by default
069     */
070    boolean futureOnly = false;
071
072
073    /**
074     * if true, update timestamp even if message has passed through a network
075     * default false
076     */
077    boolean processNetworkMessages = false;
078
079    /**
080    * setter method for zeroExpirationOverride
081    */
082    public void setZeroExpirationOverride(long ttl)
083    {
084        this.zeroExpirationOverride = ttl;
085    }
086
087    /**
088    * setter method for ttlCeiling
089    */
090    public void setTtlCeiling(long ttlCeiling)
091    {
092        this.ttlCeiling = ttlCeiling;
093    }
094
095    public void setFutureOnly(boolean futureOnly) {
096        this.futureOnly = futureOnly;
097    }
098
099    public void setProcessNetworkMessages(Boolean processNetworkMessages) {
100        this.processNetworkMessages = processNetworkMessages;
101    }
102
103    @Override
104    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
105
106        if (message.getTimestamp() > 0 && !isDestinationDLQ(message) &&
107           (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length == 0))) {
108            // timestamp not been disabled and has not passed through a network or processNetworkMessages=true
109
110            long oldExpiration = message.getExpiration();
111            long newTimeStamp = System.currentTimeMillis();
112            long timeToLive = zeroExpirationOverride;
113            long oldTimestamp = message.getTimestamp();
114            if (oldExpiration > 0) {
115                timeToLive = oldExpiration - oldTimestamp;
116            }
117            if (timeToLive > 0 && ttlCeiling > 0 && timeToLive > ttlCeiling) {
118                timeToLive = ttlCeiling;
119            }
120            long expiration = timeToLive + newTimeStamp;
121            // In the scenario that the Broker is behind the clients we never want to set the
122            // Timestamp and Expiration in the past
123            if(!futureOnly || (expiration > oldExpiration)) {
124                if (timeToLive > 0 && expiration > 0) {
125                    message.setExpiration(expiration);
126                }
127                message.setTimestamp(newTimeStamp);
128                LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ message.getMessageId(), oldTimestamp, newTimeStamp });
129            }
130        }
131        super.send(producerExchange, message);
132    }
133
134    private boolean isDestinationDLQ(Message message) {
135        DeadLetterStrategy deadLetterStrategy;
136        Message tmp;
137
138        Destination regionDestination = (Destination) message.getRegionDestination();
139        if (message != null && regionDestination != null) {
140            deadLetterStrategy = regionDestination.getDeadLetterStrategy();
141            if (deadLetterStrategy != null && message.getOriginalDestination() != null) {
142                // Cheap copy, since we only need two fields
143                tmp = new ActiveMQMessage();
144                tmp.setDestination(message.getOriginalDestination());
145                tmp.setRegionDestination(regionDestination);
146
147                // Determine if we are headed for a DLQ
148                ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(tmp, null);
149                if (deadLetterDestination.equals(message.getDestination())) {
150                    return true;
151                }
152            }
153        }
154        return false;
155    }
156}