1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.partition.support; |
18 | |
19 | import java.util.Collection; |
20 | |
21 | import org.springframework.batch.core.BatchStatus; |
22 | import org.springframework.batch.core.JobExecutionException; |
23 | import org.springframework.batch.core.Step; |
24 | import org.springframework.batch.core.StepExecution; |
25 | import org.springframework.batch.core.partition.PartitionHandler; |
26 | import org.springframework.batch.core.partition.StepExecutionSplitter; |
27 | import org.springframework.batch.core.step.AbstractStep; |
28 | import org.springframework.batch.item.ExecutionContext; |
29 | import org.springframework.util.Assert; |
30 | |
31 | /** |
32 | * Implementation of {@link Step} which partitions the execution and spreads the |
33 | * load using a {@link PartitionHandler}. |
34 | * |
35 | * @author Dave Syer |
36 | * @since 2.0 |
37 | */ |
38 | public class PartitionStep extends AbstractStep { |
39 | |
40 | private StepExecutionSplitter stepExecutionSplitter; |
41 | |
42 | private PartitionHandler partitionHandler; |
43 | |
44 | private StepExecutionAggregator stepExecutionAggregator = new DefaultStepExecutionAggregator(); |
45 | |
46 | /** |
47 | * A {@link PartitionHandler} which can send out step executions for remote |
48 | * processing and bring back the results. |
49 | * |
50 | * @param partitionHandler the {@link PartitionHandler} to set |
51 | */ |
52 | public void setPartitionHandler(PartitionHandler partitionHandler) { |
53 | this.partitionHandler = partitionHandler; |
54 | } |
55 | |
56 | /** |
57 | * A {@link StepExecutionAggregator} that can aggregate step executions when |
58 | * they come back from the handler. Defaults to a |
59 | * {@link DefaultStepExecutionAggregator}. |
60 | * |
61 | * @param stepExecutionAggregator the {@link StepExecutionAggregator} to set |
62 | */ |
63 | public void setStepExecutionAggregator(StepExecutionAggregator stepExecutionAggregator) { |
64 | this.stepExecutionAggregator = stepExecutionAggregator; |
65 | } |
66 | |
67 | /** |
68 | * Public setter for mandatory property {@link StepExecutionSplitter}. |
69 | * @param stepExecutionSplitter the {@link StepExecutionSplitter} to set |
70 | */ |
71 | public void setStepExecutionSplitter(StepExecutionSplitter stepExecutionSplitter) { |
72 | this.stepExecutionSplitter = stepExecutionSplitter; |
73 | } |
74 | |
75 | /** |
76 | * Assert that mandatory properties are set (stepExecutionSplitter, |
77 | * partitionHandler) and delegate top superclass. |
78 | * |
79 | * @see AbstractStep#afterPropertiesSet() |
80 | */ |
81 | @Override |
82 | public void afterPropertiesSet() throws Exception { |
83 | Assert.notNull(stepExecutionSplitter, "StepExecutionSplitter must be provided"); |
84 | Assert.notNull(partitionHandler, "PartitionHandler must be provided"); |
85 | super.afterPropertiesSet(); |
86 | } |
87 | |
88 | /** |
89 | * Delegate execution to the {@link PartitionHandler} provided. The |
90 | * {@link StepExecution} passed in here becomes the parent or master |
91 | * execution for the partition, summarising the status on exit of the |
92 | * logical grouping of work carried out by the {@link PartitionHandler}. The |
93 | * individual step executions and their input parameters (through |
94 | * {@link ExecutionContext}) for the partition elements are provided by the |
95 | * {@link StepExecutionSplitter}. |
96 | * |
97 | * @param stepExecution the master step execution for the partition |
98 | * |
99 | * @see Step#execute(StepExecution) |
100 | */ |
101 | @Override |
102 | protected void doExecute(StepExecution stepExecution) throws Exception { |
103 | |
104 | // Wait for task completion and then aggregate the results |
105 | Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution); |
106 | stepExecution.upgradeStatus(BatchStatus.COMPLETED); |
107 | stepExecutionAggregator.aggregate(stepExecution, executions); |
108 | |
109 | // If anything failed or had a problem we need to crap out |
110 | if (stepExecution.getStatus().isUnsuccessful()) { |
111 | throw new JobExecutionException("Partition handler returned an unsuccessful step"); |
112 | } |
113 | |
114 | } |
115 | |
116 | } |