View Javadoc

1   /*
2    * Copyright 2006-2011 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.step.builder;
17  
18  import org.springframework.batch.core.Step;
19  import org.springframework.batch.core.partition.PartitionHandler;
20  import org.springframework.batch.core.partition.StepExecutionSplitter;
21  import org.springframework.batch.core.partition.support.PartitionStep;
22  import org.springframework.batch.core.partition.support.Partitioner;
23  import org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter;
24  import org.springframework.batch.core.partition.support.StepExecutionAggregator;
25  import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
26  import org.springframework.core.task.SyncTaskExecutor;
27  import org.springframework.core.task.TaskExecutor;
28  
29  /**
30   * Step builder for {@link PartitionStep} instances. A partition step executes the same step (possibly remotely)
31   * multiple times with different input parameters (in the form of execution context). Useful for parallelization.
32   * 
33   * @author Dave Syer
34   * 
35   * @since 2.2
36   */
37  public class PartitionStepBuilder extends StepBuilderHelper<PartitionStepBuilder> {
38  
39  	private TaskExecutor taskExecutor;
40  
41  	private Partitioner partitioner;
42  
43  	private static final int DEFAULT_GRID_SIZE = 6;
44  
45  	private Step step;
46  
47  	private PartitionHandler partitionHandler;
48  
49  	private int gridSize = DEFAULT_GRID_SIZE;
50  
51  	private StepExecutionSplitter splitter;
52  
53  	private StepExecutionAggregator aggregator;
54  
55  	private String stepName;
56  
57  	/**
58  	 * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used.
59  	 * 
60  	 * @param parent a parent helper containing common step properties
61  	 */
62  	public PartitionStepBuilder(StepBuilderHelper<?> parent) {
63  		super(parent);
64  	}
65  
66  	/**
67  	 * Add a partitioner which can be used to create a {@link StepExecutionSplitter}. Use either this or an explicit
68  	 * {@link #splitter(StepExecutionSplitter)} but not both.
69  	 * 
70  	 * @param slaveStepName the name of the slave step (used to construct step execution names)
71  	 * @param partitioner a partitioner to use
72  	 * @return this for fluent chaining
73  	 */
74  	public PartitionStepBuilder partitioner(String slaveStepName, Partitioner partitioner) {
75  		this.stepName = slaveStepName;
76  		this.partitioner = partitioner;
77  		return this;
78  	}
79  
80  	/**
81  	 * Provide an actual step instance to execute in parallel. If an explicit
82  	 * {@link #partitionHandler(PartitionHandler)} is provided, the step is optional and is only used to extract
83  	 * configuration data (name and other basic properties of a step).
84  	 * 
85  	 * @param step a step to execute in parallel
86  	 * @return this for fluent chaining
87  	 */
88  	public PartitionStepBuilder step(Step step) {
89  		this.step = step;
90  		return this;
91  	}
92  
93  	/**
94  	 * Provide a task executor to use when constructing a {@link PartitionHandler} from the {@link #step(Step)}. Mainly
95  	 * used for running a step locally in parallel, but can be used to execute remotely if the step is remote. Not used
96  	 * if an explicit {@link #partitionHandler(PartitionHandler)} is provided.
97  	 * 
98  	 * @param taskExecutor a task executor to use when executing steps in parallel
99  	 * @return this for fluent chaining
100 	 */
101 	public PartitionStepBuilder taskExecutor(TaskExecutor taskExecutor) {
102 		this.taskExecutor = taskExecutor;
103 		return this;
104 	}
105 
106 	/**
107 	 * Provide an explicit partition handler that will carry out the work of the partition step. The partition handler
108 	 * is the main SPI for adapting a partition step to a specific distributed computation environment. Optional if you
109 	 * only need local or remote processing through the Step interface.
110 	 * 
111 	 * @see #step(Step) for setting up a default handler that works with a local or remote Step
112 	 * 
113 	 * @param partitionHandler a partition handler
114 	 * @return this for fluent chaining
115 	 */
116 	public PartitionStepBuilder partitionHandler(PartitionHandler partitionHandler) {
117 		this.partitionHandler = partitionHandler;
118 		return this;
119 	}
120 
121 	/**
122 	 * A hint to the {@link #splitter(StepExecutionSplitter)} about how many step executions are required. If running
123 	 * locally or remotely through a {@link #taskExecutor(TaskExecutor)} determines precisely the number of step
124 	 * execution sin the first attempt at a partition step execution.
125 	 * 
126 	 * @param gridSize the grid size
127 	 * @return this for fluent chaining
128 	 */
129 	public PartitionStepBuilder gridSize(int gridSize) {
130 		this.gridSize = gridSize;
131 		return this;
132 	}
133 
134 	/**
135 	 * Provide an explicit {@link StepExecutionSplitter} instead of having one build from the
136 	 * {@link #partitioner(String, Partitioner)}. USeful if you need more control over the splitting.
137 	 * 
138 	 * @param splitter a step execution splitter
139 	 * @return this for fluent chaining
140 	 */
141 	public PartitionStepBuilder splitter(StepExecutionSplitter splitter) {
142 		this.splitter = splitter;
143 		return this;
144 	}
145 
146 	/**
147 	 * Provide a step execution aggregator for aggregating partitioned step executions into a single result for the
148 	 * {@link PartitionStep} itself.  Default is a simple implementation that works in most cases.
149 	 * 
150 	 * @param aggregator a step execution aggregator
151 	 * @return this for fluent chaining
152 	 */
153 	public PartitionStepBuilder aggregator(StepExecutionAggregator aggregator) {
154 		this.aggregator = aggregator;
155 		return this;
156 	}
157 
158 	public Step build() {
159 
160 		PartitionStep step = new PartitionStep();
161 		step.setName(getName());
162 		super.enhance(step);
163 
164 		if (partitionHandler != null) {
165 			step.setPartitionHandler(partitionHandler);
166 		}
167 		else {
168 			TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
169 			partitionHandler.setStep(this.step);
170 			if (taskExecutor == null) {
171 				taskExecutor = new SyncTaskExecutor();
172 			}
173 			partitionHandler.setGridSize(gridSize);
174 			partitionHandler.setTaskExecutor(taskExecutor);
175 			step.setPartitionHandler(partitionHandler);
176 		}
177 
178 		if (splitter != null) {
179 			step.setStepExecutionSplitter(splitter);
180 		}
181 		else {
182 
183 			boolean allowStartIfComplete = isAllowStartIfComplete();
184 			String name = stepName;
185 			if (this.step != null) {
186 				try {
187 					allowStartIfComplete = this.step.isAllowStartIfComplete();
188 					name = this.step.getName();
189 				}
190 				catch (Exception e) {
191 					logger.info("Ignored exception from step asking for name and allowStartIfComplete flag. "
192 							+ "Using default from enclosing PartitionStep (" + name + "," + allowStartIfComplete + ").");
193 				}
194 			}
195 			SimpleStepExecutionSplitter splitter = new SimpleStepExecutionSplitter();
196 			splitter.setPartitioner(partitioner);
197 			splitter.setJobRepository(getJobRepository());
198 			splitter.setAllowStartIfComplete(allowStartIfComplete);
199 			splitter.setStepName(name);
200 			this.splitter = splitter;
201 			step.setStepExecutionSplitter(splitter);
202 
203 		}
204 
205 		if (aggregator != null) {
206 			step.setStepExecutionAggregator(aggregator);
207 		}
208 
209 		try {
210 			step.afterPropertiesSet();
211 		}
212 		catch (Exception e) {
213 			throw new StepBuilderException(e);
214 		}
215 
216 		return step;
217 
218 	}
219 
220 }