1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step; |
17 | |
18 | import java.io.PrintWriter; |
19 | import java.io.StringWriter; |
20 | import java.util.Date; |
21 | |
22 | import org.apache.commons.logging.Log; |
23 | import org.apache.commons.logging.LogFactory; |
24 | import org.springframework.batch.core.BatchStatus; |
25 | import org.springframework.batch.core.ExitStatus; |
26 | import org.springframework.batch.core.JobInterruptedException; |
27 | import org.springframework.batch.core.Step; |
28 | import org.springframework.batch.core.StepExecution; |
29 | import org.springframework.batch.core.StepExecutionListener; |
30 | import org.springframework.batch.core.UnexpectedJobExecutionException; |
31 | import org.springframework.batch.core.launch.NoSuchJobException; |
32 | import org.springframework.batch.core.launch.support.ExitCodeMapper; |
33 | import org.springframework.batch.core.listener.CompositeStepExecutionListener; |
34 | import org.springframework.batch.core.repository.JobRepository; |
35 | import org.springframework.batch.core.scope.context.StepSynchronizationManager; |
36 | import org.springframework.batch.item.ExecutionContext; |
37 | import org.springframework.batch.repeat.RepeatException; |
38 | import org.springframework.beans.factory.BeanNameAware; |
39 | import org.springframework.beans.factory.InitializingBean; |
40 | import org.springframework.util.Assert; |
41 | import org.springframework.util.ClassUtils; |
42 | |
43 | /** |
44 | * A {@link Step} implementation that provides common behavior to subclasses, |
45 | * including registering and calling listeners. |
46 | * |
47 | * @author Dave Syer |
48 | * @author Ben Hale |
49 | * @author Robert Kasanicky |
50 | */ |
51 | public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware { |
52 | |
53 | private static final Log logger = LogFactory.getLog(AbstractStep.class); |
54 | |
55 | private String name; |
56 | |
57 | private int startLimit = Integer.MAX_VALUE; |
58 | |
59 | private boolean allowStartIfComplete = false; |
60 | |
61 | private CompositeStepExecutionListener stepExecutionListener = new CompositeStepExecutionListener(); |
62 | |
63 | private JobRepository jobRepository; |
64 | |
65 | /** |
66 | * Default constructor. |
67 | */ |
68 | public AbstractStep() { |
69 | super(); |
70 | } |
71 | |
72 | public void afterPropertiesSet() throws Exception { |
73 | Assert.notNull(jobRepository, "JobRepository is mandatory"); |
74 | } |
75 | |
76 | public String getName() { |
77 | return this.name; |
78 | } |
79 | |
80 | /** |
81 | * Set the name property. Always overrides the default value if this object |
82 | * is a Spring bean. |
83 | * |
84 | * @see #setBeanName(java.lang.String) |
85 | */ |
86 | public void setName(String name) { |
87 | this.name = name; |
88 | } |
89 | |
90 | /** |
91 | * Set the name property if it is not already set. Because of the order of |
92 | * the callbacks in a Spring container the name property will be set first |
93 | * if it is present. Care is needed with bean definition inheritance - if a |
94 | * parent bean has a name, then its children need an explicit name as well, |
95 | * otherwise they will not be unique. |
96 | * |
97 | * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) |
98 | */ |
99 | public void setBeanName(String name) { |
100 | if (this.name == null) { |
101 | this.name = name; |
102 | } |
103 | } |
104 | |
105 | public int getStartLimit() { |
106 | return this.startLimit; |
107 | } |
108 | |
109 | /** |
110 | * Public setter for the startLimit. |
111 | * |
112 | * @param startLimit the startLimit to set |
113 | */ |
114 | public void setStartLimit(int startLimit) { |
115 | this.startLimit = startLimit; |
116 | } |
117 | |
118 | public boolean isAllowStartIfComplete() { |
119 | return this.allowStartIfComplete; |
120 | } |
121 | |
122 | /** |
123 | * Public setter for flag that determines whether the step should start |
124 | * again if it is already complete. Defaults to false. |
125 | * |
126 | * @param allowStartIfComplete the value of the flag to set |
127 | */ |
128 | public void setAllowStartIfComplete(boolean allowStartIfComplete) { |
129 | this.allowStartIfComplete = allowStartIfComplete; |
130 | } |
131 | |
132 | /** |
133 | * Convenient constructor for setting only the name property. |
134 | * |
135 | * @param name |
136 | */ |
137 | public AbstractStep(String name) { |
138 | this.name = name; |
139 | } |
140 | |
141 | /** |
142 | * Extension point for subclasses to execute business logic. Subclasses |
143 | * should set the {@link ExitStatus} on the {@link StepExecution} before |
144 | * returning. |
145 | * |
146 | * @param stepExecution the current step context |
147 | * @throws Exception |
148 | */ |
149 | protected abstract void doExecute(StepExecution stepExecution) throws Exception; |
150 | |
151 | /** |
152 | * Extension point for subclasses to provide callbacks to their |
153 | * collaborators at the beginning of a step, to open or acquire resources. |
154 | * Does nothing by default. |
155 | * |
156 | * @param ctx the {@link ExecutionContext} to use |
157 | * @throws Exception |
158 | */ |
159 | protected void open(ExecutionContext ctx) throws Exception { |
160 | } |
161 | |
162 | /** |
163 | * Extension point for subclasses to provide callbacks to their |
164 | * collaborators at the end of a step (right at the end of the finally |
165 | * block), to close or release resources. Does nothing by default. |
166 | * |
167 | * @param ctx the {@link ExecutionContext} to use |
168 | * @throws Exception |
169 | */ |
170 | protected void close(ExecutionContext ctx) throws Exception { |
171 | } |
172 | |
173 | /** |
174 | * Template method for step execution logic - calls abstract methods for |
175 | * resource initialization ({@link #open(ExecutionContext)}), execution |
176 | * logic ({@link #doExecute(StepExecution)}) and resource closing ( |
177 | * {@link #close(ExecutionContext)}). |
178 | */ |
179 | public final void execute(StepExecution stepExecution) throws JobInterruptedException, |
180 | UnexpectedJobExecutionException { |
181 | |
182 | logger.debug("Executing: id="+stepExecution.getId()); |
183 | stepExecution.setStartTime(new Date()); |
184 | stepExecution.setStatus(BatchStatus.STARTED); |
185 | getJobRepository().update(stepExecution); |
186 | |
187 | // Start with a default value that will be trumped by anything |
188 | ExitStatus exitStatus = ExitStatus.EXECUTING; |
189 | Exception commitException = null; |
190 | |
191 | StepSynchronizationManager.register(stepExecution); |
192 | |
193 | try { |
194 | getCompositeListener().beforeStep(stepExecution); |
195 | open(stepExecution.getExecutionContext()); |
196 | |
197 | try { |
198 | doExecute(stepExecution); |
199 | } |
200 | catch (RepeatException e) { |
201 | throw e.getCause(); |
202 | } |
203 | exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus()); |
204 | |
205 | // Check if someone is trying to stop us |
206 | if (stepExecution.isTerminateOnly()) { |
207 | throw new JobInterruptedException("JobExecution interrupted."); |
208 | } |
209 | |
210 | // Need to upgrade here not set, in case the execution was stopped |
211 | stepExecution.upgradeStatus(BatchStatus.COMPLETED); |
212 | logger.debug("Step execution success: id=" + stepExecution.getId()); |
213 | } |
214 | catch (Throwable e) { |
215 | logger.error("Encountered an error executing the step", e); |
216 | stepExecution.setStatus(determineBatchStatus(e)); |
217 | exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e)); |
218 | stepExecution.addFailureException(e); |
219 | } |
220 | finally { |
221 | |
222 | try { |
223 | // Update the step execution to the latest known value so the listeners can act on it |
224 | exitStatus = exitStatus.and(stepExecution.getExitStatus()); |
225 | stepExecution.setExitStatus(exitStatus); |
226 | exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution)); |
227 | } |
228 | catch (Exception e) { |
229 | logger.error("Exception in afterStep callback", e); |
230 | } |
231 | |
232 | try { |
233 | getJobRepository().updateExecutionContext(stepExecution); |
234 | } |
235 | catch (Exception e) { |
236 | stepExecution.setStatus(BatchStatus.UNKNOWN); |
237 | exitStatus = exitStatus.and(ExitStatus.UNKNOWN); |
238 | stepExecution.addFailureException(e); |
239 | logger.error("Encountered an error saving batch meta data." |
240 | + "This job is now in an unknown state and should not be restarted.", commitException); |
241 | } |
242 | |
243 | stepExecution.setEndTime(new Date()); |
244 | stepExecution.setExitStatus(exitStatus); |
245 | |
246 | try { |
247 | getJobRepository().update(stepExecution); |
248 | } |
249 | catch (Exception e) { |
250 | stepExecution.setStatus(BatchStatus.UNKNOWN); |
251 | stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN)); |
252 | stepExecution.addFailureException(e); |
253 | logger.error("Encountered an error saving batch meta data." |
254 | + "This job is now in an unknown state and should not be restarted.", commitException); |
255 | } |
256 | |
257 | try { |
258 | close(stepExecution.getExecutionContext()); |
259 | } |
260 | catch (Exception e) { |
261 | logger.error("Exception while closing step execution resources", e); |
262 | stepExecution.addFailureException(e); |
263 | } |
264 | |
265 | StepSynchronizationManager.release(); |
266 | |
267 | logger.debug("Step execution complete: " + stepExecution.getSummary()); |
268 | } |
269 | } |
270 | |
271 | /** |
272 | * Determine the step status based on the exception. |
273 | */ |
274 | private static BatchStatus determineBatchStatus(Throwable e) { |
275 | if (e instanceof FatalException) { |
276 | return BatchStatus.UNKNOWN; |
277 | } |
278 | else if (e instanceof JobInterruptedException || e.getCause() instanceof JobInterruptedException) { |
279 | return BatchStatus.STOPPED; |
280 | } |
281 | else { |
282 | return BatchStatus.FAILED; |
283 | } |
284 | } |
285 | |
286 | /** |
287 | * Register a step listener for callbacks at the appropriate stages in a |
288 | * step execution. |
289 | * |
290 | * @param listener a {@link StepExecutionListener} |
291 | */ |
292 | public void registerStepExecutionListener(StepExecutionListener listener) { |
293 | this.stepExecutionListener.register(listener); |
294 | } |
295 | |
296 | /** |
297 | * Register each of the objects as listeners. |
298 | * |
299 | * @param listeners an array of listener objects of known types. |
300 | */ |
301 | public void setStepExecutionListeners(StepExecutionListener[] listeners) { |
302 | for (int i = 0; i < listeners.length; i++) { |
303 | registerStepExecutionListener(listeners[i]); |
304 | } |
305 | } |
306 | |
307 | /** |
308 | * @return composite listener that delegates to all registered listeners. |
309 | */ |
310 | protected StepExecutionListener getCompositeListener() { |
311 | return stepExecutionListener; |
312 | } |
313 | |
314 | /** |
315 | * Public setter for {@link JobRepository}. |
316 | * |
317 | * @param jobRepository is a mandatory dependence (no default). |
318 | */ |
319 | public void setJobRepository(JobRepository jobRepository) { |
320 | this.jobRepository = jobRepository; |
321 | } |
322 | |
323 | protected JobRepository getJobRepository() { |
324 | return jobRepository; |
325 | } |
326 | |
327 | public String toString() { |
328 | return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]"; |
329 | } |
330 | |
331 | /** |
332 | * Default mapping from throwable to {@link ExitStatus}. Clients can modify |
333 | * the exit code using a {@link StepExecutionListener}. |
334 | * |
335 | * @param ex the cause of the failure |
336 | * @return an {@link ExitStatus} |
337 | */ |
338 | private ExitStatus getDefaultExitStatusForFailure(Throwable ex) { |
339 | ExitStatus exitStatus; |
340 | if (ex instanceof JobInterruptedException || ex.getCause() instanceof JobInterruptedException) { |
341 | exitStatus = ExitStatus.STOPPED.addExitDescription(JobInterruptedException.class.getName()); |
342 | } |
343 | else if (ex instanceof NoSuchJobException || ex.getCause() instanceof NoSuchJobException) { |
344 | exitStatus = new ExitStatus(ExitCodeMapper.NO_SUCH_JOB, ex.getClass().getName()); |
345 | } |
346 | else { |
347 | StringWriter writer = new StringWriter(); |
348 | ex.printStackTrace(new PrintWriter(writer)); |
349 | String message = writer.toString(); |
350 | exitStatus = ExitStatus.FAILED.addExitDescription(message); |
351 | } |
352 | |
353 | return exitStatus; |
354 | } |
355 | |
356 | /** |
357 | * Signals a fatal exception - e.g. unable to persist batch metadata or |
358 | * rollback transaction. Throwing this exception will result in storing |
359 | * {@link BatchStatus#UNKNOWN} as step's status. |
360 | */ |
361 | protected static class FatalException extends RuntimeException { |
362 | public FatalException(String string, Throwable e) { |
363 | super(string, e); |
364 | } |
365 | } |
366 | |
367 | } |