001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.SQLException;
021 import java.util.concurrent.atomic.AtomicLong;
022
023 import org.apache.activemq.ActiveMQMessageAudit;
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.command.ActiveMQDestination;
026 import org.apache.activemq.command.Message;
027 import org.apache.activemq.command.MessageAck;
028 import org.apache.activemq.command.MessageId;
029 import org.apache.activemq.store.AbstractMessageStore;
030 import org.apache.activemq.store.MessageRecoveryListener;
031 import org.apache.activemq.util.ByteSequence;
032 import org.apache.activemq.util.ByteSequenceData;
033 import org.apache.activemq.util.IOExceptionSupport;
034 import org.apache.activemq.wireformat.WireFormat;
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 /**
039 *
040 */
041 public class JDBCMessageStore extends AbstractMessageStore {
042
043 class Duration {
044 static final int LIMIT = 100;
045 final long start = System.currentTimeMillis();
046 final String name;
047
048 Duration(String name) {
049 this.name = name;
050 }
051 void end() {
052 end(null);
053 }
054 void end(Object o) {
055 long duration = System.currentTimeMillis() - start;
056
057 if (duration > LIMIT) {
058 System.err.println(name + " took a long time: " + duration + "ms " + o);
059 }
060 }
061 }
062 private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
063 protected final WireFormat wireFormat;
064 protected final JDBCAdapter adapter;
065 protected final JDBCPersistenceAdapter persistenceAdapter;
066 protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
067 protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
068
069 protected ActiveMQMessageAudit audit;
070
071 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
072 super(destination);
073 this.persistenceAdapter = persistenceAdapter;
074 this.adapter = adapter;
075 this.wireFormat = wireFormat;
076 this.audit = audit;
077
078 if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
079 recordDestinationCreation(destination);
080 }
081 }
082
083 private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
084 TransactionContext c = persistenceAdapter.getTransactionContext();
085 try {
086 c = persistenceAdapter.getTransactionContext();
087 if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) {
088 adapter.doRecordDestination(c, destination);
089 }
090 } catch (SQLException e) {
091 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
092 throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e);
093 } finally {
094 c.close();
095 }
096 }
097
098 public void addMessage(ConnectionContext context, Message message) throws IOException {
099 MessageId messageId = message.getMessageId();
100 if (audit != null && audit.isDuplicate(message)) {
101 if (LOG.isDebugEnabled()) {
102 LOG.debug(destination.getPhysicalName()
103 + " ignoring duplicated (add) message, already stored: "
104 + messageId);
105 }
106 return;
107 }
108
109 long sequenceId = persistenceAdapter.getNextSequenceId();
110
111 // Serialize the Message..
112 byte data[];
113 try {
114 ByteSequence packet = wireFormat.marshal(message);
115 data = ByteSequenceData.toByteArray(packet);
116 } catch (IOException e) {
117 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
118 }
119
120 // Get a connection and insert the message into the DB.
121 TransactionContext c = persistenceAdapter.getTransactionContext(context);
122 try {
123 adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(),
124 this.isPrioritizedMessages() ? message.getPriority() : 0, context != null ? context.getXid() : null);
125 } catch (SQLException e) {
126 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
127 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
128 } finally {
129 c.close();
130 }
131 if (context != null && context.getXid() != null) {
132 message.getMessageId().setDataLocator(sequenceId);
133 } else {
134 onAdd(messageId, sequenceId, message.getPriority());
135 }
136 }
137
138 protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
139 }
140
141 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
142 // Get a connection and insert the message into the DB.
143 TransactionContext c = persistenceAdapter.getTransactionContext(context);
144 try {
145 adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
146 } catch (SQLException e) {
147 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
148 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
149 } finally {
150 c.close();
151 }
152 }
153
154 public Message getMessage(MessageId messageId) throws IOException {
155 // Get a connection and pull the message out of the DB
156 TransactionContext c = persistenceAdapter.getTransactionContext();
157 try {
158 byte data[] = adapter.doGetMessage(c, messageId);
159 if (data == null) {
160 return null;
161 }
162
163 Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
164 return answer;
165 } catch (IOException e) {
166 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
167 } catch (SQLException e) {
168 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
169 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
170 } finally {
171 c.close();
172 }
173 }
174
175 public String getMessageReference(MessageId messageId) throws IOException {
176 long id = messageId.getBrokerSequenceId();
177
178 // Get a connection and pull the message out of the DB
179 TransactionContext c = persistenceAdapter.getTransactionContext();
180 try {
181 return adapter.doGetMessageReference(c, id);
182 } catch (IOException e) {
183 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
184 } catch (SQLException e) {
185 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
186 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
187 } finally {
188 c.close();
189 }
190 }
191
192 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
193
194 long seq = persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
195
196 // Get a connection and remove the message from the DB
197 TransactionContext c = persistenceAdapter.getTransactionContext(context);
198 try {
199 adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null);
200 } catch (SQLException e) {
201 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
202 throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
203 } finally {
204 c.close();
205 }
206 if (context != null && context.getXid() != null) {
207 ack.getLastMessageId().setDataLocator(seq);
208 }
209 }
210
211 public void recover(final MessageRecoveryListener listener) throws Exception {
212
213 // Get all the Message ids out of the database.
214 TransactionContext c = persistenceAdapter.getTransactionContext();
215 try {
216 c = persistenceAdapter.getTransactionContext();
217 adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
218 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
219 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
220 msg.getMessageId().setBrokerSequenceId(sequenceId);
221 return listener.recoverMessage(msg);
222 }
223
224 public boolean recoverMessageReference(String reference) throws Exception {
225 return listener.recoverMessageReference(new MessageId(reference));
226 }
227 });
228 } catch (SQLException e) {
229 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
230 throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
231 } finally {
232 c.close();
233 }
234 }
235
236 /**
237 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
238 */
239 public void removeAllMessages(ConnectionContext context) throws IOException {
240 // Get a connection and remove the message from the DB
241 TransactionContext c = persistenceAdapter.getTransactionContext(context);
242 try {
243 adapter.doRemoveAllMessages(c, destination);
244 } catch (SQLException e) {
245 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
246 throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
247 } finally {
248 c.close();
249 }
250 }
251
252 public int getMessageCount() throws IOException {
253 int result = 0;
254 TransactionContext c = persistenceAdapter.getTransactionContext();
255 try {
256
257 result = adapter.doGetMessageCount(c, destination);
258
259 } catch (SQLException e) {
260 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
261 throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
262 } finally {
263 c.close();
264 }
265 return result;
266 }
267
268 /**
269 * @param maxReturned
270 * @param listener
271 * @throws Exception
272 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
273 * org.apache.activemq.store.MessageRecoveryListener)
274 */
275 public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
276 TransactionContext c = persistenceAdapter.getTransactionContext();
277 try {
278 adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
279 maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
280
281 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
282 if (listener.hasSpace()) {
283 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
284 msg.getMessageId().setBrokerSequenceId(sequenceId);
285 listener.recoverMessage(msg);
286 lastRecoveredSequenceId.set(sequenceId);
287 lastRecoveredPriority.set(msg.getPriority());
288 return true;
289 }
290 return false;
291 }
292
293 public boolean recoverMessageReference(String reference) throws Exception {
294 if (listener.hasSpace()) {
295 listener.recoverMessageReference(new MessageId(reference));
296 return true;
297 }
298 return false;
299 }
300
301 });
302 } catch (SQLException e) {
303 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
304 } finally {
305 c.close();
306 }
307
308 }
309
310 /**
311 * @see org.apache.activemq.store.MessageStore#resetBatching()
312 */
313 public void resetBatching() {
314 if (LOG.isTraceEnabled()) {
315 LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
316 }
317 lastRecoveredSequenceId.set(-1);
318 lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
319
320 }
321
322 @Override
323 public void setBatch(MessageId messageId) {
324 try {
325 long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(messageId, destination);
326 lastRecoveredSequenceId.set(storedValues[0]);
327 lastRecoveredPriority.set(storedValues[1]);
328 } catch (IOException ignoredAsAlreadyLogged) {
329 lastRecoveredSequenceId.set(-1);
330 lastRecoveredPriority.set(Byte.MAX_VALUE -1);
331 }
332 if (LOG.isTraceEnabled()) {
333 LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
334 + ", priority: " + lastRecoveredPriority.get());
335 }
336 }
337
338
339 public void setPrioritizedMessages(boolean prioritizedMessages) {
340 super.setPrioritizedMessages(prioritizedMessages);
341 }
342 }