001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import javax.jms.JMSException;
020    
021    import org.apache.activemq.broker.ConnectionContext;
022    import org.apache.activemq.broker.region.policy.PolicyEntry;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.ConsumerInfo;
025    import org.apache.activemq.thread.TaskRunnerFactory;
026    import org.apache.activemq.usage.SystemUsage;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * 
032     */
033    public class TempTopicRegion extends AbstractTempRegion {
034    
035        private static final Logger LOG = LoggerFactory.getLogger(TempTopicRegion.class);
036    
037        public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
038                               DestinationFactory destinationFactory) {
039            super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
040            // We should allow the following to be configurable via a Destination
041            // Policy
042            // setAutoCreateDestinations(false);
043        }
044    
045        protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
046            if (info.isDurable()) {
047                throw new JMSException("A durable subscription cannot be created for a temporary topic.");
048            }
049            try {
050                TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
051                // lets configure the subscription depending on the destination
052                ActiveMQDestination destination = info.getDestination();
053                if (destination != null && broker.getDestinationPolicy() != null) {
054                    PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
055                    if (entry != null) {
056                        entry.configure(broker, usageManager, answer);
057                    }
058                }
059                answer.init();
060                return answer;
061            } catch (Exception e) {
062                LOG.error("Failed to create TopicSubscription ", e);
063                JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
064                jmsEx.setLinkedException(e);
065                throw jmsEx;
066            }
067        }
068    
069        public String toString() {
070            return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
071        }
072    
073        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
074    
075            // Force a timeout value so that we don't get an error that
076            // there is still an active sub. Temp destination may be removed
077            // while a network sub is still active which is valid.
078            if (timeout == 0) {
079                timeout = 1;
080            }
081    
082            super.removeDestination(context, destination, timeout);
083        }
084    
085        
086        protected Destination doCreateDestination(ConnectionContext context,
087                ActiveMQDestination destination) throws Exception {
088            return destinationFactory.createDestination(context, destination, destinationStatistics);
089        }
090    }