001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.HashSet;
022    import java.util.Iterator;
023    import java.util.Set;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.atomic.AtomicLong;
026    
027    import org.apache.activemq.broker.BrokerService;
028    import org.apache.activemq.broker.BrokerServiceAware;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.ActiveMQQueue;
032    import org.apache.activemq.command.ActiveMQTopic;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.command.ProducerId;
036    import org.apache.activemq.kaha.CommandMarshaller;
037    import org.apache.activemq.kaha.ContainerId;
038    import org.apache.activemq.kaha.ListContainer;
039    import org.apache.activemq.kaha.MapContainer;
040    import org.apache.activemq.kaha.Marshaller;
041    import org.apache.activemq.kaha.MessageIdMarshaller;
042    import org.apache.activemq.kaha.MessageMarshaller;
043    import org.apache.activemq.kaha.Store;
044    import org.apache.activemq.kaha.StoreFactory;
045    import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
046    import org.apache.activemq.openwire.OpenWireFormat;
047    import org.apache.activemq.store.MessageStore;
048    import org.apache.activemq.store.PersistenceAdapter;
049    import org.apache.activemq.store.TopicMessageStore;
050    import org.apache.activemq.store.TransactionStore;
051    import org.apache.activemq.usage.SystemUsage;
052    import org.apache.activemq.util.IOHelper;
053    import org.slf4j.Logger;
054    import org.slf4j.LoggerFactory;
055    
056    /**
057     * @org.apache.xbean.XBean
058     *
059     */
060    @Deprecated
061    public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
062    
063        private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
064        private static final Logger LOG = LoggerFactory.getLogger(KahaPersistenceAdapter.class);
065        private static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";
066    
067        protected OpenWireFormat wireFormat = new OpenWireFormat();
068        protected KahaTransactionStore transactionStore;
069        protected ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
070        protected ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
071        protected ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
072    
073        private long maxDataFileLength = 32 * 1024 * 1024;
074        private File directory;
075        private String brokerName;
076        private Store theStore;
077        private boolean initialized;
078        private final AtomicLong storeSize;
079        private boolean persistentIndex = true;
080        private BrokerService brokerService;
081    
082        public KahaPersistenceAdapter(AtomicLong size) {
083            this.storeSize=size;
084        }
085    
086        public KahaPersistenceAdapter() {
087            this(new AtomicLong());
088        }
089    
090        @Override
091        public Set<ActiveMQDestination> getDestinations() {
092            Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
093            try {
094                Store store = getStore();
095                for (Iterator i = store.getMapContainerIds().iterator(); i.hasNext();) {
096                    ContainerId id = (ContainerId)i.next();
097                    Object obj = id.getKey();
098                    if (obj instanceof ActiveMQDestination) {
099                        rc.add((ActiveMQDestination)obj);
100                    }
101                }
102            } catch (IOException e) {
103                LOG.error("Failed to get destinations ", e);
104            }
105            return rc;
106        }
107    
108        @Override
109        public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
110            MessageStore rc = queues.get(destination);
111            if (rc == null) {
112                rc = new KahaMessageStore(getMapContainer(destination, "queue-data"), destination);
113                messageStores.put(destination, rc);
114                if (transactionStore != null) {
115                    rc = transactionStore.proxy(rc);
116                }
117                queues.put(destination, rc);
118            }
119            return rc;
120        }
121    
122        @Override
123        public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
124            throws IOException {
125            TopicMessageStore rc = topics.get(destination);
126            if (rc == null) {
127                Store store = getStore();
128                MapContainer messageContainer = getMapContainer(destination, "topic-data");
129                MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions",
130                                                                 "topic-subs");
131                ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(),
132                                                                                 "topic-acks");
133                ackContainer.setMarshaller(new TopicSubAckMarshaller());
134                rc = new KahaTopicMessageStore(store, messageContainer, ackContainer, subsContainer, destination);
135                messageStores.put(destination, rc);
136                if (transactionStore != null) {
137                    rc = transactionStore.proxy(rc);
138                }
139                topics.put(destination, rc);
140            }
141            return rc;
142        }
143    
144        /**
145         * Cleanup method to remove any state associated with the given destination
146         *
147         * @param destination Destination to forget
148         */
149        @Override
150        public void removeQueueMessageStore(ActiveMQQueue destination) {
151            queues.remove(destination);
152            try{
153                if(theStore!=null){
154                    theStore.deleteMapContainer(destination,"queue-data");
155                }
156            }catch(IOException e ){
157                LOG.error("Failed to remove store map container for queue:"+destination, e);
158            }
159        }
160    
161        /**
162         * Cleanup method to remove any state associated with the given destination
163         *
164         * @param destination Destination to forget
165         */
166        @Override
167        public void removeTopicMessageStore(ActiveMQTopic destination) {
168            topics.remove(destination);
169        }
170    
171        protected MessageStore retrieveMessageStore(Object id) {
172            MessageStore result = messageStores.get(id);
173            return result;
174        }
175    
176        @Override
177        public TransactionStore createTransactionStore() throws IOException {
178            if (transactionStore == null) {
179                while (true) {
180                    try {
181                        Store store = getStore();
182                        MapContainer container = store
183                            .getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions");
184                        container.setKeyMarshaller(new CommandMarshaller(wireFormat));
185                        container.setValueMarshaller(new TransactionMarshaller(wireFormat));
186                        container.load();
187                        transactionStore = new KahaTransactionStore(this, container);
188                        transactionStore.setBrokerService(brokerService);
189                        break;
190                    } catch (StoreLockedExcpetion e) {
191                        LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
192                                 + " seconds for the Store to be unlocked.");
193                        try {
194                            Thread.sleep(STORE_LOCKED_WAIT_DELAY);
195                        } catch (InterruptedException e1) {
196                        }
197                    }
198                }
199            }
200            return transactionStore;
201        }
202    
203        @Override
204        public void beginTransaction(ConnectionContext context) {
205        }
206    
207        @Override
208        public void commitTransaction(ConnectionContext context) throws IOException {
209            if (theStore != null) {
210                theStore.force();
211            }
212        }
213    
214        @Override
215        public void rollbackTransaction(ConnectionContext context) {
216        }
217    
218        @Override
219        public void start() throws Exception {
220            initialize();
221        }
222    
223        @Override
224        public void stop() throws Exception {
225            if (theStore != null) {
226                theStore.close();
227            }
228        }
229    
230        @Override
231        public long getLastMessageBrokerSequenceId() throws IOException {
232            return 0;
233        }
234    
235        @Override
236        public void deleteAllMessages() throws IOException {
237            if (theStore != null) {
238                if (theStore.isInitialized()) {
239                    theStore.clear();
240                } else {
241                    theStore.delete();
242                }
243            } else {
244                StoreFactory.delete(getStoreDirectory());
245            }
246        }
247    
248        protected MapContainer<MessageId, Message> getMapContainer(Object id, String containerName)
249            throws IOException {
250            Store store = getStore();
251            MapContainer<MessageId, Message> container = store.getMapContainer(id, containerName);
252            container.setKeyMarshaller(new MessageIdMarshaller());
253            container.setValueMarshaller(new MessageMarshaller(wireFormat));
254            container.load();
255            return container;
256        }
257    
258        protected MapContainer getSubsMapContainer(Object id, String containerName)
259            throws IOException {
260            Store store = getStore();
261            MapContainer container = store.getMapContainer(id, containerName);
262            container.setKeyMarshaller(Store.STRING_MARSHALLER);
263            container.setValueMarshaller(createMessageMarshaller());
264            container.load();
265            return container;
266        }
267    
268        protected Marshaller<Object> createMessageMarshaller() {
269            return new CommandMarshaller(wireFormat);
270        }
271    
272        protected ListContainer<TopicSubAck> getListContainer(Object id, String containerName) throws IOException {
273            Store store = getStore();
274            ListContainer<TopicSubAck> container = store.getListContainer(id, containerName);
275            container.setMarshaller(createMessageMarshaller());
276            container.load();
277            return container;
278        }
279    
280        /**
281         * @param usageManager The UsageManager that is controlling the broker's
282         *                memory usage.
283         */
284        @Override
285        public void setUsageManager(SystemUsage usageManager) {
286        }
287    
288        /**
289         * @return the maxDataFileLength
290         */
291        public long getMaxDataFileLength() {
292            return maxDataFileLength;
293        }
294    
295        public boolean isPersistentIndex() {
296            return persistentIndex;
297        }
298    
299        public void setPersistentIndex(boolean persistentIndex) {
300            this.persistentIndex = persistentIndex;
301        }
302    
303        /**
304         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
305         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
306         */
307        public void setMaxDataFileLength(long maxDataFileLength) {
308            this.maxDataFileLength = maxDataFileLength;
309        }
310    
311        protected final synchronized Store getStore() throws IOException {
312            if (theStore == null) {
313                theStore = createStore();
314            }
315            return theStore;
316        }
317    
318        protected final Store createStore() throws IOException {
319            Store result = StoreFactory.open(getStoreDirectory(), "rw",storeSize);
320            result.setMaxDataFileLength(maxDataFileLength);
321            result.setPersistentIndex(isPersistentIndex());
322            result.setDefaultContainerName("container-roots");
323            return result;
324        }
325    
326        private String getStoreName() {
327            initialize();
328            return directory.getAbsolutePath();
329        }
330    
331        private File getStoreDirectory() {
332            initialize();
333            return directory;
334        }
335    
336        @Override
337        public String toString() {
338            return "KahaPersistenceAdapter(" + getStoreName() + ")";
339        }
340    
341        @Override
342        public void setBrokerName(String brokerName) {
343            this.brokerName = brokerName;
344        }
345    
346        public String getBrokerName() {
347            return brokerName;
348        }
349    
350        @Override
351        public File getDirectory() {
352            return this.directory;
353        }
354    
355        @Override
356        public void setDirectory(File directory) {
357            this.directory = directory;
358        }
359    
360        @Override
361        public void checkpoint(boolean sync) throws IOException {
362            if (sync) {
363                getStore().force();
364            }
365        }
366    
367        @Override
368        public long size(){
369           return storeSize.get();
370        }
371    
372        private void initialize() {
373            if (!initialized) {
374                initialized = true;
375                if (this.directory == null) {
376                    File file = new File(IOHelper.getDefaultDataDirectory());
377                    file = new File(file, IOHelper.toFileSystemSafeName(brokerName) + "-kahastore");
378                    setDirectory(file);
379                }
380                try {
381                    IOHelper.mkdirs(this.directory);
382                } catch (IOException e) {
383                    throw new RuntimeException(e);
384                }
385                wireFormat.setCacheEnabled(false);
386                wireFormat.setTightEncodingEnabled(true);
387            }
388        }
389    
390        @Override
391        public void setBrokerService(BrokerService brokerService) {
392            this.brokerService = brokerService;
393        }
394    
395        @Override
396        public long getLastProducerSequenceId(ProducerId id) {
397            // reference store send has adequate duplicate suppression
398            return -1;
399        }
400    
401    
402    }