1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.partition.support; |
18 | |
19 | import java.util.Collection; |
20 | |
21 | import org.springframework.batch.core.BatchStatus; |
22 | import org.springframework.batch.core.JobExecutionException; |
23 | import org.springframework.batch.core.Step; |
24 | import org.springframework.batch.core.StepExecution; |
25 | import org.springframework.batch.core.partition.PartitionHandler; |
26 | import org.springframework.batch.core.partition.StepExecutionSplitter; |
27 | import org.springframework.batch.core.step.AbstractStep; |
28 | import org.springframework.batch.item.ExecutionContext; |
29 | import org.springframework.util.Assert; |
30 | |
31 | /** |
32 | * Implementation of {@link Step} which partitions the execution and spreads the |
33 | * load using a {@link PartitionHandler}. |
34 | * |
35 | * @author Dave Syer |
36 | * @since 2.0 |
37 | */ |
38 | public class PartitionStep extends AbstractStep { |
39 | |
40 | private StepExecutionSplitter stepExecutionSplitter; |
41 | |
42 | private PartitionHandler partitionHandler; |
43 | |
44 | private StepExecutionAggregator aggregator = new StepExecutionAggregator(); |
45 | |
46 | /** |
47 | * Public setter for mandatory property {@link PartitionHandler}. |
48 | * @param partitionHandler the {@link PartitionHandler} to set |
49 | */ |
50 | public void setPartitionHandler(PartitionHandler partitionHandler) { |
51 | this.partitionHandler = partitionHandler; |
52 | } |
53 | |
54 | /** |
55 | * Public setter for mandatory property {@link StepExecutionSplitter}. |
56 | * @param stepExecutionSplitter the {@link StepExecutionSplitter} to set |
57 | */ |
58 | public void setStepExecutionSplitter(StepExecutionSplitter stepExecutionSplitter) { |
59 | this.stepExecutionSplitter = stepExecutionSplitter; |
60 | } |
61 | |
62 | /** |
63 | * Assert that mandatory properties are set (stepExecutionSplitter, |
64 | * partitionHandler) and delegate top superclass. |
65 | * |
66 | * @see AbstractStep#afterPropertiesSet() |
67 | */ |
68 | public void afterPropertiesSet() throws Exception { |
69 | Assert.notNull(stepExecutionSplitter, "StepExecutionSplitter must be provided"); |
70 | Assert.notNull(partitionHandler, "PartitionHandler must be provided"); |
71 | super.afterPropertiesSet(); |
72 | } |
73 | |
74 | /** |
75 | * Delegate execution to the {@link PartitionHandler} provided. The |
76 | * {@link StepExecution} passed in here becomes the parent or master |
77 | * execution for the partition, summarising the status on exit of the |
78 | * logical grouping of work carried out by the {@link PartitionHandler}. The |
79 | * individual step executions and their input parameters (through |
80 | * {@link ExecutionContext}) for the partition elements are provided by the |
81 | * {@link StepExecutionSplitter}. |
82 | * |
83 | * @param stepExecution the master step execution for the partition |
84 | * |
85 | * @see Step#execute(StepExecution) |
86 | */ |
87 | @Override |
88 | protected void doExecute(StepExecution stepExecution) throws Exception { |
89 | |
90 | // Wait for task completion and then aggregate the results |
91 | Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution); |
92 | stepExecution.upgradeStatus(BatchStatus.COMPLETED); |
93 | aggregator.aggregate(stepExecution, executions); |
94 | |
95 | // If anything failed or had a problem we need to crap out |
96 | if (stepExecution.getStatus().isUnsuccessful()) { |
97 | throw new JobExecutionException("Partition handler returned an unsuccessful step"); |
98 | } |
99 | |
100 | } |
101 | |
102 | } |