View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.job.flow.support.state;
17  
18  import java.util.ArrayList;
19  import java.util.Collection;
20  import java.util.concurrent.Callable;
21  import java.util.concurrent.ExecutionException;
22  import java.util.concurrent.Future;
23  import java.util.concurrent.FutureTask;
24  
25  import org.springframework.batch.core.job.flow.Flow;
26  import org.springframework.batch.core.job.flow.FlowExecution;
27  import org.springframework.batch.core.job.flow.FlowExecutionException;
28  import org.springframework.batch.core.job.flow.FlowExecutionStatus;
29  import org.springframework.batch.core.job.flow.FlowExecutor;
30  import org.springframework.batch.core.job.flow.FlowHolder;
31  import org.springframework.batch.core.job.flow.State;
32  import org.springframework.core.task.SyncTaskExecutor;
33  import org.springframework.core.task.TaskExecutor;
34  import org.springframework.core.task.TaskRejectedException;
35  
36  /**
37   * A {@link State} implementation that splits a {@link Flow} into multiple
38   * parallel subflows.
39   *
40   * @author Dave Syer
41   * @since 2.0
42   */
43  public class SplitState extends AbstractState implements FlowHolder {
44  
45  	private final Collection<Flow> flows;
46  
47  	private TaskExecutor taskExecutor = new SyncTaskExecutor();
48  
49  	private FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator();
50  
51  	/**
52  	 * @param name
53  	 */
54  	public SplitState(Collection<Flow> flows, String name) {
55  		super(name);
56  		this.flows = flows;
57  	}
58  
59  	/**
60  	 * Public setter for the taskExecutor.
61  	 * @param taskExecutor the taskExecutor to set
62  	 */
63  	public void setTaskExecutor(TaskExecutor taskExecutor) {
64  		this.taskExecutor = taskExecutor;
65  	}
66  
67  	/**
68  	 * @return the flows
69  	 */
70  	@Override
71  	public Collection<Flow> getFlows() {
72  		return flows;
73  	}
74  
75  	/**
76  	 * Execute the flows in parallel by passing them to the {@link TaskExecutor}
77  	 * and wait for all of them to finish before proceeding.
78  	 *
79  	 * @see State#handle(FlowExecutor)
80  	 */
81  	@Override
82  	public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception {
83  
84  		// TODO: collect the last StepExecution from the flows as well, so they
85  		// can be abandoned if necessary
86  		Collection<Future<FlowExecution>> tasks = new ArrayList<Future<FlowExecution>>();
87  
88  		for (final Flow flow : flows) {
89  
90  			final FutureTask<FlowExecution> task = new FutureTask<FlowExecution>(new Callable<FlowExecution>() {
91  				@Override
92  				public FlowExecution call() throws Exception {
93  					return flow.start(executor);
94  				}
95  			});
96  
97  			tasks.add(task);
98  
99  			try {
100 				taskExecutor.execute(task);
101 			}
102 			catch (TaskRejectedException e) {
103 				throw new FlowExecutionException("TaskExecutor rejected task for flow=" + flow.getName());
104 			}
105 
106 		}
107 
108 		Collection<FlowExecution> results = new ArrayList<FlowExecution>();
109 
110 		// Could use a CompletionService here?
111 		for (Future<FlowExecution> task : tasks) {
112 			try {
113 				results.add(task.get());
114 			}
115 			catch (ExecutionException e) {
116 				// Unwrap the expected exceptions
117 				Throwable cause = e.getCause();
118 				if (cause instanceof Exception) {
119 					throw (Exception) cause;
120 				} else {
121 					throw e;
122 				}
123 			}
124 		}
125 
126 		return aggregator.aggregate(results);
127 
128 	}
129 
130 	/*
131 	 * (non-Javadoc)
132 	 *
133 	 * @see org.springframework.batch.core.job.flow.State#isEndState()
134 	 */
135 	@Override
136 	public boolean isEndState() {
137 		return false;
138 	}
139 }