001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import org.apache.activemq.broker.BrokerService;
021    import org.apache.activemq.broker.ConnectionContext;
022    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
023    import org.apache.activemq.command.ActiveMQDestination;
024    import org.apache.activemq.command.ActiveMQTempDestination;
025    import org.apache.activemq.store.MessageStore;
026    import org.apache.activemq.thread.TaskRunnerFactory;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * The Queue is a List of MessageEntry objects that are dispatched to matching
032     * subscriptions.
033     * 
034     * 
035     */
036    public class TempQueue extends Queue{
037        private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class);
038        private final ActiveMQTempDestination tempDest;
039       
040        
041        /**
042         * @param brokerService
043         * @param destination
044         * @param store
045         * @param parentStats
046         * @param taskFactory
047         * @throws Exception
048         */
049        public TempQueue(BrokerService brokerService,
050                ActiveMQDestination destination, MessageStore store,
051                DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
052                throws Exception {
053            super(brokerService, destination, store, parentStats, taskFactory);
054            this.tempDest = (ActiveMQTempDestination) destination;
055        }
056        
057        @Override
058        public void initialize() throws Exception {
059            this.messages=new VMPendingMessageCursor(false);
060            this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
061            this.systemUsage = brokerService.getSystemUsage();
062            memoryUsage.setParent(systemUsage.getMemoryUsage());           
063            this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());
064        }
065        
066        @Override
067        public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
068            // Only consumers on the same connection can consume from
069            // the temporary destination
070            // However, we could have failed over - and we do this
071            // check client side anyways ....
072            if (!context.isFaultTolerant()
073                    && (!context.isNetworkConnection() && !tempDest
074                            .getConnectionId().equals(
075                                    sub.getConsumerInfo().getConsumerId()
076                                            .getConnectionId()))) {
077    
078                tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
079                if (LOG.isDebugEnabled()) {
080                    LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
081                }
082            }
083            super.addSubscription(context, sub);
084        }
085        
086        @Override
087        public void dispose(ConnectionContext context) throws IOException {
088            try {
089               purge();
090            } catch (Exception e) {
091              LOG.warn("Caught an exception purging Queue: " + destination);
092            }
093            super.dispose(context);
094        }
095    }