View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.partition.support;
18  
19  import java.util.HashSet;
20  import java.util.Set;
21  import java.util.concurrent.Callable;
22  import java.util.concurrent.Future;
23  import java.util.concurrent.FutureTask;
24  
25  import org.springframework.batch.core.BatchStatus;
26  import org.springframework.batch.core.ExitStatus;
27  import org.springframework.batch.core.Step;
28  import org.springframework.batch.core.StepExecution;
29  import org.springframework.batch.core.partition.PartitionHandler;
30  import org.springframework.batch.core.step.StepHolder;
31  import org.springframework.beans.factory.InitializingBean;
32  import org.springframework.beans.factory.annotation.Required;
33  import org.springframework.core.task.SyncTaskExecutor;
34  import org.springframework.core.task.TaskExecutor;
35  import org.springframework.core.task.TaskRejectedException;
36  import org.springframework.util.Assert;
37  
38  /**
39   * A {@link PartitionHandler} that uses a {@link TaskExecutor} to execute the
40   * partitioned {@link Step} locally in multiple threads. This can be an
41   * effective approach for scaling batch steps that are IO intensive, like
42   * directory and filesystem scanning and copying.
43   * <p/>
44   * By default, the thread pool is synchronous.
45   *
46   * @author Sebastien Gerard
47   * @author Dave Syer
48   * @since 2.0
49   */
50  public class TaskExecutorPartitionHandler extends AbstractPartitionHandler implements StepHolder, InitializingBean {
51  
52  	private TaskExecutor taskExecutor = new SyncTaskExecutor();
53  
54  	private Step step;
55  
56      @Override
57  	public void afterPropertiesSet() throws Exception {
58  	}
59  
60  	/**
61  	 * Setter for the {@link TaskExecutor} that is used to farm out step
62  	 * executions to multiple threads.
63  	 * @param taskExecutor a {@link TaskExecutor}
64  	 */
65  	public void setTaskExecutor(TaskExecutor taskExecutor) {
66  		this.taskExecutor = taskExecutor;
67  	}
68  
69  	/**
70  	 * Setter for the {@link Step} that will be used to execute the partitioned
71  	 * {@link StepExecution}. This is a regular Spring Batch step, with all the
72  	 * business logic required to complete an execution based on the input
73  	 * parameters in its {@link StepExecution} context.
74  	 *
75  	 * @param step the {@link Step} instance to use to execute business logic
76  	 */
77      @Required
78  	public void setStep(Step step) {
79  		this.step = step;
80  	}
81  
82  	/**
83  	 * The step instance that will be executed in parallel by this handler.
84  	 *
85  	 * @return the step instance that will be used
86  	 * @see StepHolder#getStep()
87  	 */
88      @Override
89  	public Step getStep() {
90  		return this.step;
91  	}
92  
93      @Override
94      protected Set<StepExecution> doHandle(StepExecution masterStepExecution,
95                                            Set<StepExecution> partitionStepExecutions) throws Exception {
96          Assert.notNull(step, "A Step must be provided.");
97          final Set<Future<StepExecution>> tasks = new HashSet<Future<StepExecution>>(getGridSize());
98          final Set<StepExecution> result = new HashSet<StepExecution>();
99  
100         for (final StepExecution stepExecution : partitionStepExecutions) {
101             final FutureTask<StepExecution> task = createTask(step, stepExecution);
102 
103             try {
104                 taskExecutor.execute(task);
105                 tasks.add(task);
106             } catch (TaskRejectedException e) {
107                 // couldn't execute one of the tasks
108                 ExitStatus exitStatus = ExitStatus.FAILED
109                         .addExitDescription("TaskExecutor rejected the task for this step.");
110                 /*
111                  * Set the status in case the caller is tracking it through the
112                  * JobExecution.
113                  */
114                 stepExecution.setStatus(BatchStatus.FAILED);
115                 stepExecution.setExitStatus(exitStatus);
116                 result.add(stepExecution);
117             }
118         }
119 
120         for (Future<StepExecution> task : tasks) {
121             result.add(task.get());
122         }
123 
124         return result;
125 	}
126 
127     /**
128      * Creates the task executing the given step in the context of the given execution.
129      *
130      * @param step the step to execute
131      * @param stepExecution the given execution
132      * @return the task executing the given step
133      */
134     protected FutureTask<StepExecution> createTask(final Step step,
135                                                    final StepExecution stepExecution) {
136         return new FutureTask<StepExecution>(new Callable<StepExecution>() {
137             @Override
138             public StepExecution call() throws Exception {
139                 step.execute(stepExecution);
140                 return stepExecution;
141             }
142         });
143     }
144 
145 }