View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.job;
18  
19  import org.apache.commons.logging.Log;
20  import org.apache.commons.logging.LogFactory;
21  import org.springframework.batch.core.BatchStatus;
22  import org.springframework.batch.core.JobExecution;
23  import org.springframework.batch.core.JobInstance;
24  import org.springframework.batch.core.JobInterruptedException;
25  import org.springframework.batch.core.StartLimitExceededException;
26  import org.springframework.batch.core.Step;
27  import org.springframework.batch.core.StepExecution;
28  import org.springframework.batch.core.repository.JobRepository;
29  import org.springframework.batch.core.repository.JobRestartException;
30  import org.springframework.batch.item.ExecutionContext;
31  import org.springframework.beans.factory.InitializingBean;
32  import org.springframework.util.Assert;
33  
34  /**
35   * Implementation of {@link StepHandler} that manages repository and restart
36   * concerns.
37   *
38   * @author Dave Syer
39   *
40   */
41  public class SimpleStepHandler implements StepHandler, InitializingBean {
42  
43  	private static final Log logger = LogFactory.getLog(SimpleStepHandler.class);
44  
45  	private JobRepository jobRepository;
46  
47  	private ExecutionContext executionContext;
48  
49  	/**
50  	 * Convenient default constructor for configuration usage.
51  	 */
52  	public SimpleStepHandler() {
53  		this(null);
54  	}
55  
56  	/**
57  	 * @param jobRepository
58  	 */
59  	public SimpleStepHandler(JobRepository jobRepository) {
60  		this(jobRepository, new ExecutionContext());
61  	}
62  
63  	/**
64  	 * @param jobRepository
65  	 * @param executionContext
66  	 */
67  	public SimpleStepHandler(JobRepository jobRepository, ExecutionContext executionContext) {
68  		this.jobRepository = jobRepository;
69  		this.executionContext = executionContext;
70  	}
71  
72  	/**
73  	 * Check mandatory properties (jobRepository).
74  	 *
75  	 * @see InitializingBean#afterPropertiesSet()
76  	 */
77  	@Override
78  	public void afterPropertiesSet() throws Exception {
79  		Assert.state(jobRepository != null, "A JobRepository must be provided");
80  	}
81  
82  	/**
83  	 * @param jobRepository the jobRepository to set
84  	 */
85  	public void setJobRepository(JobRepository jobRepository) {
86  		this.jobRepository = jobRepository;
87  	}
88  
89  	/**
90  	 * A context containing values to be added to the step execution before it
91  	 * is handled.
92  	 *
93  	 * @param executionContext the execution context to set
94  	 */
95  	public void setExecutionContext(ExecutionContext executionContext) {
96  		this.executionContext = executionContext;
97  	}
98  
99  	@Override
100 	public StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException,
101 	JobRestartException, StartLimitExceededException {
102 		if (execution.isStopping()) {
103 			throw new JobInterruptedException("JobExecution interrupted.");
104 		}
105 
106 		JobInstance jobInstance = execution.getJobInstance();
107 
108 		StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
109 		if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) {
110 			// If the last execution of this step was in the same job, it's
111 			// probably intentional so we want to run it again...
112 			logger.info(String.format("Duplicate step [%s] detected in execution of job=[%s]. "
113 					+ "If either step fails, both will be executed again on restart.", step.getName(), jobInstance
114 					.getJobName()));
115 			lastStepExecution = null;
116 		}
117 		StepExecution currentStepExecution = lastStepExecution;
118 
119 		if (shouldStart(lastStepExecution, jobInstance, step)) {
120 
121 			currentStepExecution = execution.createStepExecution(step.getName());
122 
123 			boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals(
124 					BatchStatus.COMPLETED));
125 
126 			if (isRestart) {
127 				currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
128 			}
129 			else {
130 				currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
131 			}
132 
133 			jobRepository.add(currentStepExecution);
134 
135 			logger.info("Executing step: [" + step.getName() + "]");
136 			try {
137 				step.execute(currentStepExecution);
138 			}
139 			catch (JobInterruptedException e) {
140 				// Ensure that the job gets the message that it is stopping
141 				// and can pass it on to other steps that are executing
142 				// concurrently.
143 				execution.setStatus(BatchStatus.STOPPING);
144 				throw e;
145 			}
146 
147 			jobRepository.updateExecutionContext(execution);
148 
149 			if (currentStepExecution.getStatus() == BatchStatus.STOPPING
150 					|| currentStepExecution.getStatus() == BatchStatus.STOPPED) {
151 				// Ensure that the job gets the message that it is stopping
152 				execution.setStatus(BatchStatus.STOPPING);
153 				throw new JobInterruptedException("Job interrupted by step execution");
154 			}
155 
156 		}
157 		else {
158 			// currentStepExecution.setExitStatus(ExitStatus.NOOP);
159 		}
160 
161 		return currentStepExecution;
162 	}
163 
164 	/**
165 	 * Detect whether a step execution belongs to this job execution.
166 	 * @param jobExecution the current job execution
167 	 * @param stepExecution an existing step execution
168 	 * @return
169 	 */
170 	private boolean stepExecutionPartOfExistingJobExecution(JobExecution jobExecution, StepExecution stepExecution) {
171 		return stepExecution != null && stepExecution.getJobExecutionId() != null
172 				&& stepExecution.getJobExecutionId().equals(jobExecution.getId());
173 	}
174 
175 	/**
176 	 * Given a step and configuration, return true if the step should start,
177 	 * false if it should not, and throw an exception if the job should finish.
178 	 * @param lastStepExecution the last step execution
179 	 * @param jobInstance
180 	 * @param step
181 	 *
182 	 * @throws StartLimitExceededException if the start limit has been exceeded
183 	 * for this step
184 	 * @throws JobRestartException if the job is in an inconsistent state from
185 	 * an earlier failure
186 	 */
187 	private boolean shouldStart(StepExecution lastStepExecution, JobInstance jobInstance, Step step)
188 			throws JobRestartException, StartLimitExceededException {
189 
190 		BatchStatus stepStatus;
191 		if (lastStepExecution == null) {
192 			stepStatus = BatchStatus.STARTING;
193 		}
194 		else {
195 			stepStatus = lastStepExecution.getStatus();
196 		}
197 
198 		if (stepStatus == BatchStatus.UNKNOWN) {
199 			throw new JobRestartException("Cannot restart step from UNKNOWN status. "
200 					+ "The last execution ended with a failure that could not be rolled back, "
201 					+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
202 		}
203 
204 		if ((stepStatus == BatchStatus.COMPLETED && step.isAllowStartIfComplete() == false)
205 				|| stepStatus == BatchStatus.ABANDONED) {
206 			// step is complete, false should be returned, indicating that the
207 			// step should not be started
208 			logger.info("Step already complete or not restartable, so no action to execute: " + lastStepExecution);
209 			return false;
210 		}
211 
212 		if (jobRepository.getStepExecutionCount(jobInstance, step.getName()) < step.getStartLimit()) {
213 			// step start count is less than start max, return true
214 			return true;
215 		}
216 		else {
217 			// start max has been exceeded, throw an exception.
218 			throw new StartLimitExceededException("Maximum start limit exceeded for step: " + step.getName()
219 					+ "StartMax: " + step.getStartLimit());
220 		}
221 	}
222 
223 }