001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.scheduler;
018
019import java.io.IOException;
020import java.util.concurrent.atomic.AtomicBoolean;
021
022import org.apache.activemq.ScheduledMessage;
023import org.apache.activemq.advisory.AdvisorySupport;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.BrokerFilter;
026import org.apache.activemq.broker.BrokerService;
027import org.apache.activemq.broker.ConnectionContext;
028import org.apache.activemq.broker.ProducerBrokerExchange;
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.command.MessageId;
032import org.apache.activemq.command.ProducerId;
033import org.apache.activemq.command.ProducerInfo;
034import org.apache.activemq.openwire.OpenWireFormat;
035import org.apache.activemq.security.SecurityContext;
036import org.apache.activemq.state.ProducerState;
037import org.apache.activemq.transaction.Synchronization;
038import org.apache.activemq.usage.JobSchedulerUsage;
039import org.apache.activemq.usage.SystemUsage;
040import org.apache.activemq.util.ByteSequence;
041import org.apache.activemq.util.IdGenerator;
042import org.apache.activemq.util.LongSequenceGenerator;
043import org.apache.activemq.util.TypeConversionSupport;
044import org.apache.activemq.wireformat.WireFormat;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048public class SchedulerBroker extends BrokerFilter implements JobListener {
049    private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
050    private static final IdGenerator ID_GENERATOR = new IdGenerator();
051    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
052    private final AtomicBoolean started = new AtomicBoolean();
053    private final WireFormat wireFormat = new OpenWireFormat();
054    private final ConnectionContext context = new ConnectionContext();
055    private final ProducerId producerId = new ProducerId();
056    private final SystemUsage systemUsage;
057
058    private final JobSchedulerStore store;
059    private JobScheduler scheduler;
060
061    public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
062        super(next);
063
064        this.store = store;
065        this.producerId.setConnectionId(ID_GENERATOR.generateId());
066        this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
067        this.context.setBroker(next);
068        this.systemUsage = brokerService.getSystemUsage();
069
070        wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
071    }
072
073    public synchronized JobScheduler getJobScheduler() throws Exception {
074        return new JobSchedulerFacade(this);
075    }
076
077    @Override
078    public void start() throws Exception {
079        this.started.set(true);
080        getInternalScheduler();
081        super.start();
082    }
083
084    @Override
085    public void stop() throws Exception {
086        if (this.started.compareAndSet(true, false)) {
087
088            if (this.store != null) {
089                this.store.stop();
090            }
091            if (this.scheduler != null) {
092                this.scheduler.removeListener(this);
093                this.scheduler = null;
094            }
095        }
096        super.stop();
097    }
098
099    @Override
100    public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
101        ConnectionContext context = producerExchange.getConnectionContext();
102
103        final String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
104        final Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
105        final Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
106        final Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
107
108        String physicalName = messageSend.getDestination().getPhysicalName();
109        boolean schedularManage = physicalName.regionMatches(true, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
110            ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
111
112        if (schedularManage == true) {
113
114            JobScheduler scheduler = getInternalScheduler();
115            ActiveMQDestination replyTo = messageSend.getReplyTo();
116
117            String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
118
119            if (action != null) {
120
121                Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
122                Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
123
124                if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
125
126                    if (startTime != null && endTime != null) {
127
128                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
129                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
130
131                        for (Job job : scheduler.getAllJobs(start, finish)) {
132                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
133                        }
134                    } else {
135                        for (Job job : scheduler.getAllJobs()) {
136                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
137                        }
138                    }
139                }
140                if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
141                    scheduler.remove(jobId);
142                } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
143
144                    if (startTime != null && endTime != null) {
145
146                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
147                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
148
149                        scheduler.removeAllJobs(start, finish);
150                    } else {
151                        scheduler.removeAllJobs();
152                    }
153                }
154            }
155
156        } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
157
158            // Check for room in the job scheduler store
159            if (systemUsage.getJobSchedulerUsage() != null) {
160                JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
161                if (usage.isFull()) {
162                    final String logMessage = "Job Scheduler Store is Full (" +
163                        usage.getPercentUsage() + "% of " + usage.getLimit() +
164                        "). Stopping producer (" + messageSend.getProducerId() +
165                        ") to prevent flooding of the job scheduler store." +
166                        " See http://activemq.apache.org/producer-flow-control.html for more info";
167
168                    long start = System.currentTimeMillis();
169                    long nextWarn = start;
170                    while (!usage.waitForSpace(1000)) {
171                        if (context.getStopping().get()) {
172                            throw new IOException("Connection closed, send aborted.");
173                        }
174
175                        long now = System.currentTimeMillis();
176                        if (now >= nextWarn) {
177                            LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
178                            nextWarn = now + 30000l;
179                        }
180                    }
181                }
182            }
183
184            if (context.isInTransaction()) {
185                context.getTransaction().addSynchronization(new Synchronization() {
186                    @Override
187                    public void afterCommit() throws Exception {
188                        doSchedule(messageSend, cronValue, periodValue, delayValue);
189                    }
190                });
191            } else {
192                doSchedule(messageSend, cronValue, periodValue, delayValue);
193            }
194        } else {
195            super.send(producerExchange, messageSend);
196        }
197    }
198
199    private void doSchedule(Message messageSend, Object cronValue, Object periodValue, Object delayValue) throws Exception {
200        long delay = 0;
201        long period = 0;
202        int repeat = 0;
203        String cronEntry = "";
204
205        // clear transaction context
206        Message msg = messageSend.copy();
207        msg.setTransactionId(null);
208        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
209        if (cronValue != null) {
210            cronEntry = cronValue.toString();
211        }
212        if (periodValue != null) {
213            period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
214        }
215        if (delayValue != null) {
216            delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
217        }
218        Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
219        if (repeatValue != null) {
220            repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
221        }
222
223        getInternalScheduler().schedule(msg.getMessageId().toString(),
224            new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
225    }
226
227    @Override
228    public void scheduledJob(String id, ByteSequence job) {
229        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
230        try {
231            Message messageSend = (Message) wireFormat.unmarshal(packet);
232            messageSend.setOriginalTransactionId(null);
233            Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
234            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
235            String cronStr = cronValue != null ? cronValue.toString() : null;
236            int repeat = 0;
237            if (repeatValue != null) {
238                repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
239            }
240
241            if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
242                // create a unique id - the original message could be sent
243                // lots of times
244                messageSend.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
245            }
246
247            // Add the jobId as a property
248            messageSend.setProperty("scheduledJobId", id);
249
250            // if this goes across a network - we don't want it rescheduled
251            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
252            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
253            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
254            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
255
256            if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) {
257
258                long oldExpiration = messageSend.getExpiration();
259                long newTimeStamp = System.currentTimeMillis();
260                long timeToLive = 0;
261                long oldTimestamp = messageSend.getTimestamp();
262
263                if (oldExpiration > 0) {
264                    timeToLive = oldExpiration - oldTimestamp;
265                }
266
267                long expiration = timeToLive + newTimeStamp;
268
269                if (expiration > oldExpiration) {
270                    if (timeToLive > 0 && expiration > 0) {
271                        messageSend.setExpiration(expiration);
272                    }
273                    messageSend.setTimestamp(newTimeStamp);
274                    LOG.debug("Set message {} timestamp from {} to {}", new Object[]{ messageSend.getMessageId(), oldTimestamp, newTimeStamp });
275                }
276            }
277
278            // Repackage the message contents prior to send now that all updates are complete.
279            messageSend.beforeMarshall(wireFormat);
280
281            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
282            producerExchange.setConnectionContext(context);
283            producerExchange.setMutable(true);
284            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
285            super.send(producerExchange, messageSend);
286        } catch (Exception e) {
287            LOG.error("Failed to send scheduled message {}", id, e);
288        }
289    }
290
291    protected synchronized JobScheduler getInternalScheduler() throws Exception {
292        if (this.started.get()) {
293            if (this.scheduler == null && store != null) {
294                this.scheduler = store.getJobScheduler("JMS");
295                this.scheduler.addListener(this);
296                this.scheduler.startDispatching();
297            }
298            return this.scheduler;
299        }
300        return null;
301    }
302
303    protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {
304
305        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
306        try {
307            Message msg = (Message) this.wireFormat.unmarshal(packet);
308            msg.setOriginalTransactionId(null);
309            msg.setPersistent(false);
310            msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
311            msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
312            msg.setDestination(replyTo);
313            msg.setResponseRequired(false);
314            msg.setProducerId(this.producerId);
315
316            // Add the jobId as a property
317            msg.setProperty("scheduledJobId", job.getJobId());
318
319            final boolean originalFlowControl = context.isProducerFlowControl();
320            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
321            producerExchange.setConnectionContext(context);
322            producerExchange.setMutable(true);
323            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
324            try {
325                context.setProducerFlowControl(false);
326                this.next.send(producerExchange, msg);
327            } finally {
328                context.setProducerFlowControl(originalFlowControl);
329            }
330        } catch (Exception e) {
331            LOG.error("Failed to send scheduled message {}", job.getJobId(), e);
332        }
333    }
334}