View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.step;
17  
18  import java.util.Date;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.springframework.batch.core.BatchStatus;
23  import org.springframework.batch.core.ExitStatus;
24  import org.springframework.batch.core.JobInterruptedException;
25  import org.springframework.batch.core.Step;
26  import org.springframework.batch.core.StepExecution;
27  import org.springframework.batch.core.StepExecutionListener;
28  import org.springframework.batch.core.UnexpectedJobExecutionException;
29  import org.springframework.batch.core.launch.NoSuchJobException;
30  import org.springframework.batch.core.launch.support.ExitCodeMapper;
31  import org.springframework.batch.core.listener.CompositeStepExecutionListener;
32  import org.springframework.batch.core.repository.JobRepository;
33  import org.springframework.batch.core.scope.context.StepSynchronizationManager;
34  import org.springframework.batch.item.ExecutionContext;
35  import org.springframework.batch.repeat.RepeatException;
36  import org.springframework.beans.factory.BeanNameAware;
37  import org.springframework.beans.factory.InitializingBean;
38  import org.springframework.util.Assert;
39  import org.springframework.util.ClassUtils;
40  
41  /**
42   * A {@link Step} implementation that provides common behavior to subclasses, including registering and calling
43   * listeners.
44   *
45   * @author Dave Syer
46   * @author Ben Hale
47   * @author Robert Kasanicky
48   */
49  public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {
50  
51  	private static final Log logger = LogFactory.getLog(AbstractStep.class);
52  
53  	private String name;
54  
55  	private int startLimit = Integer.MAX_VALUE;
56  
57  	private boolean allowStartIfComplete = false;
58  
59  	private CompositeStepExecutionListener stepExecutionListener = new CompositeStepExecutionListener();
60  
61  	private JobRepository jobRepository;
62  
63  	/**
64  	 * Default constructor.
65  	 */
66  	public AbstractStep() {
67  		super();
68  	}
69  
70  	@Override
71  	public void afterPropertiesSet() throws Exception {
72  		Assert.state(name != null, "A Step must have a name");
73  		Assert.state(jobRepository != null, "JobRepository is mandatory");
74  	}
75  
76  	@Override
77  	public String getName() {
78  		return this.name;
79  	}
80  
81  	/**
82  	 * Set the name property. Always overrides the default value if this object is a Spring bean.
83  	 *
84  	 * @see #setBeanName(java.lang.String)
85  	 */
86  	public void setName(String name) {
87  		this.name = name;
88  	}
89  
90  	/**
91  	 * Set the name property if it is not already set. Because of the order of the callbacks in a Spring container the
92  	 * name property will be set first if it is present. Care is needed with bean definition inheritance - if a parent
93  	 * bean has a name, then its children need an explicit name as well, otherwise they will not be unique.
94  	 *
95  	 * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
96  	 */
97  	@Override
98  	public void setBeanName(String name) {
99  		if (this.name == null) {
100 			this.name = name;
101 		}
102 	}
103 
104 	@Override
105 	public int getStartLimit() {
106 		return this.startLimit;
107 	}
108 
109 	/**
110 	 * Public setter for the startLimit.
111 	 *
112 	 * @param startLimit the startLimit to set
113 	 */
114 	public void setStartLimit(int startLimit) {
115 		this.startLimit = startLimit;
116 	}
117 
118 	@Override
119 	public boolean isAllowStartIfComplete() {
120 		return this.allowStartIfComplete;
121 	}
122 
123 	/**
124 	 * Public setter for flag that determines whether the step should start again if it is already complete. Defaults to
125 	 * false.
126 	 *
127 	 * @param allowStartIfComplete the value of the flag to set
128 	 */
129 	public void setAllowStartIfComplete(boolean allowStartIfComplete) {
130 		this.allowStartIfComplete = allowStartIfComplete;
131 	}
132 
133 	/**
134 	 * Convenient constructor for setting only the name property.
135 	 *
136 	 * @param name
137 	 */
138 	public AbstractStep(String name) {
139 		this.name = name;
140 	}
141 
142 	/**
143 	 * Extension point for subclasses to execute business logic. Subclasses should set the {@link ExitStatus} on the
144 	 * {@link StepExecution} before returning.
145 	 *
146 	 * @param stepExecution the current step context
147 	 * @throws Exception
148 	 */
149 	protected abstract void doExecute(StepExecution stepExecution) throws Exception;
150 
151 	/**
152 	 * Extension point for subclasses to provide callbacks to their collaborators at the beginning of a step, to open or
153 	 * acquire resources. Does nothing by default.
154 	 *
155 	 * @param ctx the {@link ExecutionContext} to use
156 	 * @throws Exception
157 	 */
158 	protected void open(ExecutionContext ctx) throws Exception {
159 	}
160 
161 	/**
162 	 * Extension point for subclasses to provide callbacks to their collaborators at the end of a step (right at the end
163 	 * of the finally block), to close or release resources. Does nothing by default.
164 	 *
165 	 * @param ctx the {@link ExecutionContext} to use
166 	 * @throws Exception
167 	 */
168 	protected void close(ExecutionContext ctx) throws Exception {
169 	}
170 
171 	/**
172 	 * Template method for step execution logic - calls abstract methods for resource initialization (
173 	 * {@link #open(ExecutionContext)}), execution logic ({@link #doExecute(StepExecution)}) and resource closing (
174 	 * {@link #close(ExecutionContext)}).
175 	 */
176 	@Override
177 	public final void execute(StepExecution stepExecution) throws JobInterruptedException,
178 	UnexpectedJobExecutionException {
179 
180 		logger.debug("Executing: id=" + stepExecution.getId());
181 		stepExecution.setStartTime(new Date());
182 		stepExecution.setStatus(BatchStatus.STARTED);
183 		getJobRepository().update(stepExecution);
184 
185 		// Start with a default value that will be trumped by anything
186 		ExitStatus exitStatus = ExitStatus.EXECUTING;
187 
188 		StepSynchronizationManager.register(stepExecution);
189 
190 		try {
191 			getCompositeListener().beforeStep(stepExecution);
192 			open(stepExecution.getExecutionContext());
193 
194 			try {
195 				doExecute(stepExecution);
196 			}
197 			catch (RepeatException e) {
198 				throw e.getCause();
199 			}
200 			exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
201 
202 			// Check if someone is trying to stop us
203 			if (stepExecution.isTerminateOnly()) {
204 				throw new JobInterruptedException("JobExecution interrupted.");
205 			}
206 
207 			// Need to upgrade here not set, in case the execution was stopped
208 			stepExecution.upgradeStatus(BatchStatus.COMPLETED);
209 			logger.debug("Step execution success: id=" + stepExecution.getId());
210 		}
211 		catch (Throwable e) {
212 			stepExecution.upgradeStatus(determineBatchStatus(e));
213 			exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
214 			stepExecution.addFailureException(e);
215 			if (stepExecution.getStatus() == BatchStatus.STOPPED) {
216 				logger.info("Encountered interruption executing step: " + e.getMessage());
217 				if (logger.isDebugEnabled()) {
218 					logger.debug("Full exception", e);
219 				}
220 			}
221 			else {
222 				logger.error("Encountered an error executing the step", e);
223 			}
224 		}
225 		finally {
226 
227 			try {
228 				// Update the step execution to the latest known value so the
229 				// listeners can act on it
230 				exitStatus = exitStatus.and(stepExecution.getExitStatus());
231 				stepExecution.setExitStatus(exitStatus);
232 				exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
233 			}
234 			catch (Exception e) {
235 				logger.error("Exception in afterStep callback", e);
236 			}
237 
238 			try {
239 				getJobRepository().updateExecutionContext(stepExecution);
240 			}
241 			catch (Exception e) {
242 				stepExecution.setStatus(BatchStatus.UNKNOWN);
243 				exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
244 				stepExecution.addFailureException(e);
245 				logger.error("Encountered an error saving batch meta data. "
246 						+ "This job is now in an unknown state and should not be restarted.", e);
247 			}
248 
249 			stepExecution.setEndTime(new Date());
250 			stepExecution.setExitStatus(exitStatus);
251 
252 			try {
253 				getJobRepository().update(stepExecution);
254 			}
255 			catch (Exception e) {
256 				stepExecution.setStatus(BatchStatus.UNKNOWN);
257 				stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
258 				stepExecution.addFailureException(e);
259 				logger.error("Encountered an error saving batch meta data. "
260 						+ "This job is now in an unknown state and should not be restarted.", e);
261 			}
262 
263 			try {
264 				close(stepExecution.getExecutionContext());
265 			}
266 			catch (Exception e) {
267 				logger.error("Exception while closing step execution resources", e);
268 				stepExecution.addFailureException(e);
269 			}
270 
271 			StepSynchronizationManager.release();
272 
273 			logger.debug("Step execution complete: " + stepExecution.getSummary());
274 		}
275 	}
276 
277 	/**
278 	 * Determine the step status based on the exception.
279 	 */
280 	private static BatchStatus determineBatchStatus(Throwable e) {
281 		if (e instanceof JobInterruptedException || e.getCause() instanceof JobInterruptedException) {
282 			return BatchStatus.STOPPED;
283 		}
284 		else {
285 			return BatchStatus.FAILED;
286 		}
287 	}
288 
289 	/**
290 	 * Register a step listener for callbacks at the appropriate stages in a step execution.
291 	 *
292 	 * @param listener a {@link StepExecutionListener}
293 	 */
294 	public void registerStepExecutionListener(StepExecutionListener listener) {
295 		this.stepExecutionListener.register(listener);
296 	}
297 
298 	/**
299 	 * Register each of the objects as listeners.
300 	 *
301 	 * @param listeners an array of listener objects of known types.
302 	 */
303 	public void setStepExecutionListeners(StepExecutionListener[] listeners) {
304 		for (int i = 0; i < listeners.length; i++) {
305 			registerStepExecutionListener(listeners[i]);
306 		}
307 	}
308 
309 	/**
310 	 * @return composite listener that delegates to all registered listeners.
311 	 */
312 	protected StepExecutionListener getCompositeListener() {
313 		return stepExecutionListener;
314 	}
315 
316 	/**
317 	 * Public setter for {@link JobRepository}.
318 	 *
319 	 * @param jobRepository is a mandatory dependence (no default).
320 	 */
321 	public void setJobRepository(JobRepository jobRepository) {
322 		this.jobRepository = jobRepository;
323 	}
324 
325 	protected JobRepository getJobRepository() {
326 		return jobRepository;
327 	}
328 
329 	@Override
330 	public String toString() {
331 		return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]";
332 	}
333 
334 	/**
335 	 * Default mapping from throwable to {@link ExitStatus}. Clients can modify the exit code using a
336 	 * {@link StepExecutionListener}.
337 	 *
338 	 * @param ex the cause of the failure
339 	 * @return an {@link ExitStatus}
340 	 */
341 	private ExitStatus getDefaultExitStatusForFailure(Throwable ex) {
342 		ExitStatus exitStatus;
343 		if (ex instanceof JobInterruptedException || ex.getCause() instanceof JobInterruptedException) {
344 			exitStatus = ExitStatus.STOPPED.addExitDescription(JobInterruptedException.class.getName());
345 		}
346 		else if (ex instanceof NoSuchJobException || ex.getCause() instanceof NoSuchJobException) {
347 			exitStatus = new ExitStatus(ExitCodeMapper.NO_SUCH_JOB, ex.getClass().getName());
348 		}
349 		else {
350 			exitStatus = ExitStatus.FAILED.addExitDescription(ex);
351 		}
352 
353 		return exitStatus;
354 	}
355 
356 }