001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.quartz;
018
019 import org.apache.camel.Processor;
020 import org.apache.camel.Producer;
021 import org.apache.camel.impl.DefaultEndpoint;
022 import org.apache.camel.processor.loadbalancer.LoadBalancer;
023 import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;
024 import org.apache.camel.util.ObjectHelper;
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.quartz.JobDetail;
028 import org.quartz.JobExecutionContext;
029 import org.quartz.JobExecutionException;
030 import org.quartz.Scheduler;
031 import org.quartz.SchedulerException;
032 import org.quartz.Trigger;
033 import org.quartz.SimpleTrigger;
034
035 import java.util.Date;
036 import java.util.Map;
037 import java.util.Set;
038
039 /**
040 * A <a href="http://activemq.apache.org/quartz.html">Quartz Endpoint</a>
041 *
042 * @version $Revision:520964 $
043 */
044 public class QuartzEndpoint extends DefaultEndpoint<QuartzExchange> {
045 public static final String ENDPOINT_KEY = "org.apache.camel.quartz";
046 private static final transient Log log = LogFactory.getLog(QuartzEndpoint.class);
047 private Scheduler scheduler;
048 private LoadBalancer loadBalancer;
049 private Trigger trigger;
050 private JobDetail jobDetail;
051 private boolean started;
052
053 public QuartzEndpoint(String endpointUri, QuartzComponent component, Scheduler scheduler) {
054 super(endpointUri, component);
055 this.scheduler = scheduler;
056 }
057
058 public void addTriggers(Map<Trigger, JobDetail> triggerMap) throws SchedulerException {
059 if (triggerMap != null) {
060 Set<Map.Entry<Trigger, JobDetail>> entries = triggerMap.entrySet();
061 for (Map.Entry<Trigger, JobDetail> entry : entries) {
062 Trigger key = entry.getKey();
063 JobDetail value = entry.getValue();
064 ObjectHelper.notNull(key, "key");
065 ObjectHelper.notNull(value, "value");
066
067 addTrigger(key, value);
068 }
069 }
070 }
071
072 public void addTrigger(Trigger trigger, JobDetail detail) throws SchedulerException {
073 // lets default the trigger name to the job name
074 if (trigger.getName() == null) {
075 trigger.setName(detail.getName());
076 }
077 // lets default the trigger group to the job group
078 if (trigger.getGroup() == null) {
079 trigger.setGroup(detail.getGroup());
080 }
081 // default start time to now if not specified
082 if (trigger.getStartTime() == null) {
083 trigger.setStartTime(new Date());
084 }
085 detail.getJobDataMap().put(ENDPOINT_KEY, this);
086 Class jobClass = detail.getJobClass();
087 if (jobClass == null) {
088 detail.setJobClass(CamelJob.class);
089 }
090 if (detail.getName() == null) {
091 detail.setName(getEndpointUri());
092 }
093 getScheduler().scheduleJob(detail, trigger);
094 }
095
096 public void removeTrigger(Trigger trigger, JobDetail jobDetail) throws SchedulerException {
097 getScheduler().unscheduleJob(trigger.getName(), trigger.getGroup());
098 }
099
100 /**
101 * This method is invoked when a Quartz job is fired.
102 *
103 * @param jobExecutionContext the Quartz Job context
104 */
105 public void onJobExecute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
106 if (log.isDebugEnabled()) {
107 log.debug("Firing Quartz Job with context: " + jobExecutionContext);
108 }
109 QuartzExchange exchange = createExchange(jobExecutionContext);
110 try {
111 getLoadBalancer().process(exchange);
112 }
113 catch (JobExecutionException e) {
114 throw e;
115 }
116 catch (Exception e) {
117 throw new JobExecutionException(e);
118 }
119 }
120
121 public QuartzExchange createExchange() {
122 return new QuartzExchange(getContext(), null);
123 }
124
125 public QuartzExchange createExchange(JobExecutionContext jobExecutionContext) {
126 return new QuartzExchange(getContext(), jobExecutionContext);
127 }
128
129 public Producer<QuartzExchange> createProducer() throws Exception {
130 throw new UnsupportedOperationException("You cannot send messages to this endpoint");
131 }
132
133 public QuartzConsumer createConsumer(Processor processor) throws Exception {
134 return new QuartzConsumer(this, processor);
135 }
136
137 // Properties
138 //-------------------------------------------------------------------------
139
140 @Override
141 public QuartzComponent getComponent() {
142 return (QuartzComponent) super.getComponent();
143 }
144
145 public boolean isSingleton() {
146 return true;
147 }
148
149 public Scheduler getScheduler() {
150 return scheduler;
151 }
152
153 public LoadBalancer getLoadBalancer() {
154 if (loadBalancer == null) {
155 loadBalancer = createLoadBalancer();
156 }
157 return loadBalancer;
158 }
159
160 public void setLoadBalancer(LoadBalancer loadBalancer) {
161 this.loadBalancer = loadBalancer;
162 }
163
164 public JobDetail getJobDetail() {
165 if (jobDetail == null) {
166 jobDetail = createJobDetail();
167 }
168 return jobDetail;
169 }
170
171 public void setJobDetail(JobDetail jobDetail) {
172 this.jobDetail = jobDetail;
173 }
174
175 public Trigger getTrigger() {
176 if (trigger == null) {
177 trigger = createTrigger();
178 }
179 return trigger;
180 }
181
182 public void setTrigger(Trigger trigger) {
183 this.trigger = trigger;
184 }
185
186 // Implementation methods
187 //-------------------------------------------------------------------------
188 public synchronized void consumerStarted(QuartzConsumer consumer) throws SchedulerException {
189 getLoadBalancer().addProcessor(consumer.getProcessor());
190
191 // if we have not yet added our default trigger, then lets do it
192 if (!started) {
193 addTrigger(getTrigger(), getJobDetail());
194 started = true;
195 }
196 }
197
198 public synchronized void consumerStopped(QuartzConsumer consumer) throws SchedulerException {
199 getLoadBalancer().removeProcessor(consumer.getProcessor());
200 if (getLoadBalancer().getProcessors().isEmpty() && started) {
201 removeTrigger(getTrigger(), getJobDetail());
202 started = false;
203 }
204 }
205
206 protected LoadBalancer createLoadBalancer() {
207 return new RoundRobinLoadBalancer();
208 }
209
210 protected JobDetail createJobDetail() {
211 return new JobDetail();
212 }
213
214 protected Trigger createTrigger() {
215 return new SimpleTrigger();
216 }
217 }