001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.scheduler;
018    
019    import java.io.IOException;
020    import java.util.concurrent.atomic.AtomicBoolean;
021    
022    import org.apache.activemq.ScheduledMessage;
023    import org.apache.activemq.advisory.AdvisorySupport;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.BrokerFilter;
026    import org.apache.activemq.broker.BrokerService;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.broker.ProducerBrokerExchange;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.command.MessageId;
032    import org.apache.activemq.command.ProducerId;
033    import org.apache.activemq.command.ProducerInfo;
034    import org.apache.activemq.openwire.OpenWireFormat;
035    import org.apache.activemq.security.SecurityContext;
036    import org.apache.activemq.state.ProducerState;
037    import org.apache.activemq.usage.JobSchedulerUsage;
038    import org.apache.activemq.usage.SystemUsage;
039    import org.apache.activemq.util.ByteSequence;
040    import org.apache.activemq.util.IdGenerator;
041    import org.apache.activemq.util.LongSequenceGenerator;
042    import org.apache.activemq.util.TypeConversionSupport;
043    import org.apache.activemq.wireformat.WireFormat;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    public class SchedulerBroker extends BrokerFilter implements JobListener {
048        private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
049        private static final IdGenerator ID_GENERATOR = new IdGenerator();
050        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
051        private final AtomicBoolean started = new AtomicBoolean();
052        private final WireFormat wireFormat = new OpenWireFormat();
053        private final ConnectionContext context = new ConnectionContext();
054        private final ProducerId producerId = new ProducerId();
055        private final SystemUsage systemUsage;
056    
057        private final JobSchedulerStore store;
058        private JobScheduler scheduler;
059    
060        public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
061            super(next);
062    
063            this.store = store;
064            this.producerId.setConnectionId(ID_GENERATOR.generateId());
065            this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
066            this.context.setBroker(next);
067            this.systemUsage = brokerService.getSystemUsage();
068        }
069    
070        public synchronized JobScheduler getJobScheduler() throws Exception {
071            return new JobSchedulerFacade(this);
072        }
073    
074        @Override
075        public void start() throws Exception {
076            this.started.set(true);
077            getInternalScheduler();
078            super.start();
079        }
080    
081        @Override
082        public void stop() throws Exception {
083            if (this.started.compareAndSet(true, false)) {
084    
085                if (this.store != null) {
086                    this.store.stop();
087                }
088                if (this.scheduler != null) {
089                    this.scheduler.removeListener(this);
090                    this.scheduler = null;
091                }
092            }
093            super.stop();
094        }
095    
096        @Override
097        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
098            long delay = 0;
099            long period = 0;
100            int repeat = 0;
101            String cronEntry = "";
102            String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
103            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
104            Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
105            Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
106    
107            String physicalName = messageSend.getDestination().getPhysicalName();
108            boolean schedularManage = physicalName.regionMatches(true, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
109                ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
110    
111            if (schedularManage == true) {
112    
113                JobScheduler scheduler = getInternalScheduler();
114                ActiveMQDestination replyTo = messageSend.getReplyTo();
115    
116                String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
117    
118                if (action != null) {
119    
120                    Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
121                    Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
122    
123                    if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
124    
125                        if (startTime != null && endTime != null) {
126    
127                            long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
128                            long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
129    
130                            for (Job job : scheduler.getAllJobs(start, finish)) {
131                                sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
132                            }
133                        } else {
134                            for (Job job : scheduler.getAllJobs()) {
135                                sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
136                            }
137                        }
138                    }
139                    if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
140                        scheduler.remove(jobId);
141                    } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
142    
143                        if (startTime != null && endTime != null) {
144    
145                            long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
146                            long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
147    
148                            scheduler.removeAllJobs(start, finish);
149                        } else {
150                            scheduler.removeAllJobs();
151                        }
152                    }
153                }
154    
155            } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
156                // clear transaction context
157                Message msg = messageSend.copy();
158                msg.setTransactionId(null);
159                org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
160                if (cronValue != null) {
161                    cronEntry = cronValue.toString();
162                }
163                if (periodValue != null) {
164                    period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
165                }
166                if (delayValue != null) {
167                    delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
168                }
169                Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
170                if (repeatValue != null) {
171                    repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
172                }
173                getInternalScheduler().schedule(msg.getMessageId().toString(), new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay,
174                    period, repeat);
175    
176            } else {
177                super.send(producerExchange, messageSend);
178            }
179        }
180    
181        @Override
182        public void scheduledJob(String id, ByteSequence job) {
183            org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
184            try {
185                Message messageSend = (Message) this.wireFormat.unmarshal(packet);
186                messageSend.setOriginalTransactionId(null);
187                Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
188                Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
189                String cronStr = cronValue != null ? cronValue.toString() : null;
190                int repeat = 0;
191                if (repeatValue != null) {
192                    repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
193                }
194    
195                // Check for room in the job scheduler store
196                if (systemUsage.getJobSchedulerUsage() != null) {
197                    JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
198                    if (usage.isFull()) {
199                        final String logMessage = "Job Scheduler Store is Full (" +
200                            usage.getPercentUsage() + "% of " + usage.getLimit() +
201                            "). Stopping producer (" + messageSend.getProducerId() +
202                            ") to prevent flooding of the job scheduler store." +
203                            " See http://activemq.apache.org/producer-flow-control.html for more info";
204    
205                        long start = System.currentTimeMillis();
206                        long nextWarn = start;
207                        while (!usage.waitForSpace(1000)) {
208                            if (context.getStopping().get()) {
209                                throw new IOException("Connection closed, send aborted.");
210                            }
211    
212                            long now = System.currentTimeMillis();
213                            if (now >= nextWarn) {
214                                LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start) / 1000 + "s)");
215                                nextWarn = now + 30000l;
216                            }
217                        }
218                    }
219                }
220    
221                if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
222                    // create a unique id - the original message could be sent
223                    // lots of times
224                    messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
225                }
226    
227                // Add the jobId as a property
228                messageSend.setProperty("scheduledJobId", id);
229    
230                // if this goes across a network - we don't want it rescheduled
231                messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
232                messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
233                messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
234                messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
235    
236                if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) {
237    
238                    long oldExpiration = messageSend.getExpiration();
239                    long newTimeStamp = System.currentTimeMillis();
240                    long timeToLive = 0;
241                    long oldTimestamp = messageSend.getTimestamp();
242    
243                    if (oldExpiration > 0) {
244                        timeToLive = oldExpiration - oldTimestamp;
245                    }
246    
247                    long expiration = timeToLive + newTimeStamp;
248    
249                    if (expiration > oldExpiration) {
250                        if (timeToLive > 0 && expiration > 0) {
251                            messageSend.setExpiration(expiration);
252                        }
253                        messageSend.setTimestamp(newTimeStamp);
254                        if (LOG.isDebugEnabled()) {
255                            LOG.debug("Set message " + messageSend.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
256                        }
257                    }
258                }
259    
260                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
261                producerExchange.setConnectionContext(context);
262                producerExchange.setMutable(true);
263                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
264                super.send(producerExchange, messageSend);
265            } catch (Exception e) {
266                LOG.error("Failed to send scheduled message " + id, e);
267            }
268        }
269    
270        protected synchronized JobScheduler getInternalScheduler() throws Exception {
271            if (this.started.get()) {
272                if (this.scheduler == null) {
273                    this.scheduler = store.getJobScheduler("JMS");
274                    this.scheduler.addListener(this);
275                }
276                return this.scheduler;
277            }
278            return null;
279        }
280    
281        protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo) throws Exception {
282    
283            org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
284            try {
285                Message msg = (Message) this.wireFormat.unmarshal(packet);
286                msg.setOriginalTransactionId(null);
287                msg.setPersistent(false);
288                msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
289                msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
290                msg.setDestination(replyTo);
291                msg.setResponseRequired(false);
292                msg.setProducerId(this.producerId);
293    
294                // Add the jobId as a property
295                msg.setProperty("scheduledJobId", job.getJobId());
296    
297                final boolean originalFlowControl = context.isProducerFlowControl();
298                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
299                producerExchange.setConnectionContext(context);
300                producerExchange.setMutable(true);
301                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
302                try {
303                    context.setProducerFlowControl(false);
304                    this.next.send(producerExchange, msg);
305                } finally {
306                    context.setProducerFlowControl(originalFlowControl);
307                }
308            } catch (Exception e) {
309                LOG.error("Failed to send scheduled message " + job.getJobId(), e);
310            }
311        }
312    }