001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.HashMap;
020    import java.util.HashSet;
021    import java.util.Map;
022    import java.util.Set;
023    import java.util.Timer;
024    import java.util.TimerTask;
025    
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.thread.TaskRunnerFactory;
029    import org.apache.activemq.usage.SystemUsage;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * 
035     */
036    public abstract class AbstractTempRegion extends AbstractRegion {
037        private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class);
038    
039        private Map<CachedDestination, Destination> cachedDestinations = new HashMap<CachedDestination, Destination>();
040        private final boolean doCacheTempDestinations;
041        private final int purgeTime;
042        private Timer purgeTimer;
043        private TimerTask purgeTask;
044       
045    
046        /**
047         * @param broker
048         * @param destinationStatistics
049         * @param memoryManager
050         * @param taskRunnerFactory
051         * @param destinationFactory
052         */
053        public AbstractTempRegion(RegionBroker broker,
054                DestinationStatistics destinationStatistics,
055                SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
056                DestinationFactory destinationFactory) {
057            super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
058                    destinationFactory);
059            this.doCacheTempDestinations=broker.getBrokerService().isCacheTempDestinations();
060            this.purgeTime = broker.getBrokerService().getTimeBeforePurgeTempDestinations();
061            if (this.doCacheTempDestinations) {
062                this.purgeTimer = new Timer("ActiveMQ Temp destination purge timer", true);
063                this.purgeTask = new TimerTask() {
064                    public void run() {
065                        doPurge();
066                    }
067        
068                };
069                this.purgeTimer.schedule(purgeTask, purgeTime, purgeTime);
070            }
071           
072        }
073    
074        public void stop() throws Exception {
075            super.stop();
076            if (purgeTimer != null) {
077                purgeTimer.cancel();
078            }
079        }
080    
081        protected synchronized Destination createDestination(
082                ConnectionContext context, ActiveMQDestination destination)
083                throws Exception {
084            Destination result = cachedDestinations.remove(new CachedDestination(
085                    destination));
086            if (result == null) {
087                result =  destinationFactory.createDestination(context, destination, destinationStatistics);
088            }
089            return result;
090        }
091    
092        protected final synchronized void dispose(ConnectionContext context,
093                Destination dest) throws Exception {
094            // add to cache
095            if (this.doCacheTempDestinations) {
096                cachedDestinations.put(new CachedDestination(dest
097                        .getActiveMQDestination()), dest);
098            }else {
099                try {
100                    dest.dispose(context);
101                    dest.stop();
102                } catch (Exception e) {
103                    LOG.warn("Failed to dispose of " + dest, e);
104                }
105            }
106        }
107    
108        private void doDispose(Destination dest) {
109            ConnectionContext context = new ConnectionContext();
110            try {
111                dest.dispose(context);
112                dest.stop();
113            } catch (Exception e) {
114                LOG.warn("Failed to dispose of " + dest, e);
115            }
116    
117        }
118    
119        private synchronized void doPurge() {
120            long currentTime = System.currentTimeMillis();
121            if (cachedDestinations.size() > 0) {
122                Set<CachedDestination> tmp = new HashSet<CachedDestination>(
123                        cachedDestinations.keySet());
124                for (CachedDestination key : tmp) {
125                    if ((key.timeStamp + purgeTime) < currentTime) {
126                        Destination dest = cachedDestinations.remove(key);
127                        if (dest != null) {
128                            doDispose(dest);
129                        }
130                    }
131                }
132            }
133        }
134    
135        static class CachedDestination {
136            long timeStamp;
137    
138            ActiveMQDestination destination;
139    
140            CachedDestination(ActiveMQDestination destination) {
141                this.destination = destination;
142                this.timeStamp = System.currentTimeMillis();
143            }
144    
145            public int hashCode() {
146                return destination.hashCode();
147            }
148    
149            public boolean equals(Object o) {
150                if (o instanceof CachedDestination) {
151                    CachedDestination other = (CachedDestination) o;
152                    return other.destination.equals(this.destination);
153                }
154                return false;
155            }
156    
157        }
158    
159    }