001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.HashSet;
022    import java.util.Iterator;
023    import java.util.Set;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.atomic.AtomicLong;
026    
027    import org.apache.activemq.broker.BrokerService;
028    import org.apache.activemq.broker.BrokerServiceAware;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.ActiveMQQueue;
032    import org.apache.activemq.command.ActiveMQTopic;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.command.ProducerId;
036    import org.apache.activemq.kaha.CommandMarshaller;
037    import org.apache.activemq.kaha.ContainerId;
038    import org.apache.activemq.kaha.ListContainer;
039    import org.apache.activemq.kaha.MapContainer;
040    import org.apache.activemq.kaha.Marshaller;
041    import org.apache.activemq.kaha.MessageIdMarshaller;
042    import org.apache.activemq.kaha.MessageMarshaller;
043    import org.apache.activemq.kaha.Store;
044    import org.apache.activemq.kaha.StoreFactory;
045    import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
046    import org.apache.activemq.openwire.OpenWireFormat;
047    import org.apache.activemq.store.MessageStore;
048    import org.apache.activemq.store.PersistenceAdapter;
049    import org.apache.activemq.store.TopicMessageStore;
050    import org.apache.activemq.store.TransactionStore;
051    import org.apache.activemq.usage.SystemUsage;
052    import org.apache.activemq.util.IOHelper;
053    import org.slf4j.Logger;
054    import org.slf4j.LoggerFactory;
055    
056    /**
057     * @org.apache.xbean.XBean
058     * 
059     */
060    public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
061    
062        private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
063        private static final Logger LOG = LoggerFactory.getLogger(KahaPersistenceAdapter.class);
064        private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";
065    
066        protected OpenWireFormat wireFormat = new OpenWireFormat();
067        protected KahaTransactionStore transactionStore;
068        protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
069        protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
070        protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
071    
072        private long maxDataFileLength = 32 * 1024 * 1024;
073        private File directory;
074        private String brokerName;
075        private Store theStore;
076        private boolean initialized;
077        private final AtomicLong storeSize;
078        private boolean persistentIndex = true;
079        private BrokerService brokerService;
080    
081        
082        public KahaPersistenceAdapter(AtomicLong size) {
083            this.storeSize=size;
084        }
085        
086        public KahaPersistenceAdapter() {
087            this(new AtomicLong());
088        }
089        
090        public Set<ActiveMQDestination> getDestinations() {
091            Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
092            try {
093                Store store = getStore();
094                for (Iterator i = store.getMapContainerIds().iterator(); i.hasNext();) {
095                    ContainerId id = (ContainerId)i.next();
096                    Object obj = id.getKey();
097                    if (obj instanceof ActiveMQDestination) {
098                        rc.add((ActiveMQDestination)obj);
099                    }
100                }
101            } catch (IOException e) {
102                LOG.error("Failed to get destinations ", e);
103            }
104            return rc;
105        }
106    
107        public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
108            MessageStore rc = queues.get(destination);
109            if (rc == null) {
110                rc = new KahaMessageStore(getMapContainer(destination, "queue-data"), destination);
111                messageStores.put(destination, rc);
112                if (transactionStore != null) {
113                    rc = transactionStore.proxy(rc);
114                }
115                queues.put(destination, rc);
116            }
117            return rc;
118        }
119    
120        public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
121            throws IOException {
122            TopicMessageStore rc = topics.get(destination);
123            if (rc == null) {
124                Store store = getStore();
125                MapContainer messageContainer = getMapContainer(destination, "topic-data");
126                MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions",
127                                                                 "topic-subs");
128                ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(),
129                                                                                 "topic-acks");
130                ackContainer.setMarshaller(new TopicSubAckMarshaller());
131                rc = new KahaTopicMessageStore(store, messageContainer, ackContainer, subsContainer, destination);
132                messageStores.put(destination, rc);
133                if (transactionStore != null) {
134                    rc = transactionStore.proxy(rc);
135                }
136                topics.put(destination, rc);
137            }
138            return rc;
139        }
140    
141        /**
142         * Cleanup method to remove any state associated with the given destination
143         *
144         * @param destination Destination to forget
145         */
146        public void removeQueueMessageStore(ActiveMQQueue destination) {
147            queues.remove(destination);
148            try{
149                    if(theStore!=null){
150                            theStore.deleteMapContainer(destination,"queue-data");
151                    }
152            }catch(IOException e ){
153                    LOG.error("Failed to remove store map container for queue:"+destination, e);
154            }
155        }
156    
157        /**
158         * Cleanup method to remove any state associated with the given destination
159         *
160         * @param destination Destination to forget
161         */
162        public void removeTopicMessageStore(ActiveMQTopic destination) {
163            topics.remove(destination);
164        }
165    
166        protected MessageStore retrieveMessageStore(Object id) {
167            MessageStore result = messageStores.get(id);
168            return result;
169        }
170    
171        public TransactionStore createTransactionStore() throws IOException {
172            if (transactionStore == null) {
173                while (true) {
174                    try {
175                        Store store = getStore();
176                        MapContainer container = store
177                            .getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions");
178                        container.setKeyMarshaller(new CommandMarshaller(wireFormat));
179                        container.setValueMarshaller(new TransactionMarshaller(wireFormat));
180                        container.load();
181                        transactionStore = new KahaTransactionStore(this, container);
182                        transactionStore.setBrokerService(brokerService);
183                        break;
184                    } catch (StoreLockedExcpetion e) {
185                        LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
186                                 + " seconds for the Store to be unlocked.");
187                        try {
188                            Thread.sleep(STORE_LOCKED_WAIT_DELAY);
189                        } catch (InterruptedException e1) {
190                        }
191                    }
192                }
193            }
194            return transactionStore;
195        }
196    
197        public void beginTransaction(ConnectionContext context) {
198        }
199    
200        public void commitTransaction(ConnectionContext context) throws IOException {
201            if (theStore != null) {
202                theStore.force();
203            }
204        }
205    
206        public void rollbackTransaction(ConnectionContext context) {
207        }
208    
209        public void start() throws Exception {
210            initialize();
211        }
212    
213        public void stop() throws Exception {
214            if (theStore != null) {
215                theStore.close();
216            }
217        }
218    
219        public long getLastMessageBrokerSequenceId() throws IOException {
220            return 0;
221        }
222    
223        public void deleteAllMessages() throws IOException {
224            if (theStore != null) {
225                if (theStore.isInitialized()) {
226                    theStore.clear();
227                } else {
228                    theStore.delete();
229                }
230            } else {
231                StoreFactory.delete(getStoreDirectory());
232            }
233        }
234    
235        protected MapContainer<MessageId, Message> getMapContainer(Object id, String containerName)
236            throws IOException {
237            Store store = getStore();
238            MapContainer<MessageId, Message> container = store.getMapContainer(id, containerName);
239            container.setKeyMarshaller(new MessageIdMarshaller());
240            container.setValueMarshaller(new MessageMarshaller(wireFormat));
241            container.load();
242            return container;
243        }
244    
245        protected MapContainer getSubsMapContainer(Object id, String containerName)
246            throws IOException {
247            Store store = getStore();
248            MapContainer container = store.getMapContainer(id, containerName);
249            container.setKeyMarshaller(Store.STRING_MARSHALLER);
250            container.setValueMarshaller(createMessageMarshaller());
251            container.load();
252            return container;
253        }
254    
255        protected Marshaller<Object> createMessageMarshaller() {
256            return new CommandMarshaller(wireFormat);
257        }
258    
259        protected ListContainer<TopicSubAck> getListContainer(Object id, String containerName) throws IOException {
260            Store store = getStore();
261            ListContainer<TopicSubAck> container = store.getListContainer(id, containerName);
262            container.setMarshaller(createMessageMarshaller());
263            container.load();
264            return container;
265        }
266    
267        /**
268         * @param usageManager The UsageManager that is controlling the broker's
269         *                memory usage.
270         */
271        public void setUsageManager(SystemUsage usageManager) {
272        }
273    
274        /**
275         * @return the maxDataFileLength
276         */
277        public long getMaxDataFileLength() {
278            return maxDataFileLength;
279        }
280        
281        public boolean isPersistentIndex() {
282                    return persistentIndex;
283            }
284    
285            public void setPersistentIndex(boolean persistentIndex) {
286                    this.persistentIndex = persistentIndex;
287            }
288    
289        /**
290         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
291         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
292         */
293        public void setMaxDataFileLength(long maxDataFileLength) {
294            this.maxDataFileLength = maxDataFileLength;
295        }
296    
297        protected final synchronized Store getStore() throws IOException {
298            if (theStore == null) {
299                theStore = createStore();
300            }
301            return theStore;
302        }
303        
304        protected final Store createStore() throws IOException {
305            Store result = StoreFactory.open(getStoreDirectory(), "rw",storeSize);
306            result.setMaxDataFileLength(maxDataFileLength);
307            result.setPersistentIndex(isPersistentIndex());
308            result.setDefaultContainerName("container-roots");
309            return result;
310        }
311    
312        private String getStoreName() {
313            initialize();
314            return directory.getAbsolutePath();
315        }
316    
317        private File getStoreDirectory() {
318            initialize();
319            return directory;
320        }
321    
322        public String toString() {
323            return "KahaPersistenceAdapter(" + getStoreName() + ")";
324        }
325    
326        public void setBrokerName(String brokerName) {
327            this.brokerName = brokerName;
328        }
329    
330        public String getBrokerName() {
331            return brokerName;
332        }
333    
334        public File getDirectory() {
335            return this.directory;
336        }
337    
338        public void setDirectory(File directory) {
339            this.directory = directory;
340        }
341    
342        public void checkpoint(boolean sync) throws IOException {
343            if (sync) {
344                getStore().force();
345            }
346        }
347       
348        public long size(){
349           return storeSize.get();
350        }
351    
352        private void initialize() {
353            if (!initialized) {
354                initialized = true;
355                if (this.directory == null) {
356                    File file = new File(IOHelper.getDefaultDataDirectory());
357                    file = new File(file, IOHelper.toFileSystemSafeName(brokerName) + "-kahastore");
358                    setDirectory(file);
359                }
360                try {
361                    IOHelper.mkdirs(this.directory);
362                } catch (IOException e) {
363                    throw new RuntimeException(e);
364                }
365                wireFormat.setCacheEnabled(false);
366                wireFormat.setTightEncodingEnabled(true);
367            }
368        }
369    
370            public void setBrokerService(BrokerService brokerService) {
371                    this.brokerService = brokerService;
372            }
373    
374        public long getLastProducerSequenceId(ProducerId id) {
375            // reference store send has adequate duplicate suppression
376            return -1;
377        }
378      
379    
380    }