001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Map;
025    import java.util.Set;
026    import java.util.Map.Entry;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.ActiveMQQueue;
030    import org.apache.activemq.command.ActiveMQTempQueue;
031    import org.apache.activemq.command.ActiveMQTempTopic;
032    import org.apache.activemq.command.ActiveMQTopic;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageAck;
035    import org.apache.activemq.command.MessageId;
036    import org.apache.activemq.command.ProducerId;
037    import org.apache.activemq.command.SubscriptionInfo;
038    import org.apache.activemq.command.TransactionId;
039    import org.apache.activemq.command.XATransactionId;
040    import org.apache.activemq.openwire.OpenWireFormat;
041    import org.apache.activemq.protobuf.Buffer;
042    import org.apache.activemq.store.AbstractMessageStore;
043    import org.apache.activemq.store.MessageRecoveryListener;
044    import org.apache.activemq.store.MessageStore;
045    import org.apache.activemq.store.PersistenceAdapter;
046    import org.apache.activemq.store.TopicMessageStore;
047    import org.apache.activemq.store.TransactionRecoveryListener;
048    import org.apache.activemq.store.TransactionStore;
049    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
050    import org.apache.activemq.store.kahadb.data.KahaDestination;
051    import org.apache.activemq.store.kahadb.data.KahaLocation;
052    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
053    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
054    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
055    import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
056    import org.apache.activemq.usage.MemoryUsage;
057    import org.apache.activemq.usage.SystemUsage;
058    import org.apache.activemq.util.ByteSequence;
059    import org.apache.activemq.wireformat.WireFormat;
060    import org.apache.kahadb.journal.Location;
061    import org.apache.kahadb.page.Transaction;
062    
063    public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
064    
065        private final WireFormat wireFormat = new OpenWireFormat();
066    
067        public void setBrokerName(String brokerName) {
068        }
069        public void setUsageManager(SystemUsage usageManager) {
070        }
071    
072        public TransactionStore createTransactionStore() throws IOException {
073            return new TransactionStore(){
074                
075                public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
076                    if (preCommit != null) {
077                        preCommit.run();
078                    }
079                    processCommit(txid);
080                    if (postCommit != null) {
081                        postCommit.run();
082                    }
083                }
084                public void prepare(TransactionId txid) throws IOException {
085                    processPrepare(txid);
086                }
087                public void rollback(TransactionId txid) throws IOException {
088                    processRollback(txid);
089                }
090                public void recover(TransactionRecoveryListener listener) throws IOException {
091                    for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
092                        XATransactionId xid = (XATransactionId)entry.getKey();
093                        ArrayList<Message> messageList = new ArrayList<Message>();
094                        ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
095                        
096                        for (Operation op : entry.getValue()) {
097                            if( op.getClass() == AddOpperation.class ) {
098                                AddOpperation addOp = (AddOpperation)op;
099                                Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addOp.getCommand().getMessage().newInput()) );
100                                messageList.add(msg);
101                            } else {
102                                RemoveOpperation rmOp = (RemoveOpperation)op;
103                                MessageAck ack = (MessageAck)wireFormat.unmarshal( new DataInputStream(rmOp.getCommand().getAck().newInput()) );
104                                ackList.add(ack);
105                            }
106                        }
107                        
108                        Message[] addedMessages = new Message[messageList.size()];
109                        MessageAck[] acks = new MessageAck[ackList.size()];
110                        messageList.toArray(addedMessages);
111                        ackList.toArray(acks);
112                        listener.recover(xid, addedMessages, acks);
113                    }
114                }
115                public void start() throws Exception {
116                }
117                public void stop() throws Exception {
118                }
119            };
120        }
121    
122        public class KahaDBMessageStore extends AbstractMessageStore {
123            protected KahaDestination dest;
124    
125            public KahaDBMessageStore(ActiveMQDestination destination) {
126                super(destination);
127                this.dest = convert( destination );
128            }
129    
130            @Override
131            public ActiveMQDestination getDestination() {
132                return destination;
133            }
134    
135            public void addMessage(ConnectionContext context, Message message) throws IOException {
136                KahaAddMessageCommand command = new KahaAddMessageCommand();
137                command.setDestination(dest);
138                command.setMessageId(message.getMessageId().toString());
139                processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
140            }
141            
142            public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
143                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
144                command.setDestination(dest);
145                command.setMessageId(ack.getLastMessageId().toString());
146                processRemove(command, ack.getTransactionId());
147            }
148    
149            public void removeAllMessages(ConnectionContext context) throws IOException {
150                KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
151                command.setDestination(dest);
152                process(command);
153            }
154    
155            public Message getMessage(MessageId identity) throws IOException {
156                final String key = identity.toString();
157                
158                // Hopefully one day the page file supports concurrent read operations... but for now we must
159                // externally synchronize...
160                ByteSequence data;
161                synchronized(indexMutex) {
162                    data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
163                        public ByteSequence execute(Transaction tx) throws IOException {
164                            StoredDestination sd = getStoredDestination(dest, tx);
165                            Long sequence = sd.messageIdIndex.get(tx, key);
166                            if( sequence ==null ) {
167                                return null;
168                            }
169                            return sd.orderIndex.get(tx, sequence).data;
170                        }
171                    });
172                }
173                if( data == null ) {
174                    return null;
175                }
176                
177                Message msg = (Message)wireFormat.unmarshal( data );
178                            return msg;
179            }
180            
181            public int getMessageCount() throws IOException {
182                synchronized(indexMutex) {
183                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
184                        public Integer execute(Transaction tx) throws IOException {
185                            // Iterate through all index entries to get a count of messages in the destination.
186                            StoredDestination sd = getStoredDestination(dest, tx);
187                            int rc=0;
188                            for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
189                                iterator.next();
190                                rc++;
191                            }
192                            return rc;
193                        }
194                    });
195                }
196            }
197    
198            public void recover(final MessageRecoveryListener listener) throws Exception {
199                synchronized(indexMutex) {
200                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
201                        public void execute(Transaction tx) throws Exception {
202                            StoredDestination sd = getStoredDestination(dest, tx);
203                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
204                                Entry<Long, MessageRecord> entry = iterator.next();
205                                listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data) );
206                            }
207                        }
208                    });
209                }
210            }
211    
212            long cursorPos=0;
213            
214            public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
215                synchronized(indexMutex) {
216                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
217                        public void execute(Transaction tx) throws Exception {
218                            StoredDestination sd = getStoredDestination(dest, tx);
219                            Entry<Long, MessageRecord> entry=null;
220                            int counter = 0;
221                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
222                                entry = iterator.next();
223                                listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
224                                counter++;
225                                if( counter >= maxReturned ) {
226                                    break;
227                                }
228                            }
229                            if( entry!=null ) {
230                                cursorPos = entry.getKey()+1;
231                            }
232                        }
233                    });
234                }
235            }
236    
237            public void resetBatching() {
238                cursorPos=0;
239            }
240    
241            
242            @Override
243            public void setBatch(MessageId identity) throws IOException {
244                final String key = identity.toString();
245                
246                // Hopefully one day the page file supports concurrent read operations... but for now we must
247                // externally synchronize...
248                Long location;
249                synchronized(indexMutex) {
250                    location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
251                        public Long execute(Transaction tx) throws IOException {
252                            StoredDestination sd = getStoredDestination(dest, tx);
253                            return sd.messageIdIndex.get(tx, key);
254                        }
255                    });
256                }
257                if( location!=null ) {
258                    cursorPos=location+1;
259                }
260                
261            }
262    
263            @Override
264            public void setMemoryUsage(MemoryUsage memoeyUSage) {
265            }
266            @Override
267            public void start() throws Exception {
268            }
269            @Override
270            public void stop() throws Exception {
271            }
272            
273        }
274            
275        class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
276            public KahaDBTopicMessageStore(ActiveMQTopic destination) {
277                super(destination);
278            }
279            
280            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
281                                    MessageId messageId, MessageAck ack) throws IOException {
282                KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
283                command.setDestination(dest);
284                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
285                command.setMessageId(messageId.toString());
286                // We are not passed a transaction info.. so we can't participate in a transaction.
287                // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
288                // to pass back to the XA recover method.
289                // command.setTransactionInfo();
290                processRemove(command, null);
291            }
292    
293            public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
294                String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
295                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
296                command.setDestination(dest);
297                command.setSubscriptionKey(subscriptionKey);
298                command.setRetroactive(retroactive);
299                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
300                command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
301                process(command);
302            }
303    
304            public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
305                KahaSubscriptionCommand command = new KahaSubscriptionCommand();
306                command.setDestination(dest);
307                command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
308                process(command);
309            }
310    
311            public SubscriptionInfo[] getAllSubscriptions() throws IOException {
312                
313                final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
314                synchronized(indexMutex) {
315                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
316                        public void execute(Transaction tx) throws IOException {
317                            StoredDestination sd = getStoredDestination(dest, tx);
318                            for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
319                                Entry<String, KahaSubscriptionCommand> entry = iterator.next();
320                                SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
321                                subscriptions.add(info);
322    
323                            }
324                        }
325                    });
326                }
327                
328                SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
329                subscriptions.toArray(rc);
330                return rc;
331            }
332    
333            public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
334                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
335                synchronized(indexMutex) {
336                    return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
337                        public SubscriptionInfo execute(Transaction tx) throws IOException {
338                            StoredDestination sd = getStoredDestination(dest, tx);
339                            KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
340                            if( command ==null ) {
341                                return null;
342                            }
343                            return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
344                        }
345                    });
346                }
347            }
348           
349            public int getMessageCount(String clientId, String subscriptionName) throws IOException {
350                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
351                synchronized(indexMutex) {
352                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
353                        public Integer execute(Transaction tx) throws IOException {
354                            StoredDestination sd = getStoredDestination(dest, tx);
355                            Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
356                            if ( cursorPos==null ) {
357                                // The subscription might not exist.
358                                return 0;
359                            }
360                            cursorPos += 1;
361                            
362                            int counter = 0;
363                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
364                                iterator.next();
365                                counter++;
366                            }
367                            return counter;
368                        }
369                    });
370                }        
371            }
372    
373            public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
374                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
375                synchronized(indexMutex) {
376                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
377                        public void execute(Transaction tx) throws Exception {
378                            StoredDestination sd = getStoredDestination(dest, tx);
379                            Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
380                            cursorPos += 1;
381                            
382                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
383                                Entry<Long, MessageRecord> entry = iterator.next();
384                                listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
385                            }
386                        }
387                    });
388                }
389            }
390    
391            public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
392                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
393                synchronized(indexMutex) {
394                    pageFile.tx().execute(new Transaction.Closure<Exception>(){
395                        public void execute(Transaction tx) throws Exception {
396                            StoredDestination sd = getStoredDestination(dest, tx);
397                            Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
398                            if( cursorPos == null ) {
399                                cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
400                                cursorPos += 1;
401                            }
402                            
403                            Entry<Long, MessageRecord> entry=null;
404                            int counter = 0;
405                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
406                                entry = iterator.next();
407                                listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
408                                counter++;
409                                if( counter >= maxReturned ) {
410                                    break;
411                                }
412                            }
413                            if( entry!=null ) {
414                                sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
415                            }
416                        }
417                    });
418                }
419            }
420    
421            public void resetBatching(String clientId, String subscriptionName) {
422                try {
423                    final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
424                    synchronized(indexMutex) {
425                        pageFile.tx().execute(new Transaction.Closure<IOException>(){
426                            public void execute(Transaction tx) throws IOException {
427                                StoredDestination sd = getStoredDestination(dest, tx);
428                                sd.subscriptionCursors.remove(subscriptionKey);
429                            }
430                        });
431                    }
432                } catch (IOException e) {
433                    throw new RuntimeException(e);
434                }
435            }
436        }
437    
438        String subscriptionKey(String clientId, String subscriptionName){
439            return clientId+":"+subscriptionName;
440        }
441        
442        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
443            return new KahaDBMessageStore(destination);
444        }
445    
446        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
447            return new KahaDBTopicMessageStore(destination);
448        }
449    
450        /**
451         * Cleanup method to remove any state associated with the given destination.
452         * This method does not stop the message store (it might not be cached).
453         *
454         * @param destination Destination to forget
455         */
456        public void removeQueueMessageStore(ActiveMQQueue destination) {
457        }
458    
459        /**
460         * Cleanup method to remove any state associated with the given destination
461         * This method does not stop the message store (it might not be cached).
462         *
463         * @param destination Destination to forget
464         */
465        public void removeTopicMessageStore(ActiveMQTopic destination) {
466        }
467    
468        public void deleteAllMessages() throws IOException {
469        }
470        
471        
472        public Set<ActiveMQDestination> getDestinations() {
473            try {
474                final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
475                synchronized(indexMutex) {
476                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
477                        public void execute(Transaction tx) throws IOException {
478                            for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
479                                Entry<String, StoredDestination> entry = iterator.next();
480                                rc.add(convert(entry.getKey()));
481                            }
482                        }
483                    });
484                }
485                return rc;
486            } catch (IOException e) {
487                throw new RuntimeException(e);
488            }
489        }
490        
491        public long getLastMessageBrokerSequenceId() throws IOException {
492            return 0;
493        }
494        
495        public long size() {
496            if ( !started.get() ) {
497                return 0;
498            }
499            try {
500                return pageFile.getDiskSize();
501            } catch (IOException e) {
502                throw new RuntimeException(e);
503            }
504        }
505    
506        public void beginTransaction(ConnectionContext context) throws IOException {
507            throw new IOException("Not yet implemented.");
508        }
509        public void commitTransaction(ConnectionContext context) throws IOException {
510            throw new IOException("Not yet implemented.");
511        }
512        public void rollbackTransaction(ConnectionContext context) throws IOException {
513            throw new IOException("Not yet implemented.");
514        }
515        
516        public void checkpoint(boolean sync) throws IOException {
517        }    
518    
519        ///////////////////////////////////////////////////////////////////
520        // Internal conversion methods.
521        ///////////////////////////////////////////////////////////////////
522        
523    
524        
525        KahaLocation convert(Location location) {
526            KahaLocation rc = new KahaLocation();
527            rc.setLogId(location.getDataFileId());
528            rc.setOffset(location.getOffset());
529            return rc;
530        }
531        
532        KahaDestination convert(ActiveMQDestination dest) {
533            KahaDestination rc = new KahaDestination();
534            rc.setName(dest.getPhysicalName());
535            switch( dest.getDestinationType() ) {
536            case ActiveMQDestination.QUEUE_TYPE:
537                rc.setType(DestinationType.QUEUE);
538                return rc;
539            case ActiveMQDestination.TOPIC_TYPE:
540                rc.setType(DestinationType.TOPIC);
541                return rc;
542            case ActiveMQDestination.TEMP_QUEUE_TYPE:
543                rc.setType(DestinationType.TEMP_QUEUE);
544                return rc;
545            case ActiveMQDestination.TEMP_TOPIC_TYPE:
546                rc.setType(DestinationType.TEMP_TOPIC);
547                return rc;
548            default:
549                return null;
550            }
551        }
552    
553        ActiveMQDestination convert(String dest) {
554            int p = dest.indexOf(":");
555            if( p<0 ) {
556                throw new IllegalArgumentException("Not in the valid destination format");
557            }
558            int type = Integer.parseInt(dest.substring(0, p));
559            String name = dest.substring(p+1);
560            
561            switch( KahaDestination.DestinationType.valueOf(type) ) {
562            case QUEUE:
563                return new ActiveMQQueue(name);
564            case TOPIC:
565                return new ActiveMQTopic(name);
566            case TEMP_QUEUE:
567                return new ActiveMQTempQueue(name);
568            case TEMP_TOPIC:
569                return new ActiveMQTempTopic(name);
570            default:    
571                throw new IllegalArgumentException("Not in the valid destination format");
572            }
573        }
574        
575        public long getLastProducerSequenceId(ProducerId id) {
576            return -1;
577        }
578            
579    }