1 | /* |
2 | * Copyright 2006-2009 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.job; |
18 | |
19 | import java.util.Collection; |
20 | import java.util.Date; |
21 | |
22 | import org.apache.commons.logging.Log; |
23 | import org.apache.commons.logging.LogFactory; |
24 | import org.springframework.batch.core.BatchStatus; |
25 | import org.springframework.batch.core.ExitStatus; |
26 | import org.springframework.batch.core.Job; |
27 | import org.springframework.batch.core.JobExecution; |
28 | import org.springframework.batch.core.JobExecutionException; |
29 | import org.springframework.batch.core.JobExecutionListener; |
30 | import org.springframework.batch.core.JobInstance; |
31 | import org.springframework.batch.core.JobInterruptedException; |
32 | import org.springframework.batch.core.JobParametersIncrementer; |
33 | import org.springframework.batch.core.StartLimitExceededException; |
34 | import org.springframework.batch.core.Step; |
35 | import org.springframework.batch.core.StepExecution; |
36 | import org.springframework.batch.core.listener.CompositeJobExecutionListener; |
37 | import org.springframework.batch.core.repository.JobRepository; |
38 | import org.springframework.batch.core.repository.JobRestartException; |
39 | import org.springframework.batch.core.step.StepLocator; |
40 | import org.springframework.batch.item.ExecutionContext; |
41 | import org.springframework.batch.repeat.RepeatException; |
42 | import org.springframework.beans.factory.BeanNameAware; |
43 | import org.springframework.beans.factory.InitializingBean; |
44 | import org.springframework.util.Assert; |
45 | import org.springframework.util.ClassUtils; |
46 | |
47 | /** |
48 | * Abstract implementation of the {@link Job} interface. Common dependencies such as a |
49 | * {@link JobRepository}, {@link JobExecutionListener}s, and various configuration |
50 | * parameters are set here. Therefore, common error handling and listener calling |
51 | * activities are abstracted away from implementations. |
52 | * |
53 | * @author Lucas Ward |
54 | * @author Dave Syer |
55 | */ |
56 | public abstract class AbstractJob implements Job, StepLocator, BeanNameAware, InitializingBean { |
57 | |
58 | protected static final Log logger = LogFactory.getLog(AbstractJob.class); |
59 | |
60 | private String name; |
61 | |
62 | private boolean restartable = true; |
63 | |
64 | private JobRepository jobRepository; |
65 | |
66 | private CompositeJobExecutionListener listener = new CompositeJobExecutionListener(); |
67 | |
68 | private JobParametersIncrementer jobParametersIncrementer; |
69 | |
70 | /** |
71 | * Default constructor. |
72 | */ |
73 | public AbstractJob() { |
74 | super(); |
75 | } |
76 | |
77 | /** |
78 | * Convenience constructor to immediately add name (which is mandatory but |
79 | * not final). |
80 | * |
81 | * @param name |
82 | */ |
83 | public AbstractJob(String name) { |
84 | super(); |
85 | this.name = name; |
86 | } |
87 | |
88 | /** |
89 | * Assert mandatory properties: {@link JobRepository}. |
90 | * |
91 | * @see InitializingBean#afterPropertiesSet() |
92 | */ |
93 | public void afterPropertiesSet() throws Exception { |
94 | Assert.notNull(jobRepository, "JobRepository must be set"); |
95 | } |
96 | |
97 | /** |
98 | * Set the name property if it is not already set. Because of the order of |
99 | * the callbacks in a Spring container the name property will be set first |
100 | * if it is present. Care is needed with bean definition inheritance - if a |
101 | * parent bean has a name, then its children need an explicit name as well, |
102 | * otherwise they will not be unique. |
103 | * |
104 | * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) |
105 | */ |
106 | public void setBeanName(String name) { |
107 | if (this.name == null) { |
108 | this.name = name; |
109 | } |
110 | } |
111 | |
112 | /** |
113 | * Set the name property. Always overrides the default value if this object |
114 | * is a Spring bean. |
115 | * |
116 | * @see #setBeanName(java.lang.String) |
117 | */ |
118 | public void setName(String name) { |
119 | this.name = name; |
120 | } |
121 | |
122 | /* |
123 | * (non-Javadoc) |
124 | * |
125 | * @see org.springframework.batch.core.domain.IJob#getName() |
126 | */ |
127 | public String getName() { |
128 | return name; |
129 | } |
130 | |
131 | /** |
132 | * Retrieve the step with the given name. If there is no Step with the |
133 | * given name, then return null. |
134 | * |
135 | * @param stepName |
136 | * @return the Step |
137 | */ |
138 | public abstract Step getStep(String stepName); |
139 | |
140 | /** |
141 | * Retrieve the step names. |
142 | * |
143 | * @return the step names |
144 | */ |
145 | public abstract Collection<String> getStepNames(); |
146 | |
147 | /** |
148 | * Boolean flag to prevent categorically a job from restarting, even if it |
149 | * has failed previously. |
150 | * |
151 | * @param restartable the value of the flag to set (default true) |
152 | */ |
153 | public void setRestartable(boolean restartable) { |
154 | this.restartable = restartable; |
155 | } |
156 | |
157 | /** |
158 | * @see Job#isRestartable() |
159 | */ |
160 | public boolean isRestartable() { |
161 | return restartable; |
162 | } |
163 | |
164 | /** |
165 | * Public setter for the {@link JobParametersIncrementer}. |
166 | * @param jobParametersIncrementer the {@link JobParametersIncrementer} to |
167 | * set |
168 | */ |
169 | public void setJobParametersIncrementer(JobParametersIncrementer jobParametersIncrementer) { |
170 | this.jobParametersIncrementer = jobParametersIncrementer; |
171 | } |
172 | |
173 | /* |
174 | * (non-Javadoc) |
175 | * |
176 | * @see org.springframework.batch.core.Job#getJobParametersIncrementer() |
177 | */ |
178 | public JobParametersIncrementer getJobParametersIncrementer() { |
179 | return this.jobParametersIncrementer; |
180 | } |
181 | |
182 | /** |
183 | * Public setter for injecting {@link JobExecutionListener}s. They will all |
184 | * be given the listener callbacks at the appropriate point in the job. |
185 | * |
186 | * @param listeners the listeners to set. |
187 | */ |
188 | public void setJobExecutionListeners(JobExecutionListener[] listeners) { |
189 | for (int i = 0; i < listeners.length; i++) { |
190 | this.listener.register(listeners[i]); |
191 | } |
192 | } |
193 | |
194 | /** |
195 | * Register a single listener for the {@link JobExecutionListener} |
196 | * callbacks. |
197 | * |
198 | * @param listener a {@link JobExecutionListener} |
199 | */ |
200 | public void registerJobExecutionListener(JobExecutionListener listener) { |
201 | this.listener.register(listener); |
202 | } |
203 | |
204 | /** |
205 | * Public setter for the {@link JobRepository} that is needed to manage the |
206 | * state of the batch meta domain (jobs, steps, executions) during the life |
207 | * of a job. |
208 | * |
209 | * @param jobRepository |
210 | */ |
211 | public void setJobRepository(JobRepository jobRepository) { |
212 | this.jobRepository = jobRepository; |
213 | } |
214 | |
215 | /** |
216 | * Extension point for subclasses allowing them to concentrate on processing |
217 | * logic and ignore listeners and repository calls. Implementations usually |
218 | * are concerned with the ordering of steps, and delegate actual step |
219 | * processing to {@link #handleStep(Step, JobExecution)}. |
220 | * |
221 | * @param execution the current {@link JobExecution} |
222 | * |
223 | * @throws JobExecutionException to signal a fatal batch framework error |
224 | * (not a business or validation exception) |
225 | */ |
226 | abstract protected void doExecute(JobExecution execution) throws JobExecutionException; |
227 | |
228 | /** |
229 | * Run the specified job, handling all listener and repository calls, and |
230 | * delegating the actual processing to {@link #doExecute(JobExecution)}. |
231 | * |
232 | * @see Job#execute(JobExecution) |
233 | * @throws StartLimitExceededException if start limit of one of the steps |
234 | * was exceeded |
235 | */ |
236 | public final void execute(JobExecution execution) { |
237 | |
238 | logger.debug("Job execution starting: "+execution); |
239 | |
240 | try { |
241 | |
242 | if (execution.getStatus() != BatchStatus.STOPPING) { |
243 | |
244 | execution.setStartTime(new Date()); |
245 | updateStatus(execution, BatchStatus.STARTED); |
246 | |
247 | listener.beforeJob(execution); |
248 | |
249 | try { |
250 | doExecute(execution); |
251 | logger.debug("Job execution complete: "+execution); |
252 | } catch (RepeatException e) { |
253 | throw e.getCause(); |
254 | } |
255 | } |
256 | else { |
257 | |
258 | // The job was already stopped before we even got this far. Deal |
259 | // with it in the same way as any other interruption. |
260 | execution.setStatus(BatchStatus.STOPPED); |
261 | execution.setExitStatus(ExitStatus.COMPLETED); |
262 | logger.debug("Job execution was stopped: "+execution); |
263 | |
264 | } |
265 | |
266 | } |
267 | catch (JobInterruptedException e) { |
268 | logger.error("Encountered interruption executing job", e); |
269 | execution.setExitStatus(ExitStatus.STOPPED); |
270 | execution.setStatus(BatchStatus.STOPPED); |
271 | execution.addFailureException(e); |
272 | } |
273 | catch (Throwable t) { |
274 | logger.error("Encountered fatal error executing job", t); |
275 | execution.setExitStatus(ExitStatus.FAILED); |
276 | execution.setStatus(BatchStatus.FAILED); |
277 | execution.addFailureException(t); |
278 | } |
279 | finally { |
280 | |
281 | if (execution.getStepExecutions().isEmpty()) { |
282 | execution.setExitStatus(ExitStatus.NOOP |
283 | .addExitDescription("All steps already completed or no steps configured for this job.")); |
284 | } |
285 | |
286 | execution.setEndTime(new Date()); |
287 | |
288 | try { |
289 | listener.afterJob(execution); |
290 | } |
291 | catch (Exception e) { |
292 | logger.error("Exception encountered in afterStep callback", e); |
293 | } |
294 | |
295 | jobRepository.update(execution); |
296 | |
297 | } |
298 | |
299 | } |
300 | |
301 | /** |
302 | * Convenience method for subclasses to delegate the handling of a specific |
303 | * step in the context of the current {@link JobExecution}. Clients of this |
304 | * method do not need access to the {@link JobRepository}, nor do they need |
305 | * to worry about populating the execution context on a restart, nor |
306 | * detecting the interrupted state (in job or step execution). |
307 | * |
308 | * @param step the {@link Step} to execute |
309 | * @param execution the current {@link JobExecution} |
310 | * @return the {@link StepExecution} corresponding to this step |
311 | * |
312 | * @throws JobInterruptedException if the {@link JobExecution} has been |
313 | * interrupted, and in particular if {@link BatchStatus#ABANDONED} or |
314 | * {@link BatchStatus#STOPPING} is detected |
315 | * @throws StartLimitExceededException if the start limit has been exceeded |
316 | * for this step |
317 | * @throws JobRestartException if the job is in an inconsistent state from |
318 | * an earlier failure |
319 | */ |
320 | protected final StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException, |
321 | JobRestartException, StartLimitExceededException { |
322 | if (execution.isStopping()) { |
323 | throw new JobInterruptedException("JobExecution interrupted."); |
324 | } |
325 | |
326 | JobInstance jobInstance = execution.getJobInstance(); |
327 | |
328 | StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName()); |
329 | StepExecution currentStepExecution = lastStepExecution; |
330 | |
331 | if (shouldStart(lastStepExecution, jobInstance, step)) { |
332 | |
333 | currentStepExecution = execution.createStepExecution(step.getName()); |
334 | |
335 | boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals( |
336 | BatchStatus.COMPLETED)); |
337 | |
338 | if (isRestart) { |
339 | currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext()); |
340 | } |
341 | else { |
342 | currentStepExecution.setExecutionContext(new ExecutionContext()); |
343 | } |
344 | |
345 | jobRepository.add(currentStepExecution); |
346 | |
347 | logger.info("Executing step: [" + step + "]"); |
348 | step.execute(currentStepExecution); |
349 | |
350 | jobRepository.updateExecutionContext(execution); |
351 | |
352 | if (currentStepExecution.getStatus() == BatchStatus.STOPPING || currentStepExecution.getStatus() == BatchStatus.STOPPED) { |
353 | throw new JobInterruptedException("Job interrupted by step execution"); |
354 | } |
355 | |
356 | } else { |
357 | // currentStepExecution.setExitStatus(ExitStatus.NOOP); |
358 | } |
359 | |
360 | return currentStepExecution; |
361 | |
362 | } |
363 | |
364 | /** |
365 | * Convenience method for subclasses so they can change the state of a |
366 | * {@link StepExecution} if necessary. Use with care (and not at all |
367 | * preferably) and only before or after a step is executed. |
368 | * |
369 | * @param stepExecution |
370 | */ |
371 | protected void updateStepExecution(StepExecution stepExecution) { |
372 | jobRepository.update(stepExecution); |
373 | } |
374 | |
375 | /** |
376 | * Given a step and configuration, return true if the step should start, |
377 | * false if it should not, and throw an exception if the job should finish. |
378 | * @param lastStepExecution the last step execution |
379 | * @param jobInstance |
380 | * @param step |
381 | * |
382 | * @throws StartLimitExceededException if the start limit has been exceeded |
383 | * for this step |
384 | * @throws JobRestartException if the job is in an inconsistent state from |
385 | * an earlier failure |
386 | */ |
387 | private boolean shouldStart(StepExecution lastStepExecution, JobInstance jobInstance, Step step) |
388 | throws JobRestartException, StartLimitExceededException { |
389 | |
390 | BatchStatus stepStatus; |
391 | if (lastStepExecution == null) { |
392 | stepStatus = BatchStatus.STARTING; |
393 | } |
394 | else { |
395 | stepStatus = lastStepExecution.getStatus(); |
396 | } |
397 | |
398 | if (stepStatus == BatchStatus.UNKNOWN) { |
399 | throw new JobRestartException("Cannot restart step from UNKNOWN status. " |
400 | + "The last execution ended with a failure that could not be rolled back, " |
401 | + "so it may be dangerous to proceed. " + "Manual intervention is probably necessary."); |
402 | } |
403 | |
404 | if ((stepStatus == BatchStatus.COMPLETED && step.isAllowStartIfComplete() == false) |
405 | || stepStatus == BatchStatus.ABANDONED) { |
406 | // step is complete, false should be returned, indicating that the |
407 | // step should not be started |
408 | logger.info("Step already complete or not restartable, so no action to execute: "+lastStepExecution); |
409 | return false; |
410 | } |
411 | |
412 | if (jobRepository.getStepExecutionCount(jobInstance, step.getName()) < step.getStartLimit()) { |
413 | // step start count is less than start max, return true |
414 | return true; |
415 | } |
416 | else { |
417 | // start max has been exceeded, throw an exception. |
418 | throw new StartLimitExceededException("Maximum start limit exceeded for step: " + step.getName() |
419 | + "StartMax: " + step.getStartLimit()); |
420 | } |
421 | } |
422 | |
423 | private void updateStatus(JobExecution jobExecution, BatchStatus status) { |
424 | jobExecution.setStatus(status); |
425 | jobRepository.update(jobExecution); |
426 | } |
427 | |
428 | public String toString() { |
429 | return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]"; |
430 | } |
431 | |
432 | } |