001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.scheduler;
018    
019    import java.io.File;
020    import java.util.concurrent.atomic.AtomicBoolean;
021    
022    import org.apache.activemq.ScheduledMessage;
023    import org.apache.activemq.advisory.AdvisorySupport;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.BrokerFilter;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.ProducerBrokerExchange;
028    import org.apache.activemq.command.ActiveMQDestination;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.command.ProducerId;
032    import org.apache.activemq.command.ProducerInfo;
033    import org.apache.activemq.openwire.OpenWireFormat;
034    import org.apache.activemq.security.SecurityContext;
035    import org.apache.activemq.state.ProducerState;
036    import org.apache.activemq.util.IdGenerator;
037    import org.apache.activemq.util.LongSequenceGenerator;
038    import org.apache.activemq.util.TypeConversionSupport;
039    import org.apache.activemq.wireformat.WireFormat;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    import org.apache.kahadb.util.ByteSequence;
043    
044    public class SchedulerBroker extends BrokerFilter implements JobListener {
045        private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
046        private static final IdGenerator ID_GENERATOR = new IdGenerator();
047        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
048        private final AtomicBoolean started = new AtomicBoolean();
049        private final WireFormat wireFormat = new OpenWireFormat();
050        private final ConnectionContext context = new ConnectionContext();
051        private final ProducerId producerId = new ProducerId();
052        private File directory;
053    
054        private JobSchedulerStore store;
055        private JobScheduler scheduler;
056    
057        public SchedulerBroker(Broker next, File directory) throws Exception {
058            super(next);
059            this.directory = directory;
060            this.producerId.setConnectionId(ID_GENERATOR.generateId());
061            this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
062            context.setBroker(next);
063            LOG.info("Scheduler using directory: " + directory);
064    
065        }
066    
067        public synchronized JobScheduler getJobScheduler() throws Exception {
068            return new JobSchedulerFacade(this);
069        }
070    
071        /**
072         * @return the directory
073         */
074        public File getDirectory() {
075            return this.directory;
076        }
077        /**
078         * @param directory
079         *            the directory to set
080         */
081        public void setDirectory(File directory) {
082            this.directory = directory;
083        }
084    
085        @Override
086        public void start() throws Exception {
087            this.started.set(true);
088            getInternalScheduler();
089            super.start();
090        }
091    
092        @Override
093        public void stop() throws Exception {
094            if (this.started.compareAndSet(true, false)) {
095    
096                if (this.store != null) {
097                    this.store.stop();
098                }
099                if (this.scheduler != null) {
100                    this.scheduler.removeListener(this);
101                    this.scheduler = null;
102                }
103            }
104            super.stop();
105        }
106    
107        @Override
108        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
109            long delay = 0;
110            long period = 0;
111            int repeat = 0;
112            String cronEntry = "";
113            String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
114            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
115            Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
116            Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
117    
118            String physicalName = messageSend.getDestination().getPhysicalName();
119            boolean schedularManage = physicalName.regionMatches(true, 0,
120                    ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
121                    ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
122    
123            if (schedularManage == true) {
124    
125                JobScheduler scheduler = getInternalScheduler();
126                ActiveMQDestination replyTo = messageSend.getReplyTo();
127    
128                String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
129    
130                if (action != null ) {
131    
132                    Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
133                    Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
134    
135                    if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
136    
137                        if( startTime != null && endTime != null ) {
138    
139                            long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
140                            long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
141    
142                            for (Job job : scheduler.getAllJobs(start, finish)) {
143                                sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
144                            }
145                        } else {
146                            for (Job job : scheduler.getAllJobs()) {
147                                sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
148                            }
149                        }
150                    }
151                    if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
152                        scheduler.remove(jobId);
153                    } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
154    
155                        if( startTime != null && endTime != null ) {
156    
157                            long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
158                            long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
159    
160                            scheduler.removeAllJobs(start, finish);
161                        } else {
162                            scheduler.removeAllJobs();
163                        }
164                    }
165                }
166    
167            } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
168                //clear transaction context
169                Message msg = messageSend.copy();
170                msg.setTransactionId(null);
171                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
172                if (cronValue != null) {
173                    cronEntry = cronValue.toString();
174                }
175                if (periodValue != null) {
176                  period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
177                }
178                if (delayValue != null) {
179                    delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
180                }
181                Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
182                if (repeatValue != null) {
183                    repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
184                }
185                getInternalScheduler().schedule(msg.getMessageId().toString(),
186                        new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);
187    
188            } else {
189                super.send(producerExchange, messageSend);
190            }
191        }
192    
193        public void scheduledJob(String id, ByteSequence job) {
194            org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job
195                    .getOffset(), job.getLength());
196            try {
197                Message messageSend = (Message) this.wireFormat.unmarshal(packet);
198                messageSend.setOriginalTransactionId(null);
199                Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
200                Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
201                String cronStr = cronValue != null ? cronValue.toString() : null;
202                int repeat = 0;
203                if (repeatValue != null) {
204                    repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
205                }
206    
207                if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
208                    // create a unique id - the original message could be sent
209                    // lots of times
210                    messageSend.setMessageId(
211                            new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
212                }
213    
214                // Add the jobId as a property
215                messageSend.setProperty("scheduledJobId", id);
216    
217                // if this goes across a network - we don't want it rescheduled
218                messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
219                messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
220                messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
221                messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
222    
223                if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) {
224    
225                    long oldExpiration = messageSend.getExpiration();
226                    long newTimeStamp = System.currentTimeMillis();
227                    long timeToLive = 0;
228                    long oldTimestamp = messageSend.getTimestamp();
229    
230                    if (oldExpiration > 0) {
231                        timeToLive = oldExpiration - oldTimestamp;
232                    }
233    
234                    long expiration = timeToLive + newTimeStamp;
235    
236                    if(expiration > oldExpiration) {
237                        if (timeToLive > 0 && expiration > 0) {
238                            messageSend.setExpiration(expiration);
239                        }
240                        messageSend.setTimestamp(newTimeStamp);
241                        if (LOG.isDebugEnabled()) {
242                            LOG.debug("Set message " + messageSend.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
243                        }
244                    }
245                }
246    
247                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
248                producerExchange.setConnectionContext(context);
249                producerExchange.setMutable(true);
250                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
251                super.send(producerExchange, messageSend);
252            } catch (Exception e) {
253                LOG.error("Failed to send scheduled message " + id, e);
254            }
255        }
256    
257        protected synchronized JobScheduler getInternalScheduler() throws Exception {
258            if (this.started.get()) {
259                if (this.scheduler == null) {
260                    this.scheduler = getStore().getJobScheduler("JMS");
261                    this.scheduler.addListener(this);
262                }
263                return this.scheduler;
264            }
265            return null;
266        }
267    
268        private JobSchedulerStore getStore() throws Exception {
269            if (started.get()) {
270                if (this.store == null) {
271                    this.store = new JobSchedulerStore();
272                    this.store.setDirectory(directory);
273                    this.store.start();
274                }
275                return this.store;
276            }
277            return null;
278        }
279    
280        protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
281                throws Exception {
282    
283            org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
284            try {
285                Message msg = (Message) this.wireFormat.unmarshal(packet);
286                msg.setOriginalTransactionId(null);
287                msg.setPersistent(false);
288                msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
289                msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
290                msg.setDestination(replyTo);
291                msg.setResponseRequired(false);
292                msg.setProducerId(this.producerId);
293    
294                // Add the jobId as a property
295                msg.setProperty("scheduledJobId", job.getJobId());
296    
297                final boolean originalFlowControl = context.isProducerFlowControl();
298                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
299                producerExchange.setConnectionContext(context);
300                producerExchange.setMutable(true);
301                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
302                try {
303                    context.setProducerFlowControl(false);
304                    this.next.send(producerExchange, msg);
305                } finally {
306                    context.setProducerFlowControl(originalFlowControl);
307                }
308            } catch (Exception e) {
309                LOG.error("Failed to send scheduled message " + job.getJobId(), e);
310            }
311    
312        }
313    }