001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.broker.scheduler.JobSchedulerStore;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.ActiveMQQueue;
031import org.apache.activemq.command.ActiveMQTopic;
032import org.apache.activemq.command.ProducerId;
033import org.apache.activemq.store.MessageStore;
034import org.apache.activemq.store.NoLocalSubscriptionAware;
035import org.apache.activemq.store.PersistenceAdapter;
036import org.apache.activemq.store.ProxyMessageStore;
037import org.apache.activemq.store.TopicMessageStore;
038import org.apache.activemq.store.TransactionStore;
039import org.apache.activemq.usage.SystemUsage;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * @org.apache.xbean.XBean
045 */
046public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubscriptionAware {
047    private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
048
049    MemoryTransactionStore transactionStore;
050    ConcurrentMap<ActiveMQDestination, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
051    ConcurrentMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
052    private boolean useExternalMessageReferences;
053
054    @Override
055    public Set<ActiveMQDestination> getDestinations() {
056        Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
057        for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
058            rc.add(iter.next());
059        }
060        for (Iterator<ActiveMQDestination> iter = topics.keySet().iterator(); iter.hasNext();) {
061            rc.add(iter.next());
062        }
063        return rc;
064    }
065
066    public static MemoryPersistenceAdapter newInstance(File file) {
067        return new MemoryPersistenceAdapter();
068    }
069
070    @Override
071    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
072        MessageStore rc = queues.get(destination);
073        if (rc == null) {
074            rc = new MemoryMessageStore(destination);
075            if (transactionStore != null) {
076                rc = transactionStore.proxy(rc);
077            }
078            queues.put(destination, rc);
079        }
080        return rc;
081    }
082
083    @Override
084    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
085        TopicMessageStore rc = topics.get(destination);
086        if (rc == null) {
087            rc = new MemoryTopicMessageStore(destination);
088            if (transactionStore != null) {
089                rc = transactionStore.proxy(rc);
090            }
091            topics.put(destination, rc);
092        }
093        return rc;
094    }
095
096    /**
097     * Cleanup method to remove any state associated with the given destination
098     *
099     * @param destination
100     *        Destination to forget
101     */
102    @Override
103    public void removeQueueMessageStore(ActiveMQQueue destination) {
104        queues.remove(destination);
105    }
106
107    /**
108     * Cleanup method to remove any state associated with the given destination
109     *
110     * @param destination
111     *        Destination to forget
112     */
113    @Override
114    public void removeTopicMessageStore(ActiveMQTopic destination) {
115        topics.remove(destination);
116    }
117
118    @Override
119    public TransactionStore createTransactionStore() throws IOException {
120        if (transactionStore == null) {
121            transactionStore = new MemoryTransactionStore(this);
122        }
123        return transactionStore;
124    }
125
126    @Override
127    public void beginTransaction(ConnectionContext context) {
128    }
129
130    @Override
131    public void commitTransaction(ConnectionContext context) {
132    }
133
134    @Override
135    public void rollbackTransaction(ConnectionContext context) {
136    }
137
138    @Override
139    public void start() throws Exception {
140    }
141
142    @Override
143    public void stop() throws Exception {
144    }
145
146    @Override
147    public long getLastMessageBrokerSequenceId() throws IOException {
148        return 0;
149    }
150
151    @Override
152    public void deleteAllMessages() throws IOException {
153        for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
154            MemoryMessageStore store = asMemoryMessageStore(iter.next());
155            if (store != null) {
156                store.delete();
157            }
158        }
159        for (Iterator<MessageStore> iter = queues.values().iterator(); iter.hasNext();) {
160            MemoryMessageStore store = asMemoryMessageStore(iter.next());
161            if (store != null) {
162                store.delete();
163            }
164        }
165
166        if (transactionStore != null) {
167            transactionStore.delete();
168        }
169    }
170
171    public boolean isUseExternalMessageReferences() {
172        return useExternalMessageReferences;
173    }
174
175    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
176        this.useExternalMessageReferences = useExternalMessageReferences;
177    }
178
179    protected MemoryMessageStore asMemoryMessageStore(Object value) {
180        if (value instanceof MemoryMessageStore) {
181            return (MemoryMessageStore) value;
182        }
183        if (value instanceof ProxyMessageStore) {
184            MessageStore delegate = ((ProxyMessageStore) value).getDelegate();
185            if (delegate instanceof MemoryMessageStore) {
186                return (MemoryMessageStore) delegate;
187            }
188        }
189        LOG.warn("Expected an instance of MemoryMessageStore but was: " + value);
190        return null;
191    }
192
193    /**
194     * @param usageManager
195     *        The UsageManager that is controlling the broker's memory usage.
196     */
197    @Override
198    public void setUsageManager(SystemUsage usageManager) {
199    }
200
201    @Override
202    public String toString() {
203        return "MemoryPersistenceAdapter";
204    }
205
206    @Override
207    public void setBrokerName(String brokerName) {
208    }
209
210    @Override
211    public void setDirectory(File dir) {
212    }
213
214    @Override
215    public File getDirectory() {
216        return null;
217    }
218
219    @Override
220    public void checkpoint(boolean sync) throws IOException {
221    }
222
223    @Override
224    public long size() {
225        return 0;
226    }
227
228    public void setCreateTransactionStore(boolean create) throws IOException {
229        if (create) {
230            createTransactionStore();
231        }
232    }
233
234    @Override
235    public long getLastProducerSequenceId(ProducerId id) {
236        // memory map does duplicate suppression
237        return -1;
238    }
239
240    @Override
241    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
242        // We could eventuall implement an in memory scheduler.
243        throw new UnsupportedOperationException();
244    }
245
246    /* (non-Javadoc)
247     * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
248     */
249    @Override
250    public boolean isPersistNoLocal() {
251        return true;
252    }
253}