001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.console.command.store;
018    
019    import org.apache.activemq.broker.BrokerFactory;
020    import org.apache.activemq.broker.BrokerService;
021    import org.apache.activemq.command.*;
022    import org.apache.activemq.console.command.store.proto.MessagePB;
023    import org.apache.activemq.console.command.store.proto.QueueEntryPB;
024    import org.apache.activemq.console.command.store.proto.QueuePB;
025    import org.apache.activemq.openwire.OpenWireFormat;
026    import org.apache.activemq.store.*;
027    import org.codehaus.jackson.map.ObjectMapper;
028    import org.fusesource.hawtbuf.AsciiBuffer;
029    import org.fusesource.hawtbuf.DataByteArrayOutputStream;
030    import org.fusesource.hawtbuf.UTF8Buffer;
031    
032    import java.io.BufferedOutputStream;
033    import java.io.File;
034    import java.io.FileOutputStream;
035    import java.io.IOException;
036    import java.net.URI;
037    import java.net.URISyntaxException;
038    import java.util.HashMap;
039    
040    /**
041     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
042     */
043    public class StoreExporter {
044    
045        static final int OPENWIRE_VERSION = 8;
046        static final boolean TIGHT_ENCODING = false;
047    
048        URI config;
049        File file;
050    
051        private ObjectMapper mapper = new ObjectMapper();
052        private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
053        private final AsciiBuffer ptp_kind = new AsciiBuffer("ptp");
054        private final AsciiBuffer codec_id = new AsciiBuffer("openwire");
055        private final OpenWireFormat wireformat = new OpenWireFormat();
056    
057        public StoreExporter() throws URISyntaxException {
058            config = new URI("xbean:activemq.xml");
059            wireformat.setCacheEnabled(false);
060            wireformat.setTightEncodingEnabled(TIGHT_ENCODING);
061            wireformat.setVersion(OPENWIRE_VERSION);
062        }
063    
064        public void execute() throws Exception {
065            if (config == null) {
066                throw new Exception("required --config option missing");
067            }
068            if (file == null) {
069                throw new Exception("required --file option missing");
070            }
071            System.out.println("Loading: " + config);
072            BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting..
073            BrokerService broker = BrokerFactory.createBroker(config);
074            BrokerFactory.resetStartDefault();
075            PersistenceAdapter store = broker.getPersistenceAdapter();
076            System.out.println("Starting: " + store);
077            store.start();
078            try {
079                BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(file));
080                try {
081                    export(store, fos);
082                } finally {
083                    fos.close();
084                }
085            } finally {
086                store.stop();
087            }
088        }
089    
090        void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
091    
092    
093            final long[] messageKeyCounter = new long[]{0};
094            final long[] containerKeyCounter = new long[]{0};
095            final ExportStreamManager manager = new ExportStreamManager(fos, 1);
096    
097    
098            final int[] preparedTxs = new int[]{0};
099            store.createTransactionStore().recover(new TransactionRecoveryListener() {
100                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
101                    preparedTxs[0] += 1;
102                }
103            });
104    
105            if (preparedTxs[0] > 0) {
106                throw new Exception("Cannot export a store with prepared XA transactions.  Please commit or rollback those transactions before attempting to export.");
107            }
108    
109            for (ActiveMQDestination odest : store.getDestinations()) {
110                containerKeyCounter[0]++;
111                if (odest instanceof ActiveMQQueue) {
112                    ActiveMQQueue dest = (ActiveMQQueue) odest;
113                    MessageStore queue = store.createQueueMessageStore(dest);
114    
115                    QueuePB.Bean destRecord = new QueuePB.Bean();
116                    destRecord.setKey(containerKeyCounter[0]);
117                    destRecord.setBindingKind(ptp_kind);
118    
119                    final long[] seqKeyCounter = new long[]{0};
120    
121                    HashMap<String, Object> jsonMap = new HashMap<String, Object>();
122                    jsonMap.put("@class", "queue_destination");
123                    jsonMap.put("name", dest.getQueueName());
124                    String json = mapper.writeValueAsString(jsonMap);
125                    System.out.println(json);
126                    destRecord.setBindingData(new UTF8Buffer(json));
127                    manager.store_queue(destRecord);
128    
129                    queue.recover(new MessageRecoveryListener() {
130                        public boolean hasSpace() {
131                            return true;
132                        }
133    
134                        public boolean recoverMessageReference(MessageId ref) throws Exception {
135                            return true;
136                        }
137    
138                        public boolean isDuplicate(MessageId ref) {
139                            return false;
140                        }
141    
142                        public boolean recoverMessage(Message message) throws IOException {
143                            messageKeyCounter[0]++;
144                            seqKeyCounter[0]++;
145    
146                            MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
147                            manager.store_message(messageRecord);
148    
149                            QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
150                            manager.store_queue_entry(entryRecord);
151    
152                            return true;
153                        }
154                    });
155    
156                } else if (odest instanceof ActiveMQTopic) {
157                    ActiveMQTopic dest = (ActiveMQTopic) odest;
158    
159                    TopicMessageStore topic = store.createTopicMessageStore(dest);
160                    for (SubscriptionInfo sub : topic.getAllSubscriptions()) {
161    
162                        QueuePB.Bean destRecord = new QueuePB.Bean();
163                        destRecord.setKey(containerKeyCounter[0]);
164                        destRecord.setBindingKind(ds_kind);
165    
166                        // TODO: use a real JSON encoder like jackson.
167                        HashMap<String, Object> jsonMap = new HashMap<String, Object>();
168                        jsonMap.put("@class", "dsub_destination");
169                        jsonMap.put("name", sub.getClientId() + ":" + sub.getSubcriptionName());
170                        HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
171                        jsonTopic.put("name", dest.getTopicName());
172                        jsonMap.put("topics", new Object[]{jsonTopic});
173                        if (sub.getSelector() != null) {
174                            jsonMap.put("selector", sub.getSelector());
175                        }
176                        String json = mapper.writeValueAsString(jsonMap);
177                        System.out.println(json);
178    
179                        destRecord.setBindingData(new UTF8Buffer(json));
180                        manager.store_queue(destRecord);
181    
182                        final long seqKeyCounter[] = new long[]{0};
183                        topic.recoverSubscription(sub.getClientId(), sub.getSubcriptionName(), new MessageRecoveryListener() {
184                            public boolean hasSpace() {
185                                return true;
186                            }
187    
188                            public boolean recoverMessageReference(MessageId ref) throws Exception {
189                                return true;
190                            }
191    
192                            public boolean isDuplicate(MessageId ref) {
193                                return false;
194                            }
195    
196                            public boolean recoverMessage(Message message) throws IOException {
197                                messageKeyCounter[0]++;
198                                seqKeyCounter[0]++;
199    
200                                MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
201                                manager.store_message(messageRecord);
202    
203                                QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
204                                manager.store_queue_entry(entryRecord);
205                                return true;
206                            }
207                        });
208    
209                    }
210                }
211            }
212            manager.finish();
213        }
214    
215        private QueueEntryPB.Bean createQueueEntryPB(Message message, long queueKey, long queueSeq, long messageKey) {
216            QueueEntryPB.Bean entryRecord = new QueueEntryPB.Bean();
217            entryRecord.setQueueKey(queueKey);
218            entryRecord.setQueueSeq(queueSeq);
219            entryRecord.setMessageKey(messageKey);
220            entryRecord.setSize(message.getSize());
221            if (message.getExpiration() != 0) {
222                entryRecord.setExpiration(message.getExpiration());
223            }
224            if (message.getRedeliveryCounter() != 0) {
225                entryRecord.setRedeliveries(message.getRedeliveryCounter());
226            }
227            return entryRecord;
228        }
229    
230        private MessagePB.Bean createMessagePB(Message message, long messageKey) throws IOException {
231            DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
232            mos.writeBoolean(TIGHT_ENCODING);
233            mos.writeVarInt(OPENWIRE_VERSION);
234            wireformat.marshal(message, mos);
235    
236            MessagePB.Bean messageRecord = new MessagePB.Bean();
237            messageRecord.setCodec(codec_id);
238            messageRecord.setMessageKey(messageKey);
239            messageRecord.setSize(message.getSize());
240            messageRecord.setValue(mos.toBuffer());
241            return messageRecord;
242        }
243    
244        public File getFile() {
245            return file;
246        }
247    
248        public void setFile(String file) {
249            setFile(new File(file));
250        }
251    
252        public void setFile(File file) {
253            this.file = file;
254        }
255    
256        public URI getConfig() {
257            return config;
258        }
259    
260        public void setConfig(URI config) {
261            this.config = config;
262        }
263    }