001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.concurrent.atomic.AtomicLong;
022    
023    import org.apache.activemq.ActiveMQMessageAudit;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.command.MessageAck;
028    import org.apache.activemq.command.MessageId;
029    import org.apache.activemq.store.AbstractMessageStore;
030    import org.apache.activemq.store.MessageRecoveryListener;
031    import org.apache.activemq.util.ByteSequence;
032    import org.apache.activemq.util.ByteSequenceData;
033    import org.apache.activemq.util.IOExceptionSupport;
034    import org.apache.activemq.wireformat.WireFormat;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * 
040     */
041    public class JDBCMessageStore extends AbstractMessageStore {
042    
043        class Duration {
044            static final int LIMIT = 100;
045            final long start = System.currentTimeMillis();
046            final String name;
047    
048            Duration(String name) {
049                this.name = name;
050            }
051            void end() {
052                end(null);
053            }
054            void end(Object o) {
055                long duration = System.currentTimeMillis() - start;
056    
057                if (duration > LIMIT) {
058                    System.err.println(name + " took a long time: " + duration + "ms " + o);
059                }
060            }
061        }
062        private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
063        protected final WireFormat wireFormat;
064        protected final JDBCAdapter adapter;
065        protected final JDBCPersistenceAdapter persistenceAdapter;
066        protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
067        protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
068    
069        protected ActiveMQMessageAudit audit;
070        
071        public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
072            super(destination);
073            this.persistenceAdapter = persistenceAdapter;
074            this.adapter = adapter;
075            this.wireFormat = wireFormat;
076            this.audit = audit;
077    
078            if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
079                recordDestinationCreation(destination);
080            }
081        }
082    
083        private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
084            TransactionContext c = persistenceAdapter.getTransactionContext();
085            try {
086                c = persistenceAdapter.getTransactionContext();
087                if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) {
088                    adapter.doRecordDestination(c, destination);
089                }
090            } catch (SQLException e) {
091                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
092                throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e);
093            } finally {
094                c.close();
095            }
096        }
097    
098        public void addMessage(ConnectionContext context, Message message) throws IOException {
099            MessageId messageId = message.getMessageId();
100            if (audit != null && audit.isDuplicate(message)) {
101                if (LOG.isDebugEnabled()) {
102                    LOG.debug(destination.getPhysicalName()
103                        + " ignoring duplicated (add) message, already stored: "
104                        + messageId);
105                }
106                return;
107            }
108            
109            long sequenceId = persistenceAdapter.getNextSequenceId();
110            
111            // Serialize the Message..
112            byte data[];
113            try {
114                ByteSequence packet = wireFormat.marshal(message);
115                data = ByteSequenceData.toByteArray(packet);
116            } catch (IOException e) {
117                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
118            }
119    
120            // Get a connection and insert the message into the DB.
121            TransactionContext c = persistenceAdapter.getTransactionContext(context);
122            try {      
123                adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(),
124                        this.isPrioritizedMessages() ? message.getPriority() : 0, context != null ? context.getXid() : null);
125            } catch (SQLException e) {
126                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
127                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
128            } finally {
129                c.close();
130            }
131            if (context != null && context.getXid() != null) {
132                message.getMessageId().setDataLocator(sequenceId);
133            } else {
134                onAdd(messageId, sequenceId, message.getPriority());
135            }
136        }
137    
138        protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
139        }
140    
141        public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
142            // Get a connection and insert the message into the DB.
143            TransactionContext c = persistenceAdapter.getTransactionContext(context);
144            try {
145                adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
146            } catch (SQLException e) {
147                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
148                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
149            } finally {
150                c.close();
151            }
152        }
153    
154        public Message getMessage(MessageId messageId) throws IOException {
155            // Get a connection and pull the message out of the DB
156            TransactionContext c = persistenceAdapter.getTransactionContext();
157            try {
158                byte data[] = adapter.doGetMessage(c, messageId);
159                if (data == null) {
160                    return null;
161                }
162    
163                Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
164                return answer;
165            } catch (IOException e) {
166                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
167            } catch (SQLException e) {
168                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
169                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
170            } finally {
171                c.close();
172            }
173        }
174    
175        public String getMessageReference(MessageId messageId) throws IOException {
176            long id = messageId.getBrokerSequenceId();
177    
178            // Get a connection and pull the message out of the DB
179            TransactionContext c = persistenceAdapter.getTransactionContext();
180            try {
181                return adapter.doGetMessageReference(c, id);
182            } catch (IOException e) {
183                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
184            } catch (SQLException e) {
185                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
186                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
187            } finally {
188                c.close();
189            }
190        }
191    
192        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
193    
194            long seq = persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
195    
196            // Get a connection and remove the message from the DB
197            TransactionContext c = persistenceAdapter.getTransactionContext(context);
198            try {
199                adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null);
200            } catch (SQLException e) {
201                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
202                throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
203            } finally {
204                c.close();
205            }
206            if (context != null && context.getXid() != null) {
207                ack.getLastMessageId().setDataLocator(seq);
208            }
209        }
210    
211        public void recover(final MessageRecoveryListener listener) throws Exception {
212    
213            // Get all the Message ids out of the database.
214            TransactionContext c = persistenceAdapter.getTransactionContext();
215            try {
216                c = persistenceAdapter.getTransactionContext();
217                adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
218                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
219                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
220                        msg.getMessageId().setBrokerSequenceId(sequenceId);
221                        return listener.recoverMessage(msg);
222                    }
223    
224                    public boolean recoverMessageReference(String reference) throws Exception {
225                        return listener.recoverMessageReference(new MessageId(reference));
226                    }
227                });
228            } catch (SQLException e) {
229                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
230                throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
231            } finally {
232                c.close();
233            }
234        }
235    
236        /**
237         * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
238         */
239        public void removeAllMessages(ConnectionContext context) throws IOException {
240            // Get a connection and remove the message from the DB
241            TransactionContext c = persistenceAdapter.getTransactionContext(context);
242            try {
243                adapter.doRemoveAllMessages(c, destination);
244            } catch (SQLException e) {
245                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
246                throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
247            } finally {
248                c.close();
249            }
250        }
251    
252        public int getMessageCount() throws IOException {
253            int result = 0;
254            TransactionContext c = persistenceAdapter.getTransactionContext();
255            try {
256    
257                result = adapter.doGetMessageCount(c, destination);
258    
259            } catch (SQLException e) {
260                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
261                throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
262            } finally {
263                c.close();
264            }
265            return result;
266        }
267    
268        /**
269         * @param maxReturned
270         * @param listener
271         * @throws Exception
272         * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
273         *      org.apache.activemq.store.MessageRecoveryListener)
274         */
275        public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
276            TransactionContext c = persistenceAdapter.getTransactionContext();
277            try {
278                adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
279                        maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
280    
281                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
282                        if (listener.hasSpace()) {
283                            Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
284                            msg.getMessageId().setBrokerSequenceId(sequenceId);
285                            listener.recoverMessage(msg);
286                            lastRecoveredSequenceId.set(sequenceId);
287                            lastRecoveredPriority.set(msg.getPriority());
288                            return true;
289                        }
290                        return false;
291                    }
292    
293                    public boolean recoverMessageReference(String reference) throws Exception {
294                        if (listener.hasSpace()) {
295                            listener.recoverMessageReference(new MessageId(reference));
296                            return true;
297                        }
298                        return false;
299                    }
300    
301                });
302            } catch (SQLException e) {
303                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
304            } finally {
305                c.close();
306            }
307    
308        }
309    
310        /**
311         * @see org.apache.activemq.store.MessageStore#resetBatching()
312         */
313        public void resetBatching() {
314            if (LOG.isTraceEnabled()) {
315                LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
316            }
317            lastRecoveredSequenceId.set(-1);
318            lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
319    
320        }
321    
322        @Override
323        public void setBatch(MessageId messageId) {
324            try {
325                long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(messageId, destination);
326                lastRecoveredSequenceId.set(storedValues[0]);
327                lastRecoveredPriority.set(storedValues[1]);
328            } catch (IOException ignoredAsAlreadyLogged) {
329                lastRecoveredSequenceId.set(-1);
330                lastRecoveredPriority.set(Byte.MAX_VALUE -1);
331            }
332            if (LOG.isTraceEnabled()) {
333                LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
334                        + ", priority: " + lastRecoveredPriority.get());
335            }
336        }
337    
338    
339        public void setPrioritizedMessages(boolean prioritizedMessages) {
340            super.setPrioritizedMessages(prioritizedMessages);
341        }   
342    }