/*
 * Copyright 2006-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16  
17  package org.springframework.batch.core.partition.support;
18  
19  import java.util.Collection;
20  import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.Map;
23  import java.util.Map.Entry;
24  import java.util.Set;
25  
26  import org.springframework.batch.core.BatchStatus;
27  import org.springframework.batch.core.JobExecution;
28  import org.springframework.batch.core.JobExecutionException;
29  import org.springframework.batch.core.JobInstance;
30  import org.springframework.batch.core.Step;
31  import org.springframework.batch.core.StepExecution;
32  import org.springframework.batch.core.partition.StepExecutionSplitter;
33  import org.springframework.batch.core.repository.JobRepository;
34  import org.springframework.batch.item.ExecutionContext;
35  import org.springframework.beans.factory.InitializingBean;
36  import org.springframework.util.Assert;
37  
38  /**
39   * Generic implementation of {@link StepExecutionSplitter} that delegates to a
40   * {@link Partitioner} to generate {@link ExecutionContext} instances. Takes
41   * care of restartability and identifying the step executions from previous runs
42   * of the same job. The generated {@link StepExecution} instances have names
43   * that identify them uniquely in the partition. The name is constructed from a
44   * base (name of the target step) plus a suffix taken from the
45   * {@link Partitioner} identifiers, separated by a colon, e.g.
46   * <code>{step1:partition0, step1:partition1, ...}</code>.
47   *
48   * @author Dave Syer
49   * @since 2.0
50   */
51  public class SimpleStepExecutionSplitter implements StepExecutionSplitter, InitializingBean {
52  
53  	private static final String STEP_NAME_SEPARATOR = ":";
54  
55  	private String stepName;
56  
57  	private Partitioner partitioner;
58  
59  	private boolean allowStartIfComplete = false;
60  
61  	private JobRepository jobRepository;
62  
63  	/**
64  	 * Default constructor for convenience in configuration.
65  	 */
66  	public SimpleStepExecutionSplitter() {
67  	}
68  
69  	/**
70  	 * Construct a {@link SimpleStepExecutionSplitter} from its mandatory
71  	 * properties.
72  	 *
73  	 * @param jobRepository the {@link JobRepository}
74  	 * @param allowStartIfComplete flag specifying preferences on restart
75  	 * @param stepName the target step name
76  	 * @param partitioner a {@link Partitioner} to use for generating input
77  	 * parameters
78  	 */
79  	public SimpleStepExecutionSplitter(JobRepository jobRepository, boolean allowStartIfComplete, String stepName, Partitioner partitioner) {
80  		this.jobRepository = jobRepository;
81  		this.allowStartIfComplete = allowStartIfComplete;
82  		this.partitioner = partitioner;
83  		this.stepName = stepName;
84  	}
85  
86  	/**
87  	 * Construct a {@link SimpleStepExecutionSplitter} from its mandatory
88  	 * properties.
89  	 *
90  	 * @param jobRepository the {@link JobRepository}
91  	 * @param step the target step (a local version of it), used to extract the
92  	 * name and allowStartIfComplete flags
93  	 * @param partitioner a {@link Partitioner} to use for generating input
94  	 * parameters
95  	 *
96  	 * @deprecated use {@link #SimpleStepExecutionSplitter(JobRepository, boolean, String, Partitioner)} instead
97  	 */
98  	@Deprecated
99  	public SimpleStepExecutionSplitter(JobRepository jobRepository, Step step, Partitioner partitioner) {
100 		this.jobRepository = jobRepository;
101 		this.allowStartIfComplete = step.isAllowStartIfComplete();
102 		this.partitioner = partitioner;
103 		this.stepName = step.getName();
104 	}
105 
106 	/**
107 	 * Check mandatory properties (step name, job repository and partitioner).
108 	 *
109 	 * @see InitializingBean#afterPropertiesSet()
110 	 */
111 	@Override
112 	public void afterPropertiesSet() throws Exception {
113 		Assert.state(jobRepository != null, "A JobRepository is required");
114 		Assert.state(stepName != null, "A step name is required");
115 		Assert.state(partitioner != null, "A Partitioner is required");
116 	}
117 
118 	/**
119 	 * Flag to indicate that the partition target step is allowed to start if an
120 	 * execution is complete. Defaults to the same value as the underlying step.
121 	 * Set this manually to override the underlying step properties.
122 	 *
123 	 * @see Step#isAllowStartIfComplete()
124 	 *
125 	 * @param allowStartIfComplete the value to set
126 	 */
127 	public void setAllowStartIfComplete(boolean allowStartIfComplete) {
128 		this.allowStartIfComplete = allowStartIfComplete;
129 	}
130 
131 	/**
132 	 * The job repository that will be used to manage the persistence of the
133 	 * delegate step executions.
134 	 *
135 	 * @param jobRepository the JobRepository to set
136 	 */
137 	public void setJobRepository(JobRepository jobRepository) {
138 		this.jobRepository = jobRepository;
139 	}
140 
141 	/**
142 	 * The {@link Partitioner} that will be used to generate step execution meta
143 	 * data for the target step.
144 	 *
145 	 * @param partitioner the partitioner to set
146 	 */
147 	public void setPartitioner(Partitioner partitioner) {
148 		this.partitioner = partitioner;
149 	}
150 
151 	/**
152 	 * The name of the target step that will be executed across the partitions.
153 	 * Mandatory with no default.
154 	 *
155 	 * @param stepName the step name to set
156 	 */
157 	public void setStepName(String stepName) {
158 		this.stepName = stepName;
159 	}
160 
161 	/**
162 	 * @see StepExecutionSplitter#getStepName()
163 	 */
164 	@Override
165 	public String getStepName() {
166 		return this.stepName;
167 	}
168 
169 	/**
170 	 * @see StepExecutionSplitter#split(StepExecution, int)
171 	 */
172 	@Override
173 	public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throws JobExecutionException {
174 
175 		JobExecution jobExecution = stepExecution.getJobExecution();
176 
177 		Map<String, ExecutionContext> contexts = getContexts(stepExecution, gridSize);
178 		Set<StepExecution> set = new HashSet<StepExecution>(contexts.size());
179 
180 		for (Entry<String, ExecutionContext> context : contexts.entrySet()) {
181 
182 			// Make the step execution name unique and repeatable
183 			String stepName = this.stepName + STEP_NAME_SEPARATOR + context.getKey();
184 
185 			StepExecution currentStepExecution = jobExecution.createStepExecution(stepName);
186 
187 			boolean startable = getStartable(currentStepExecution, context.getValue());
188 
189 			if (startable) {
190 				jobRepository.add(currentStepExecution);
191 				set.add(currentStepExecution);
192 			}
193 
194 		}
195 
196 		return set;
197 
198 	}
199 
200 	private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, int gridSize) {
201 
202 		ExecutionContext context = stepExecution.getExecutionContext();
203 		String key = SimpleStepExecutionSplitter.class.getSimpleName() + ".GRID_SIZE";
204 
205 		// If this is a restart we must retain the same grid size, ignoring the
206 		// one passed in...
207 		int splitSize = (int) context.getLong(key, gridSize);
208 		context.putLong(key, splitSize);
209 
210 		Map<String, ExecutionContext> result;
211 		if (context.isDirty()) {
212 			// The context changed so we didn't already know the partitions
213 			jobRepository.updateExecutionContext(stepExecution);
214 			result = partitioner.partition(splitSize);
215 		}
216 		else {
217 			if (partitioner instanceof PartitionNameProvider) {
218 				result = new HashMap<String, ExecutionContext>();
219 				Collection<String> names = ((PartitionNameProvider) partitioner).getPartitionNames(splitSize);
220 				for (String name : names) {
221 					/*
222 					 * We need to return the same keys as the original (failed)
223 					 * execution, but the execution contexts will be discarded
224 					 * so they can be empty.
225 					 */
226 					result.put(name, new ExecutionContext());
227 				}
228 			}
229 			else {
230 				// If no names are provided, grab the partition again.
231 				result = partitioner.partition(splitSize);
232 			}
233 		}
234 
235 		return result;
236 	}
237 
238 	private boolean getStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {
239 
240 		JobInstance jobInstance = stepExecution.getJobExecution().getJobInstance();
241 		String stepName = stepExecution.getStepName();
242 		StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, stepName);
243 
244 		boolean isRestart = (lastStepExecution != null && lastStepExecution.getStatus() != BatchStatus.COMPLETED);
245 
246 		if (isRestart) {
247 			stepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
248 		}
249 		else {
250 			stepExecution.setExecutionContext(context);
251 		}
252 
253 		return shouldStart(allowStartIfComplete, stepExecution, lastStepExecution) || isRestart;
254 
255 	}
256 
257 	private boolean shouldStart(boolean allowStartIfComplete, StepExecution stepExecution, StepExecution lastStepExecution)
258 			throws JobExecutionException {
259 
260 		if (lastStepExecution == null) {
261 			return true;
262 		}
263 
264 		BatchStatus stepStatus = lastStepExecution.getStatus();
265 
266 		if (stepStatus == BatchStatus.UNKNOWN) {
267 			throw new JobExecutionException("Cannot restart step from UNKNOWN status.  "
268 					+ "The last execution ended with a failure that could not be rolled back, "
269 					+ "so it may be dangerous to proceed.  " + "Manual intervention is probably necessary.");
270 		}
271 
272 		if (stepStatus == BatchStatus.COMPLETED) {
273 			if (!allowStartIfComplete) {
274 				if (isSameJobExecution(stepExecution, lastStepExecution)) {
275 					// it's always OK to start again in the same JobExecution
276 					return true;
277 				}
278 				// step is complete, false should be returned, indicating that
279 				// the step should not be started
280 				return false;
281 			}
282 			else {
283 				return true;
284 			}
285 		}
286 
287 		if (stepStatus == BatchStatus.STOPPED || stepStatus == BatchStatus.FAILED) {
288 			return true;
289 		}
290 
291 		if (stepStatus == BatchStatus.STARTED || stepStatus == BatchStatus.STARTING
292 				|| stepStatus == BatchStatus.STOPPING) {
293 			throw new JobExecutionException(
294 					"Cannot restart step from "
295 							+ stepStatus
296 							+ " status.  "
297 							+ "The old execution may still be executing, so you may need to verify manually that this is the case.");
298 		}
299 
300 		throw new JobExecutionException("Cannot restart step from " + stepStatus + " status.  "
301 				+ "We believe the old execution was abandoned and therefore has been marked as un-restartable.");
302 
303 	}
304 
305 	private boolean isSameJobExecution(StepExecution stepExecution, StepExecution lastStepExecution) {
306 		if (stepExecution.getJobExecutionId()==null) {
307 			return lastStepExecution.getJobExecutionId()==null;
308 		}
309 		return stepExecution.getJobExecutionId().equals(lastStepExecution.getJobExecutionId());
310 	}
311 
312 }