001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.util.HashMap;
020    import java.util.HashSet;
021    import java.util.Map;
022    import java.util.Set;
023    import java.util.Timer;
024    import java.util.TimerTask;
025    
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.command.ActiveMQDestination;
028    import org.apache.activemq.thread.TaskRunnerFactory;
029    import org.apache.activemq.usage.SystemUsage;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * 
035     */
036    public abstract class AbstractTempRegion extends AbstractRegion {
037        private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class);
038    
039        private Map<CachedDestination, Destination> cachedDestinations = new HashMap<CachedDestination, Destination>();
040        private final boolean doCacheTempDestinations;
041        private final int purgeTime;
042        private Timer purgeTimer;
043        private TimerTask purgeTask;
044       
045    
046        /**
047         * @param broker
048         * @param destinationStatistics
049         * @param memoryManager
050         * @param taskRunnerFactory
051         * @param destinationFactory
052         */
053        public AbstractTempRegion(RegionBroker broker,
054                DestinationStatistics destinationStatistics,
055                SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
056                DestinationFactory destinationFactory) {
057            super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
058                    destinationFactory);
059            this.doCacheTempDestinations=broker.getBrokerService().isCacheTempDestinations();
060            this.purgeTime = broker.getBrokerService().getTimeBeforePurgeTempDestinations();
061            if (this.doCacheTempDestinations) {
062                this.purgeTimer = new Timer("ActiveMQ Temp destination purge timer", true);
063                this.purgeTask = new TimerTask() {
064                    public void run() {
065                        doPurge();
066                    }
067        
068                };
069                this.purgeTimer.schedule(purgeTask, purgeTime, purgeTime);
070            }
071           
072        }
073    
074        public void stop() throws Exception {
075            super.stop();
076            if (purgeTimer != null) {
077                purgeTimer.cancel();
078            }
079        }
080    
081        protected abstract Destination doCreateDestination(
082                ConnectionContext context, ActiveMQDestination destination)
083                throws Exception;
084    
085        protected synchronized Destination createDestination(
086                ConnectionContext context, ActiveMQDestination destination)
087                throws Exception {
088            Destination result = cachedDestinations.remove(new CachedDestination(
089                    destination));
090            if (result == null) {
091                result = doCreateDestination(context, destination);
092            }
093            return result;
094        }
095    
096        protected final synchronized void dispose(ConnectionContext context,
097                Destination dest) throws Exception {
098            // add to cache
099            if (this.doCacheTempDestinations) {
100                cachedDestinations.put(new CachedDestination(dest
101                        .getActiveMQDestination()), dest);
102            }else {
103                try {
104                    dest.dispose(context);
105                    dest.stop();
106                } catch (Exception e) {
107                    LOG.warn("Failed to dispose of " + dest, e);
108                }
109            }
110        }
111    
112        private void doDispose(Destination dest) {
113            ConnectionContext context = new ConnectionContext();
114            try {
115                dest.dispose(context);
116                dest.stop();
117            } catch (Exception e) {
118                LOG.warn("Failed to dispose of " + dest, e);
119            }
120    
121        }
122    
123        private synchronized void doPurge() {
124            long currentTime = System.currentTimeMillis();
125            if (cachedDestinations.size() > 0) {
126                Set<CachedDestination> tmp = new HashSet<CachedDestination>(
127                        cachedDestinations.keySet());
128                for (CachedDestination key : tmp) {
129                    if ((key.timeStamp + purgeTime) < currentTime) {
130                        Destination dest = cachedDestinations.remove(key);
131                        if (dest != null) {
132                            doDispose(dest);
133                        }
134                    }
135                }
136            }
137        }
138    
139        static class CachedDestination {
140            long timeStamp;
141    
142            ActiveMQDestination destination;
143    
144            CachedDestination(ActiveMQDestination destination) {
145                this.destination = destination;
146                this.timeStamp = System.currentTimeMillis();
147            }
148    
149            public int hashCode() {
150                return destination.hashCode();
151            }
152    
153            public boolean equals(Object o) {
154                if (o instanceof CachedDestination) {
155                    CachedDestination other = (CachedDestination) o;
156                    return other.destination.equals(this.destination);
157                }
158                return false;
159            }
160    
161        }
162    
163    }