001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.List;
022    import javax.jms.InvalidSelectorException;
023    import javax.jms.JMSException;
024    
025    import org.apache.activemq.broker.Broker;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
028    import org.apache.activemq.command.ConsumerInfo;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.filter.MessageEvaluationContext;
031    import org.apache.activemq.usage.SystemUsage;
032    
033    public class QueueBrowserSubscription extends QueueSubscription {
034    
035        int queueRefs;
036        boolean browseDone;
037        boolean destinationsAdded;
038    
039        public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
040            throws JMSException {
041            super(broker,usageManager, context, info);
042        }
043    
044        protected boolean canDispatch(MessageReference node) {
045            return !((QueueMessageReference)node).isAcked();
046        }
047    
048        public synchronized String toString() {
049            return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() + ", destinations="
050                   + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
051                   + this.prefetchExtension + ", pending=" + getPendingQueueSize();
052        }
053    
054        synchronized public void destinationsAdded() throws Exception {
055            destinationsAdded = true;
056            checkDone();
057        }
058    
059        private void checkDone() throws Exception {
060            if( !browseDone && queueRefs == 0 && destinationsAdded) {
061                browseDone=true;
062                add(QueueMessageReference.NULL_MESSAGE);
063            }
064        }
065    
066        public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
067            return !browseDone && super.matches(node, context);
068        }
069    
070        /**
071         * Since we are a browser we don't really remove the message from the queue.
072         */
073        protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n)
074            throws IOException {
075            if (info.isNetworkSubscription()) {
076                    super.acknowledge(context, ack, n);
077            }
078        }
079    
080        synchronized public void incrementQueueRef() {
081            queueRefs++;        
082        }
083    
084        synchronized public void decrementQueueRef() throws Exception {
085            if (queueRefs > 0) {
086                queueRefs--;
087            }
088            checkDone();
089        }
090    
091    
092        @Override
093        public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
094            super.remove(context, destination);
095            // there's no unacked messages that needs to be redelivered
096            // in case of browser
097            return new ArrayList<MessageReference>();
098        }
099    }