001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import java.util.concurrent.atomic.AtomicLong;
020    import javax.jms.JMSException;
021    import javax.jms.Message;
022    import javax.jms.MessageListener;
023    import org.apache.activemq.ActiveMQMessageTransformation;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.region.Destination;
027    import org.apache.activemq.broker.region.MessageReference;
028    import org.apache.activemq.broker.region.SubscriptionRecovery;
029    import org.apache.activemq.broker.region.Topic;
030    import org.apache.activemq.command.ActiveMQDestination;
031    import org.apache.activemq.command.ActiveMQMessage;
032    import org.apache.activemq.command.ConnectionId;
033    import org.apache.activemq.command.MessageId;
034    import org.apache.activemq.command.ProducerId;
035    import org.apache.activemq.command.SessionId;
036    import org.apache.activemq.util.IdGenerator;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * This implementation of {@link SubscriptionRecoveryPolicy} will perform a user
042     * specific query mechanism to load any messages they may have missed.
043     * 
044     * @org.apache.xbean.XBean
045     * 
046     */
047    public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
048    
049        private static final Logger LOG = LoggerFactory.getLogger(QueryBasedSubscriptionRecoveryPolicy.class);
050    
051        private MessageQuery query;
052        private final AtomicLong messageSequence = new AtomicLong(0);
053        private final IdGenerator idGenerator = new IdGenerator();
054        private final ProducerId producerId = createProducerId();
055    
056        public SubscriptionRecoveryPolicy copy() {
057            QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy();
058            rc.setQuery(query);
059            return rc;
060        }
061    
062        public boolean add(ConnectionContext context, MessageReference message) throws Exception {
063            return query.validateUpdate(message.getMessage());
064        }
065    
066        public void recover(final ConnectionContext context, final Topic topic, final SubscriptionRecovery sub) throws Exception {
067            if (query != null) {
068                ActiveMQDestination destination = sub.getActiveMQDestination();
069                query.execute(destination, new MessageListener() {
070    
071                    public void onMessage(Message message) {
072                        dispatchInitialMessage(message, topic, context, sub);
073                    }
074                });
075            }
076        }
077    
078        public void start() throws Exception {
079            if (query == null) {
080                throw new IllegalArgumentException("No query property configured");
081            }
082        }
083    
084        public void stop() throws Exception {
085        }
086    
087        public MessageQuery getQuery() {
088            return query;
089        }
090    
091        /**
092         * Sets the query strategy to load initial messages
093         */
094        public void setQuery(MessageQuery query) {
095            this.query = query;
096        }
097    
098        public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Exception {
099            return new org.apache.activemq.command.Message[0];
100        }
101        
102        public void setBroker(Broker broker) {        
103        }
104    
105        protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) {
106            try {
107                ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null);
108                ActiveMQDestination destination = activeMessage.getDestination();
109                if (destination == null) {
110                    destination = sub.getActiveMQDestination();
111                    activeMessage.setDestination(destination);
112                }
113                activeMessage.setRegionDestination(regionDestination);
114                configure(activeMessage);
115                sub.addRecoveredMessage(context, activeMessage);
116            } catch (Throwable e) {
117                LOG.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e);
118            }
119        }
120    
121        protected void configure(ActiveMQMessage msg) throws JMSException {
122            long sequenceNumber = messageSequence.incrementAndGet();
123            msg.setMessageId(new MessageId(producerId, sequenceNumber));
124            msg.onSend();
125            msg.setProducerId(producerId);
126        }
127    
128        protected ProducerId createProducerId() {
129            String id = idGenerator.generateId();
130            ConnectionId connectionId = new ConnectionId(id);
131            SessionId sessionId = new SessionId(connectionId, 1);
132            return new ProducerId(sessionId, 1);
133        }
134    }