001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.broker.scheduler.JobSchedulerStore;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQQueue;
031import org.apache.activemq.command.ActiveMQTopic;
032import org.apache.activemq.command.ProducerId;
033import org.apache.activemq.store.MessageStore;
034import org.apache.activemq.store.PersistenceAdapter;
035import org.apache.activemq.store.ProxyMessageStore;
036import org.apache.activemq.store.TopicMessageStore;
037import org.apache.activemq.store.TransactionStore;
038import org.apache.activemq.usage.SystemUsage;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * @org.apache.xbean.XBean
044 *
045 */
046public class MemoryPersistenceAdapter implements PersistenceAdapter {
047    private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
048
049    MemoryTransactionStore transactionStore;
050    ConcurrentMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
051    ConcurrentMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
052    private boolean useExternalMessageReferences;
053
054    @Override
055    public Set<ActiveMQDestination> getDestinations() {
056        Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
057        for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
058            rc.add(iter.next());
059        }
060        for (Iterator<ActiveMQDestination> iter = topics.keySet().iterator(); iter.hasNext();) {
061            rc.add(iter.next());
062        }
063        return rc;
064    }
065
066    public static MemoryPersistenceAdapter newInstance(File file) {
067        return new MemoryPersistenceAdapter();
068    }
069
070    @Override
071    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
072        MessageStore rc = queues.get(destination);
073        if (rc == null) {
074            rc = new MemoryMessageStore(destination);
075            if (transactionStore != null) {
076                rc = transactionStore.proxy(rc);
077            }
078            queues.put(destination, rc);
079        }
080        return rc;
081    }
082
083    @Override
084    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
085        TopicMessageStore rc = topics.get(destination);
086        if (rc == null) {
087            rc = new MemoryTopicMessageStore(destination);
088            if (transactionStore != null) {
089                rc = transactionStore.proxy(rc);
090            }
091            topics.put(destination, rc);
092        }
093        return rc;
094    }
095
096    /**
097     * Cleanup method to remove any state associated with the given destination
098     *
099     * @param destination Destination to forget
100     */
101    @Override
102    public void removeQueueMessageStore(ActiveMQQueue destination) {
103        queues.remove(destination);
104    }
105
106    /**
107     * Cleanup method to remove any state associated with the given destination
108     *
109     * @param destination Destination to forget
110     */
111    @Override
112    public void removeTopicMessageStore(ActiveMQTopic destination) {
113        topics.remove(destination);
114    }
115
116    @Override
117    public TransactionStore createTransactionStore() throws IOException {
118        if (transactionStore == null) {
119            transactionStore = new MemoryTransactionStore(this);
120        }
121        return transactionStore;
122    }
123
124    @Override
125    public void beginTransaction(ConnectionContext context) {
126    }
127
128    @Override
129    public void commitTransaction(ConnectionContext context) {
130    }
131
132    @Override
133    public void rollbackTransaction(ConnectionContext context) {
134    }
135
136    @Override
137    public void start() throws Exception {
138    }
139
140    @Override
141    public void stop() throws Exception {
142    }
143
144    @Override
145    public long getLastMessageBrokerSequenceId() throws IOException {
146        return 0;
147    }
148
149    @Override
150    public void deleteAllMessages() throws IOException {
151        for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
152            MemoryMessageStore store = asMemoryMessageStore(iter.next());
153            if (store != null) {
154                store.delete();
155            }
156        }
157        for (Iterator<MessageStore> iter = queues.values().iterator(); iter.hasNext();) {
158            MemoryMessageStore store = asMemoryMessageStore(iter.next());
159            if (store != null) {
160                store.delete();
161            }
162        }
163
164        if (transactionStore != null) {
165            transactionStore.delete();
166        }
167    }
168
169    public boolean isUseExternalMessageReferences() {
170        return useExternalMessageReferences;
171    }
172
173    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
174        this.useExternalMessageReferences = useExternalMessageReferences;
175    }
176
177    protected MemoryMessageStore asMemoryMessageStore(Object value) {
178        if (value instanceof MemoryMessageStore) {
179            return (MemoryMessageStore)value;
180        }
181        if (value instanceof ProxyMessageStore) {
182            MessageStore delegate = ((ProxyMessageStore)value).getDelegate();
183            if (delegate instanceof MemoryMessageStore) {
184                return (MemoryMessageStore) delegate;
185            }
186        }
187        LOG.warn("Expected an instance of MemoryMessageStore but was: " + value);
188        return null;
189    }
190
191    /**
192     * @param usageManager The UsageManager that is controlling the broker's
193     *                memory usage.
194     */
195    @Override
196    public void setUsageManager(SystemUsage usageManager) {
197    }
198
199    @Override
200    public String toString() {
201        return "MemoryPersistenceAdapter";
202    }
203
204    @Override
205    public void setBrokerName(String brokerName) {
206    }
207
208    @Override
209    public void setDirectory(File dir) {
210    }
211
212    @Override
213    public File getDirectory(){
214        return null;
215    }
216
217    @Override
218    public void checkpoint(boolean sync) throws IOException {
219    }
220
221    @Override
222    public long size(){
223        return 0;
224    }
225
226    public void setCreateTransactionStore(boolean create) throws IOException {
227        if (create) {
228            createTransactionStore();
229        }
230    }
231
232    @Override
233    public long getLastProducerSequenceId(ProducerId id) {
234        // memory map does duplicate suppression
235        return -1;
236    }
237
238    @Override
239    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
240        // We could eventuall implement an in memory scheduler.
241        throw new UnsupportedOperationException();
242    }
243}