1 | /* |
2 | * Copyright 2006-2011 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.builder; |
17 | |
18 | import org.springframework.batch.core.Step; |
19 | import org.springframework.batch.core.partition.PartitionHandler; |
20 | import org.springframework.batch.core.partition.StepExecutionSplitter; |
21 | import org.springframework.batch.core.partition.support.PartitionStep; |
22 | import org.springframework.batch.core.partition.support.Partitioner; |
23 | import org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter; |
24 | import org.springframework.batch.core.partition.support.StepExecutionAggregator; |
25 | import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler; |
26 | import org.springframework.core.task.SyncTaskExecutor; |
27 | import org.springframework.core.task.TaskExecutor; |
28 | |
29 | /** |
30 | * Step builder for {@link PartitionStep} instances. A partition step executes the same step (possibly remotely) |
31 | * multiple times with different input parameters (in the form of execution context). Useful for parallelization. |
32 | * |
33 | * @author Dave Syer |
34 | * |
35 | * @since 2.2 |
36 | */ |
37 | public class PartitionStepBuilder extends StepBuilderHelper<PartitionStepBuilder> { |
38 | |
39 | private TaskExecutor taskExecutor; |
40 | |
41 | private Partitioner partitioner; |
42 | |
43 | private static final int DEFAULT_GRID_SIZE = 6; |
44 | |
45 | private Step step; |
46 | |
47 | private PartitionHandler partitionHandler; |
48 | |
49 | private int gridSize = DEFAULT_GRID_SIZE; |
50 | |
51 | private StepExecutionSplitter splitter; |
52 | |
53 | private StepExecutionAggregator aggregator; |
54 | |
55 | private String stepName; |
56 | |
57 | /** |
58 | * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. |
59 | * |
60 | * @param parent a parent helper containing common step properties |
61 | */ |
62 | public PartitionStepBuilder(StepBuilderHelper<?> parent) { |
63 | super(parent); |
64 | } |
65 | |
66 | /** |
67 | * Add a partitioner which can be used to create a {@link StepExecutionSplitter}. Use either this or an explicit |
68 | * {@link #splitter(StepExecutionSplitter)} but not both. |
69 | * |
70 | * @param slaveStepName the name of the slave step (used to construct step execution names) |
71 | * @param partitioner a partitioner to use |
72 | * @return this for fluent chaining |
73 | */ |
74 | public PartitionStepBuilder partitioner(String slaveStepName, Partitioner partitioner) { |
75 | this.stepName = slaveStepName; |
76 | this.partitioner = partitioner; |
77 | return this; |
78 | } |
79 | |
80 | /** |
81 | * Provide an actual step instance to execute in parallel. If an explicit |
82 | * {@link #partitionHandler(PartitionHandler)} is provided, the step is optional and is only used to extract |
83 | * configuration data (name and other basic properties of a step). |
84 | * |
85 | * @param step a step to execute in parallel |
86 | * @return this for fluent chaining |
87 | */ |
88 | public PartitionStepBuilder step(Step step) { |
89 | this.step = step; |
90 | return this; |
91 | } |
92 | |
93 | /** |
94 | * Provide a task executor to use when constructing a {@link PartitionHandler} from the {@link #step(Step)}. Mainly |
95 | * used for running a step locally in parallel, but can be used to execute remotely if the step is remote. Not used |
96 | * if an explicit {@link #partitionHandler(PartitionHandler)} is provided. |
97 | * |
98 | * @param taskExecutor a task executor to use when executing steps in parallel |
99 | * @return this for fluent chaining |
100 | */ |
101 | public PartitionStepBuilder taskExecutor(TaskExecutor taskExecutor) { |
102 | this.taskExecutor = taskExecutor; |
103 | return this; |
104 | } |
105 | |
106 | /** |
107 | * Provide an explicit partition handler that will carry out the work of the partition step. The partition handler |
108 | * is the main SPI for adapting a partition step to a specific distributed computation environment. Optional if you |
109 | * only need local or remote processing through the Step interface. |
110 | * |
111 | * @see #step(Step) for setting up a default handler that works with a local or remote Step |
112 | * |
113 | * @param partitionHandler a partition handler |
114 | * @return this for fluent chaining |
115 | */ |
116 | public PartitionStepBuilder partitionHandler(PartitionHandler partitionHandler) { |
117 | this.partitionHandler = partitionHandler; |
118 | return this; |
119 | } |
120 | |
121 | /** |
122 | * A hint to the {@link #splitter(StepExecutionSplitter)} about how many step executions are required. If running |
123 | * locally or remotely through a {@link #taskExecutor(TaskExecutor)} determines precisely the number of step |
124 | * execution sin the first attempt at a partition step execution. |
125 | * |
126 | * @param gridSize the grid size |
127 | * @return this for fluent chaining |
128 | */ |
129 | public PartitionStepBuilder gridSize(int gridSize) { |
130 | this.gridSize = gridSize; |
131 | return this; |
132 | } |
133 | |
134 | /** |
135 | * Provide an explicit {@link StepExecutionSplitter} instead of having one build from the |
136 | * {@link #partitioner(String, Partitioner)}. USeful if you need more control over the splitting. |
137 | * |
138 | * @param splitter a step execution splitter |
139 | * @return this for fluent chaining |
140 | */ |
141 | public PartitionStepBuilder splitter(StepExecutionSplitter splitter) { |
142 | this.splitter = splitter; |
143 | return this; |
144 | } |
145 | |
146 | /** |
147 | * Provide a step execution aggregator for aggregating partitioned step executions into a single result for the |
148 | * {@link PartitionStep} itself. Default is a simple implementation that works in most cases. |
149 | * |
150 | * @param aggregator a step execution aggregator |
151 | * @return this for fluent chaining |
152 | */ |
153 | public PartitionStepBuilder aggregator(StepExecutionAggregator aggregator) { |
154 | this.aggregator = aggregator; |
155 | return this; |
156 | } |
157 | |
158 | public Step build() { |
159 | |
160 | PartitionStep step = new PartitionStep(); |
161 | step.setName(getName()); |
162 | super.enhance(step); |
163 | |
164 | if (partitionHandler != null) { |
165 | step.setPartitionHandler(partitionHandler); |
166 | } |
167 | else { |
168 | TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); |
169 | partitionHandler.setStep(this.step); |
170 | if (taskExecutor == null) { |
171 | taskExecutor = new SyncTaskExecutor(); |
172 | } |
173 | partitionHandler.setGridSize(gridSize); |
174 | partitionHandler.setTaskExecutor(taskExecutor); |
175 | step.setPartitionHandler(partitionHandler); |
176 | } |
177 | |
178 | if (splitter != null) { |
179 | step.setStepExecutionSplitter(splitter); |
180 | } |
181 | else { |
182 | |
183 | boolean allowStartIfComplete = isAllowStartIfComplete(); |
184 | String name = stepName; |
185 | if (this.step != null) { |
186 | try { |
187 | allowStartIfComplete = this.step.isAllowStartIfComplete(); |
188 | name = this.step.getName(); |
189 | } |
190 | catch (Exception e) { |
191 | logger.info("Ignored exception from step asking for name and allowStartIfComplete flag. " |
192 | + "Using default from enclosing PartitionStep (" + name + "," + allowStartIfComplete + ")."); |
193 | } |
194 | } |
195 | SimpleStepExecutionSplitter splitter = new SimpleStepExecutionSplitter(); |
196 | splitter.setPartitioner(partitioner); |
197 | splitter.setJobRepository(getJobRepository()); |
198 | splitter.setAllowStartIfComplete(allowStartIfComplete); |
199 | splitter.setStepName(name); |
200 | this.splitter = splitter; |
201 | step.setStepExecutionSplitter(splitter); |
202 | |
203 | } |
204 | |
205 | if (aggregator != null) { |
206 | step.setStepExecutionAggregator(aggregator); |
207 | } |
208 | |
209 | try { |
210 | step.afterPropertiesSet(); |
211 | } |
212 | catch (Exception e) { |
213 | throw new StepBuilderException(e); |
214 | } |
215 | |
216 | return step; |
217 | |
218 | } |
219 | |
220 | } |