View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.partition.support;
18  
19  import java.util.Collection;
20  
21  import org.springframework.batch.core.BatchStatus;
22  import org.springframework.batch.core.JobExecutionException;
23  import org.springframework.batch.core.Step;
24  import org.springframework.batch.core.StepExecution;
25  import org.springframework.batch.core.partition.PartitionHandler;
26  import org.springframework.batch.core.partition.StepExecutionSplitter;
27  import org.springframework.batch.core.step.AbstractStep;
28  import org.springframework.batch.item.ExecutionContext;
29  import org.springframework.util.Assert;
30  
31  /**
32   * Implementation of {@link Step} which partitions the execution and spreads the
33   * load using a {@link PartitionHandler}.
34   * 
35   * @author Dave Syer
36   * @since 2.0
37   */
38  public class PartitionStep extends AbstractStep {
39  
40  	private StepExecutionSplitter stepExecutionSplitter;
41  
42  	private PartitionHandler partitionHandler;
43  
44  	private StepExecutionAggregator stepExecutionAggregator = new DefaultStepExecutionAggregator();
45  
46  	/**
47  	 * A {@link PartitionHandler} which can send out step executions for remote
48  	 * processing and bring back the results.
49  	 * 
50  	 * @param partitionHandler the {@link PartitionHandler} to set
51  	 */
52  	public void setPartitionHandler(PartitionHandler partitionHandler) {
53  		this.partitionHandler = partitionHandler;
54  	}
55  
56  	/**
57  	 * A {@link StepExecutionAggregator} that can aggregate step executions when
58  	 * they come back from the handler. Defaults to a
59  	 * {@link DefaultStepExecutionAggregator}.
60  	 * 
61  	 * @param stepExecutionAggregator the {@link StepExecutionAggregator} to set
62  	 */
63  	public void setStepExecutionAggregator(StepExecutionAggregator stepExecutionAggregator) {
64  		this.stepExecutionAggregator = stepExecutionAggregator;
65  	}
66  
67  	/**
68  	 * Public setter for mandatory property {@link StepExecutionSplitter}.
69  	 * @param stepExecutionSplitter the {@link StepExecutionSplitter} to set
70  	 */
71  	public void setStepExecutionSplitter(StepExecutionSplitter stepExecutionSplitter) {
72  		this.stepExecutionSplitter = stepExecutionSplitter;
73  	}
74  
75  	/**
76  	 * Assert that mandatory properties are set (stepExecutionSplitter,
77  	 * partitionHandler) and delegate top superclass.
78  	 * 
79  	 * @see AbstractStep#afterPropertiesSet()
80  	 */
81  	public void afterPropertiesSet() throws Exception {
82  		Assert.notNull(stepExecutionSplitter, "StepExecutionSplitter must be provided");
83  		Assert.notNull(partitionHandler, "PartitionHandler must be provided");
84  		super.afterPropertiesSet();
85  	}
86  
87  	/**
88  	 * Delegate execution to the {@link PartitionHandler} provided. The
89  	 * {@link StepExecution} passed in here becomes the parent or master
90  	 * execution for the partition, summarising the status on exit of the
91  	 * logical grouping of work carried out by the {@link PartitionHandler}. The
92  	 * individual step executions and their input parameters (through
93  	 * {@link ExecutionContext}) for the partition elements are provided by the
94  	 * {@link StepExecutionSplitter}.
95  	 * 
96  	 * @param stepExecution the master step execution for the partition
97  	 * 
98  	 * @see Step#execute(StepExecution)
99  	 */
100 	@Override
101 	protected void doExecute(StepExecution stepExecution) throws Exception {
102 
103 		// Wait for task completion and then aggregate the results
104 		Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
105 		stepExecution.upgradeStatus(BatchStatus.COMPLETED);
106 		stepExecutionAggregator.aggregate(stepExecution, executions);
107 
108 		// If anything failed or had a problem we need to crap out
109 		if (stepExecution.getStatus().isUnsuccessful()) {
110 			throw new JobExecutionException("Partition handler returned an unsuccessful step");
111 		}
112 
113 	}
114 
115 }