1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.core.partition.support;
18
19 import java.util.Collection;
20
21 import org.springframework.batch.core.BatchStatus;
22 import org.springframework.batch.core.JobExecutionException;
23 import org.springframework.batch.core.Step;
24 import org.springframework.batch.core.StepExecution;
25 import org.springframework.batch.core.partition.PartitionHandler;
26 import org.springframework.batch.core.partition.StepExecutionSplitter;
27 import org.springframework.batch.core.step.AbstractStep;
28 import org.springframework.batch.item.ExecutionContext;
29 import org.springframework.util.Assert;
30
31
32
33
34
35
36
37
38 public class PartitionStep extends AbstractStep {
39
40 private StepExecutionSplitter stepExecutionSplitter;
41
42 private PartitionHandler partitionHandler;
43
44 private StepExecutionAggregator stepExecutionAggregator = new DefaultStepExecutionAggregator();
45
46
47
48
49
50
51
52 public void setPartitionHandler(PartitionHandler partitionHandler) {
53 this.partitionHandler = partitionHandler;
54 }
55
56
57
58
59
60
61
62
63 public void setStepExecutionAggregator(StepExecutionAggregator stepExecutionAggregator) {
64 this.stepExecutionAggregator = stepExecutionAggregator;
65 }
66
67
68
69
70
71 public void setStepExecutionSplitter(StepExecutionSplitter stepExecutionSplitter) {
72 this.stepExecutionSplitter = stepExecutionSplitter;
73 }
74
75
76
77
78
79
80
81 public void afterPropertiesSet() throws Exception {
82 Assert.notNull(stepExecutionSplitter, "StepExecutionSplitter must be provided");
83 Assert.notNull(partitionHandler, "PartitionHandler must be provided");
84 super.afterPropertiesSet();
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100 @Override
101 protected void doExecute(StepExecution stepExecution) throws Exception {
102
103
104 Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
105 stepExecution.upgradeStatus(BatchStatus.COMPLETED);
106 stepExecutionAggregator.aggregate(stepExecution, executions);
107
108
109 if (stepExecution.getStatus().isUnsuccessful()) {
110 throw new JobExecutionException("Partition handler returned an unsuccessful step");
111 }
112
113 }
114
115 }