001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.nio.channels.FileLock;
023    import java.util.Date;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.Set;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.atomic.AtomicLong;
030    
031    import org.apache.activemq.kaha.ContainerId;
032    import org.apache.activemq.kaha.ListContainer;
033    import org.apache.activemq.kaha.MapContainer;
034    import org.apache.activemq.kaha.Store;
035    import org.apache.activemq.kaha.StoreLocation;
036    import org.apache.activemq.kaha.impl.async.AsyncDataManager;
037    import org.apache.activemq.kaha.impl.async.DataManagerFacade;
038    import org.apache.activemq.kaha.impl.container.ListContainerImpl;
039    import org.apache.activemq.kaha.impl.container.MapContainerImpl;
040    import org.apache.activemq.kaha.impl.data.DataManagerImpl;
041    import org.apache.activemq.kaha.impl.data.Item;
042    import org.apache.activemq.kaha.impl.data.RedoListener;
043    import org.apache.activemq.kaha.impl.index.IndexItem;
044    import org.apache.activemq.kaha.impl.index.IndexManager;
045    import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
046    import org.apache.activemq.util.IOHelper;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * Store Implementation
052     * 
053     * 
054     */
055    public class KahaStore implements Store {
056    
057        private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store";
058        private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
059                                                                                         + ".FileLockBroken",
060                                                                                         "false"));
061        private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
062                                                                                        + ".DisableLocking",
063                                                                                        "false"));
064        //according to the String javadoc, all constant strings are interned so this will be the same object throughout the vm
065        //and we can use it as a monitor for the lockset.
066        private final static String LOCKSET_MONITOR = PROPERTY_PREFIX + ".Lock.Monitor";
067        private static final Logger LOG = LoggerFactory.getLogger(KahaStore.class);
068    
069        private final File directory;
070        private final String mode;
071        private IndexRootContainer mapsContainer;
072        private IndexRootContainer listsContainer;
073        private final Map<ContainerId, ListContainerImpl> lists = new ConcurrentHashMap<ContainerId, ListContainerImpl>();
074        private final Map<ContainerId, MapContainerImpl> maps = new ConcurrentHashMap<ContainerId, MapContainerImpl>();
075        private final Map<String, DataManager> dataManagers = new ConcurrentHashMap<String, DataManager>();
076        private final Map<String, IndexManager> indexManagers = new ConcurrentHashMap<String, IndexManager>();
077        private boolean closed;
078        private boolean initialized;
079        private boolean logIndexChanges;
080        private boolean useAsyncDataManager;
081        private long maxDataFileLength = 1024 * 1024 * 32;
082        private FileLock lock;
083        private boolean persistentIndex = true;
084        private RandomAccessFile lockFile;
085        private final AtomicLong storeSize;
086        private String defaultContainerName = DEFAULT_CONTAINER_NAME;
087    
088        
089        public KahaStore(String name, String mode) throws IOException {
090            this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, new AtomicLong());
091        }
092    
093        public KahaStore(File directory, String mode) throws IOException {
094            this(directory, mode, new AtomicLong());
095        }
096    
097        public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException {
098            this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, storeSize);
099        }
100        
101        public KahaStore(File directory, String mode, AtomicLong storeSize) throws IOException {
102            this.mode = mode;
103            this.storeSize = storeSize;
104            this.directory = directory;
105            IOHelper.mkdirs(this.directory);
106        }
107    
108        public synchronized void close() throws IOException {
109            if (!closed) {
110                closed = true;
111                if (initialized) {
112                    unlock();
113                    for (ListContainerImpl container : lists.values()) {
114                        container.close();
115                    }
116                    lists.clear();
117                    for (MapContainerImpl container : maps.values()) {
118                        container.close();
119                    }
120                    maps.clear();
121                    for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
122                        IndexManager im = iter.next();
123                        im.close();
124                        iter.remove();
125                    }
126                    for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
127                        DataManager dm = iter.next();
128                        dm.close();
129                        iter.remove();
130                    }
131                }
132                if (lockFile!=null) {
133                    lockFile.close();
134                    lockFile=null;
135                }
136            }
137        }
138    
139        public synchronized void force() throws IOException {
140            if (initialized) {
141                for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
142                    IndexManager im = iter.next();
143                    im.force();
144                }
145                for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
146                    DataManager dm = iter.next();
147                    dm.force();
148                }
149            }
150        }
151    
152        public synchronized void clear() throws IOException {
153            initialize();
154            for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
155                ContainerId id = (ContainerId)i.next();
156                MapContainer container = getMapContainer(id.getKey(), id.getDataContainerName());
157                container.clear();
158            }
159            for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
160                ContainerId id = (ContainerId)i.next();
161                ListContainer container = getListContainer(id.getKey(), id.getDataContainerName());
162                container.clear();
163            }
164    
165        }
166    
167        public synchronized boolean delete() throws IOException {
168            boolean result = true;
169            if (initialized) {
170                clear();
171                for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
172                    IndexManager im = iter.next();
173                    result &= im.delete();
174                    iter.remove();
175                }
176                for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
177                    DataManager dm = iter.next();
178                    result &= dm.delete();
179                    iter.remove();
180                }
181            }
182            if (directory != null && directory.isDirectory()) {
183                result =IOHelper.deleteChildren(directory);
184                String str = result ? "successfully deleted" : "failed to delete";
185                LOG.info("Kaha Store " + str + " data directory " + directory);
186            }
187            return result;
188        }
189    
190        public synchronized boolean isInitialized() {
191            return initialized;
192        }
193    
194        public boolean doesMapContainerExist(Object id) throws IOException {
195            return doesMapContainerExist(id, defaultContainerName);
196        }
197    
198        public synchronized boolean doesMapContainerExist(Object id, String containerName) throws IOException {
199            initialize();
200            ContainerId containerId = new ContainerId(id, containerName);
201            return maps.containsKey(containerId) || mapsContainer.doesRootExist(containerId);
202        }
203    
204        public MapContainer getMapContainer(Object id) throws IOException {
205            return getMapContainer(id, defaultContainerName);
206        }
207    
208        public MapContainer getMapContainer(Object id, String containerName) throws IOException {
209            return getMapContainer(id, containerName, persistentIndex);
210        }
211    
212        public synchronized MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex)
213            throws IOException {
214            initialize();
215            ContainerId containerId = new ContainerId(id, containerName);
216            MapContainerImpl result = maps.get(containerId);
217            if (result == null) {
218                DataManager dm = getDataManager(containerName);
219                IndexManager im = getIndexManager(dm, containerName);
220    
221                IndexItem root = mapsContainer.getRoot(im, containerId);
222                if (root == null) {
223                    root = mapsContainer.addRoot(im, containerId);
224                }
225                result = new MapContainerImpl(directory, containerId, root, im, dm, persistentIndex);
226                maps.put(containerId, result);
227            }
228            return result;
229        }
230    
231        public void deleteMapContainer(Object id) throws IOException {
232            deleteMapContainer(id, defaultContainerName);
233        }
234    
235        public void deleteMapContainer(Object id, String containerName) throws IOException {
236            ContainerId containerId = new ContainerId(id, containerName);
237            deleteMapContainer(containerId);
238        }
239    
240        public synchronized void deleteMapContainer(ContainerId containerId) throws IOException {
241            initialize();
242            MapContainerImpl container = maps.remove(containerId);
243            if (container != null) {
244                container.clear();
245                mapsContainer.removeRoot(container.getIndexManager(), containerId);
246                container.close();
247            }
248        }
249    
250        public synchronized Set<ContainerId> getMapContainerIds() throws IOException {
251            initialize();
252            Set<ContainerId> set = new HashSet<ContainerId>();
253            for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
254                ContainerId id = (ContainerId)i.next();
255                set.add(id);
256            }
257            return set;
258        }
259    
260        public boolean doesListContainerExist(Object id) throws IOException {
261            return doesListContainerExist(id, defaultContainerName);
262        }
263    
264        public synchronized boolean doesListContainerExist(Object id, String containerName) throws IOException {
265            initialize();
266            ContainerId containerId = new ContainerId(id, containerName);
267            return lists.containsKey(containerId) || listsContainer.doesRootExist(containerId);
268        }
269    
270        public ListContainer getListContainer(Object id) throws IOException {
271            return getListContainer(id, defaultContainerName);
272        }
273    
274        public ListContainer getListContainer(Object id, String containerName) throws IOException {
275            return getListContainer(id, containerName, persistentIndex);
276        }
277    
278        public synchronized ListContainer getListContainer(Object id, String containerName,
279                                                           boolean persistentIndex) throws IOException {
280            initialize();
281            ContainerId containerId = new ContainerId(id, containerName);
282            ListContainerImpl result = lists.get(containerId);
283            if (result == null) {
284                DataManager dm = getDataManager(containerName);
285                IndexManager im = getIndexManager(dm, containerName);
286    
287                IndexItem root = listsContainer.getRoot(im, containerId);
288                if (root == null) {
289                    root = listsContainer.addRoot(im, containerId);
290                }
291                result = new ListContainerImpl(containerId, root, im, dm, persistentIndex);
292                lists.put(containerId, result);
293            }
294            return result;
295        }
296    
297        public void deleteListContainer(Object id) throws IOException {
298            deleteListContainer(id, defaultContainerName);
299        }
300    
301        public synchronized void deleteListContainer(Object id, String containerName) throws IOException {
302            ContainerId containerId = new ContainerId(id, containerName);
303            deleteListContainer(containerId);
304        }
305    
306        public synchronized void deleteListContainer(ContainerId containerId) throws IOException {
307            initialize();
308            ListContainerImpl container = lists.remove(containerId);
309            if (container != null) {
310                listsContainer.removeRoot(container.getIndexManager(), containerId);
311                container.clear();
312                container.close();
313            }
314        }
315    
316        public synchronized Set<ContainerId> getListContainerIds() throws IOException {
317            initialize();
318            Set<ContainerId> set = new HashSet<ContainerId>();
319            for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
320                ContainerId id = (ContainerId)i.next();
321                set.add(id);
322            }
323            return set;
324        }
325    
326        /**
327         * @return the listsContainer
328         */
329        public IndexRootContainer getListsContainer() {
330            return this.listsContainer;
331        }
332    
333        /**
334         * @return the mapsContainer
335         */
336        public IndexRootContainer getMapsContainer() {
337            return this.mapsContainer;
338        }
339    
340        public synchronized DataManager getDataManager(String name) throws IOException {
341            DataManager dm = dataManagers.get(name);
342            if (dm == null) {
343                if (isUseAsyncDataManager()) {
344                    AsyncDataManager t = new AsyncDataManager(storeSize);
345                    t.setDirectory(directory);
346                    t.setFilePrefix("async-data-" + name + "-");
347                    t.setMaxFileLength((int)maxDataFileLength);
348                    t.start();
349                    dm = new DataManagerFacade(t, name);
350                } else {
351                    DataManagerImpl t = new DataManagerImpl(directory, name,storeSize);
352                    t.setMaxFileLength(maxDataFileLength);
353                    dm = t;
354                }
355                if (logIndexChanges) {
356                    recover(dm);
357                }
358                dataManagers.put(name, dm);
359            }
360            return dm;
361        }
362    
363        public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
364            IndexManager im = indexManagers.get(name);
365            if (im == null) {
366                im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize);
367                indexManagers.put(name, im);
368            }
369            return im;
370        }
371    
372        private void recover(final DataManager dm) throws IOException {
373            dm.recoverRedoItems(new RedoListener() {
374                public void onRedoItem(StoreLocation item, Object o) throws Exception {
375                    RedoStoreIndexItem redo = (RedoStoreIndexItem)o;
376                    // IndexManager im = getIndexManager(dm, redo.getIndexName());
377                    IndexManager im = getIndexManager(dm, dm.getName());
378                    im.redo(redo);
379                }
380            });
381        }
382    
383        public synchronized boolean isLogIndexChanges() {
384            return logIndexChanges;
385        }
386    
387        public synchronized void setLogIndexChanges(boolean logIndexChanges) {
388            this.logIndexChanges = logIndexChanges;
389        }
390    
391        /**
392         * @return the maxDataFileLength
393         */
394        public synchronized long getMaxDataFileLength() {
395            return maxDataFileLength;
396        }
397    
398        /**
399         * @param maxDataFileLength the maxDataFileLength to set
400         */
401        public synchronized void setMaxDataFileLength(long maxDataFileLength) {
402            this.maxDataFileLength = maxDataFileLength;
403        }
404    
405        /**
406         * @return the default index type
407         */
408        public synchronized String getIndexTypeAsString() {
409            return persistentIndex ? "PERSISTENT" : "VM";
410        }
411    
412        /**
413         * Set the default index type
414         * 
415         * @param type "PERSISTENT" or "VM"
416         */
417        public synchronized void setIndexTypeAsString(String type) {
418            if (type.equalsIgnoreCase("VM")) {
419                persistentIndex = false;
420            } else {
421                persistentIndex = true;
422            }
423        }
424        
425        public boolean isPersistentIndex() {
426                    return persistentIndex;
427            }
428    
429            public void setPersistentIndex(boolean persistentIndex) {
430                    this.persistentIndex = persistentIndex;
431            }
432            
433    
434        public synchronized boolean isUseAsyncDataManager() {
435            return useAsyncDataManager;
436        }
437    
438        public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
439            this.useAsyncDataManager = useAsyncWriter;
440        }
441    
442        /**
443         * @return size of store
444         * @see org.apache.activemq.kaha.Store#size()
445         */
446        public long size(){
447            return storeSize.get();
448        }
449    
450        public String getDefaultContainerName() {
451            return defaultContainerName;
452        }
453    
454        public void setDefaultContainerName(String defaultContainerName) {
455            this.defaultContainerName = defaultContainerName;
456        }
457    
458        public synchronized void initialize() throws IOException {
459            if (closed) {
460                throw new IOException("Store has been closed.");
461            }
462            if (!initialized) {       
463                LOG.info("Kaha Store using data directory " + directory);
464                lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
465                lock();
466                DataManager defaultDM = getDataManager(defaultContainerName);
467                IndexManager rootIndexManager = getIndexManager(defaultDM, defaultContainerName);
468                IndexItem mapRoot = new IndexItem();
469                IndexItem listRoot = new IndexItem();
470                if (rootIndexManager.isEmpty()) {
471                    mapRoot.setOffset(0);
472                    rootIndexManager.storeIndex(mapRoot);
473                    listRoot.setOffset(IndexItem.INDEX_SIZE);
474                    rootIndexManager.storeIndex(listRoot);
475                    rootIndexManager.setLength(IndexItem.INDEX_SIZE * 2);
476                } else {
477                    mapRoot = rootIndexManager.getIndex(0);
478                    listRoot = rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
479                }
480                initialized = true;
481                mapsContainer = new IndexRootContainer(mapRoot, rootIndexManager, defaultDM);
482                listsContainer = new IndexRootContainer(listRoot, rootIndexManager, defaultDM);
483                /**
484                 * Add interest in data files - then consolidate them
485                 */
486                generateInterestInMapDataFiles();
487                generateInterestInListDataFiles();
488                for (Iterator<DataManager> i = dataManagers.values().iterator(); i.hasNext();) {
489                    DataManager dm = i.next();
490                    dm.consolidateDataFiles();
491                }
492            }
493        }
494    
495        private void lock() throws IOException {
496            synchronized (LOCKSET_MONITOR) {
497                if (!DISABLE_LOCKING && directory != null && lock == null) {
498                    String key = getPropertyKey();
499                    String property = System.getProperty(key);
500                    if (null == property) {
501                        if (!BROKEN_FILE_LOCK) {
502                            lock = lockFile.getChannel().tryLock(0, Math.max(1, lockFile.getChannel().size()), false);
503                            if (lock == null) {
504                                throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + "  is already opened by another application");
505                            } else
506                                System.setProperty(key, new Date().toString());
507                        }
508                    } else { //already locked
509                        throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application.");
510                    }
511                }
512            }
513        }
514    
515        private void unlock() throws IOException {
516            synchronized (LOCKSET_MONITOR) {
517                if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
518                    System.getProperties().remove(getPropertyKey());
519                    if (lock.isValid()) {
520                        lock.release();
521                    }
522                    lock = null;
523                }
524            }
525        }
526    
527    
528        private String getPropertyKey() throws IOException {
529            return getClass().getName() + ".lock." + directory.getCanonicalPath();
530        }
531    
532        /**
533         * scans the directory and builds up the IndexManager and DataManager
534         * 
535         * @throws IOException if there is a problem accessing an index or data file
536         */
537        private void generateInterestInListDataFiles() throws IOException {
538            for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
539                ContainerId id = (ContainerId)i.next();
540                DataManager dm = getDataManager(id.getDataContainerName());
541                IndexManager im = getIndexManager(dm, id.getDataContainerName());
542                IndexItem theRoot = listsContainer.getRoot(im, id);
543                long nextItem = theRoot.getNextItem();
544                while (nextItem != Item.POSITION_NOT_SET) {
545                    IndexItem item = im.getIndex(nextItem);
546                    item.setOffset(nextItem);
547                    dm.addInterestInFile(item.getKeyFile());
548                    dm.addInterestInFile(item.getValueFile());
549                    nextItem = item.getNextItem();
550                }
551            }
552        }
553    
554        /**
555         * scans the directory and builds up the IndexManager and DataManager
556         * 
557         * @throws IOException if there is a problem accessing an index or data file
558         */
559        private void generateInterestInMapDataFiles() throws IOException {
560            for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
561                ContainerId id = (ContainerId)i.next();
562                DataManager dm = getDataManager(id.getDataContainerName());
563                IndexManager im = getIndexManager(dm, id.getDataContainerName());
564                IndexItem theRoot = mapsContainer.getRoot(im, id);
565                long nextItem = theRoot.getNextItem();
566                while (nextItem != Item.POSITION_NOT_SET) {
567                    IndexItem item = im.getIndex(nextItem);
568                    item.setOffset(nextItem);
569                    dm.addInterestInFile(item.getKeyFile());
570                    dm.addInterestInFile(item.getValueFile());
571                    nextItem = item.getNextItem();
572                }
573    
574            }
575        }
576    }