001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.state;
019    
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.HashMap;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.ConnectionInfo;
033    import org.apache.activemq.command.ConsumerId;
034    import org.apache.activemq.command.ConsumerInfo;
035    import org.apache.activemq.command.DestinationInfo;
036    import org.apache.activemq.command.SessionId;
037    import org.apache.activemq.command.SessionInfo;
038    import org.apache.activemq.command.TransactionId;
039    
040    public class ConnectionState {
041    
042        ConnectionInfo info;
043        private final ConcurrentHashMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>();
044        private final ConcurrentHashMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>();
045        private final List<DestinationInfo> tempDestinations = Collections.synchronizedList(new ArrayList<DestinationInfo>());
046        private final AtomicBoolean shutdown = new AtomicBoolean(false);
047        private boolean connectionInterruptProcessingComplete = true;
048        private HashMap<ConsumerId, ConsumerInfo> recoveringPullConsumers;
049    
050        public ConnectionState(ConnectionInfo info) {
051            this.info = info;
052            // Add the default session id.
053            addSession(new SessionInfo(info, -1));
054        }
055    
056        public String toString() {
057            return info.toString();
058        }
059    
060        public void reset(ConnectionInfo info) {
061            this.info = info;
062            transactions.clear();
063            sessions.clear();
064            tempDestinations.clear();
065            shutdown.set(false);
066            // Add the default session id.
067            addSession(new SessionInfo(info, -1));
068        }
069    
070        public void addTempDestination(DestinationInfo info) {
071            checkShutdown();
072            tempDestinations.add(info);
073        }
074    
075        public void removeTempDestination(ActiveMQDestination destination) {
076            for (Iterator<DestinationInfo> iter = tempDestinations.iterator(); iter.hasNext();) {
077                DestinationInfo di = iter.next();
078                if (di.getDestination().equals(destination)) {
079                    iter.remove();
080                }
081            }
082        }
083    
084        public void addTransactionState(TransactionId id) {
085            checkShutdown();
086            transactions.put(id, new TransactionState(id));
087        }
088    
089        public TransactionState getTransactionState(TransactionId id) {
090            return transactions.get(id);
091        }
092    
093        public Collection<TransactionState> getTransactionStates() {
094            return transactions.values();
095        }
096    
097        public TransactionState removeTransactionState(TransactionId id) {
098            return transactions.remove(id);
099        }
100    
101        public void addSession(SessionInfo info) {
102            checkShutdown();
103            sessions.put(info.getSessionId(), new SessionState(info));
104        }
105    
106        public SessionState removeSession(SessionId id) {
107            return sessions.remove(id);
108        }
109    
110        public SessionState getSessionState(SessionId id) {
111            return sessions.get(id);
112        }
113    
114        public ConnectionInfo getInfo() {
115            return info;
116        }
117    
118        public Set<SessionId> getSessionIds() {
119            return sessions.keySet();
120        }
121    
122        public List<DestinationInfo> getTempDestinations() {
123            return tempDestinations;
124        }
125    
126        public Collection<SessionState> getSessionStates() {
127            return sessions.values();
128        }
129    
130        private void checkShutdown() {
131            if (shutdown.get()) {
132                throw new IllegalStateException("Disposed");
133            }
134        }
135    
136        public void shutdown() {
137            if (shutdown.compareAndSet(false, true)) {
138                for (Iterator<SessionState> iter = sessions.values().iterator(); iter.hasNext();) {
139                    SessionState ss = iter.next();
140                    ss.shutdown();
141                }
142            }
143        }
144    
145        public Map<ConsumerId, ConsumerInfo> getRecoveringPullConsumers() {
146            if (recoveringPullConsumers == null) {
147                recoveringPullConsumers = new HashMap<ConsumerId, ConsumerInfo>();
148            }
149            return recoveringPullConsumers;
150        }
151    
152        public void setConnectionInterruptProcessingComplete(boolean connectionInterruptProcessingComplete) {
153            this.connectionInterruptProcessingComplete = connectionInterruptProcessingComplete;
154        }
155        
156        public boolean isConnectionInterruptProcessingComplete() {
157            return connectionInterruptProcessingComplete;
158        }
159    }