001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.state;
019
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.HashMap;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import java.util.concurrent.atomic.AtomicBoolean;
031
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.ConnectionInfo;
034import org.apache.activemq.command.ConsumerId;
035import org.apache.activemq.command.ConsumerInfo;
036import org.apache.activemq.command.DestinationInfo;
037import org.apache.activemq.command.SessionId;
038import org.apache.activemq.command.SessionInfo;
039import org.apache.activemq.command.TransactionId;
040
041public class ConnectionState {
042
043    ConnectionInfo info;
044    private final ConcurrentMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>();
045    private final ConcurrentMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>();
046    private final List<DestinationInfo> tempDestinations = Collections.synchronizedList(new ArrayList<DestinationInfo>());
047    private final AtomicBoolean shutdown = new AtomicBoolean(false);
048    private boolean connectionInterruptProcessingComplete = true;
049    private HashMap<ConsumerId, ConsumerInfo> recoveringPullConsumers;
050
051    public ConnectionState(ConnectionInfo info) {
052        this.info = info;
053        // Add the default session id.
054        addSession(new SessionInfo(info, -1));
055    }
056
057    @Override
058    public String toString() {
059        return info.toString();
060    }
061
062    public void reset(ConnectionInfo info) {
063        this.info = info;
064        transactions.clear();
065        sessions.clear();
066        tempDestinations.clear();
067        shutdown.set(false);
068        // Add the default session id.
069        addSession(new SessionInfo(info, -1));
070    }
071
072    public void addTempDestination(DestinationInfo info) {
073        checkShutdown();
074        tempDestinations.add(info);
075    }
076
077    public void removeTempDestination(ActiveMQDestination destination) {
078        for (Iterator<DestinationInfo> iter = tempDestinations.iterator(); iter.hasNext();) {
079            DestinationInfo di = iter.next();
080            if (di.getDestination().equals(destination)) {
081                iter.remove();
082            }
083        }
084    }
085
086    public void addTransactionState(TransactionId id) {
087        checkShutdown();
088        transactions.put(id, new TransactionState(id));
089    }
090
091    public TransactionState getTransactionState(TransactionId id) {
092        return transactions.get(id);
093    }
094
095    public Collection<TransactionState> getTransactionStates() {
096        return transactions.values();
097    }
098
099    public TransactionState removeTransactionState(TransactionId id) {
100        return transactions.remove(id);
101    }
102
103    public void addSession(SessionInfo info) {
104        checkShutdown();
105        sessions.put(info.getSessionId(), new SessionState(info));
106    }
107
108    public SessionState removeSession(SessionId id) {
109        return sessions.remove(id);
110    }
111
112    public SessionState getSessionState(SessionId id) {
113        return sessions.get(id);
114    }
115
116    public ConnectionInfo getInfo() {
117        return info;
118    }
119
120    public Set<SessionId> getSessionIds() {
121        return sessions.keySet();
122    }
123
124    public List<DestinationInfo> getTempDestinations() {
125        return tempDestinations;
126    }
127
128    public Collection<SessionState> getSessionStates() {
129        return sessions.values();
130    }
131
132    private void checkShutdown() {
133        if (shutdown.get()) {
134            throw new IllegalStateException("Disposed");
135        }
136    }
137
138    public void shutdown() {
139        if (shutdown.compareAndSet(false, true)) {
140            for (Iterator<SessionState> iter = sessions.values().iterator(); iter.hasNext();) {
141                SessionState ss = iter.next();
142                ss.shutdown();
143            }
144        }
145    }
146
147    public Map<ConsumerId, ConsumerInfo> getRecoveringPullConsumers() {
148        if (recoveringPullConsumers == null) {
149            recoveringPullConsumers = new HashMap<ConsumerId, ConsumerInfo>();
150        }
151        return recoveringPullConsumers;
152    }
153
154    public void setConnectionInterruptProcessingComplete(boolean connectionInterruptProcessingComplete) {
155        this.connectionInterruptProcessingComplete = connectionInterruptProcessingComplete;
156    }
157
158    public boolean isConnectionInterruptProcessingComplete() {
159        return connectionInterruptProcessingComplete;
160    }
161}