001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store;
018
019import java.io.IOException;
020
021import org.apache.activemq.broker.ConnectionContext;
022import org.apache.activemq.command.ActiveMQDestination;
023import org.apache.activemq.command.Message;
024import org.apache.activemq.command.MessageAck;
025import org.apache.activemq.command.MessageId;
026import org.apache.activemq.usage.MemoryUsage;
027
028abstract public class AbstractMessageStore implements MessageStore {
029    public static final ListenableFuture<Object> FUTURE;
030    protected final ActiveMQDestination destination;
031    protected boolean prioritizedMessages;
032    protected IndexListener indexListener;
033    protected final MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics();
034
035    public AbstractMessageStore(ActiveMQDestination destination) {
036        this.destination = destination;
037    }
038
039    @Override
040    public void dispose(ConnectionContext context) {
041    }
042
043    @Override
044    public void start() throws Exception {
045        recoverMessageStoreStatistics();
046    }
047
048    @Override
049    public void stop() throws Exception {
050    }
051
052    @Override
053    public ActiveMQDestination getDestination() {
054        return destination;
055    }
056
057    @Override
058    public void setMemoryUsage(MemoryUsage memoryUsage) {
059    }
060
061    @Override
062    public void setBatch(MessageId messageId) throws IOException, Exception {
063    }
064
065    /**
066     * flag to indicate if the store is empty
067     *
068     * @return true if the message count is 0
069     * @throws Exception
070     */
071    @Override
072    public boolean isEmpty() throws Exception {
073        return getMessageCount() == 0;
074    }
075
076    @Override
077    public void setPrioritizedMessages(boolean prioritizedMessages) {
078        this.prioritizedMessages = prioritizedMessages;
079    }
080
081    @Override
082    public boolean isPrioritizedMessages() {
083        return this.prioritizedMessages;
084    }
085
086    @Override
087    public void addMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException{
088        addMessage(context, message);
089    }
090
091    @Override
092    public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException {
093        addMessage(context, message);
094        return FUTURE;
095    }
096
097    @Override
098    public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException {
099        addMessage(context, message, canOptimizeHint);
100        return FUTURE;
101    }
102
103    @Override
104    public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message, final boolean canOptimizeHint) throws IOException {
105        addMessage(context, message, canOptimizeHint);
106        return FUTURE;
107    }
108
109    @Override
110    public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException {
111        addMessage(context, message);
112        return new InlineListenableFuture();
113    }
114
115    @Override
116    public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
117        removeMessage(context, ack);
118    }
119
120    @Override
121    public void updateMessage(Message message) throws IOException {
122        throw new IOException("update is not supported by: " + this);
123    }
124
125    @Override
126    public void registerIndexListener(IndexListener indexListener) {
127        this.indexListener = indexListener;
128    }
129
130    public IndexListener getIndexListener() {
131        return indexListener;
132    }
133
134    static {
135       FUTURE = new InlineListenableFuture();
136    }
137
138    @Override
139    public int getMessageCount() throws IOException {
140        return (int) getMessageStoreStatistics().getMessageCount().getCount();
141    }
142
143    @Override
144    public long getMessageSize() throws IOException {
145        return getMessageStoreStatistics().getMessageSize().getTotalSize();
146    }
147
148    @Override
149    public MessageStoreStatistics getMessageStoreStatistics() {
150        return messageStoreStatistics;
151    }
152
153    protected void recoverMessageStoreStatistics() throws IOException {
154
155    }
156}