1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.job.flow.support.state; |
17 | |
18 | import java.util.ArrayList; |
19 | import java.util.Collection; |
20 | import java.util.concurrent.Callable; |
21 | import java.util.concurrent.ExecutionException; |
22 | import java.util.concurrent.Future; |
23 | import java.util.concurrent.FutureTask; |
24 | |
25 | import org.springframework.batch.core.job.flow.Flow; |
26 | import org.springframework.batch.core.job.flow.FlowExecution; |
27 | import org.springframework.batch.core.job.flow.FlowExecutionException; |
28 | import org.springframework.batch.core.job.flow.FlowExecutionStatus; |
29 | import org.springframework.batch.core.job.flow.FlowExecutor; |
30 | import org.springframework.batch.core.job.flow.State; |
31 | import org.springframework.core.task.SyncTaskExecutor; |
32 | import org.springframework.core.task.TaskExecutor; |
33 | import org.springframework.core.task.TaskRejectedException; |
34 | |
35 | /** |
36 | * A {@link State} implementation that splits a {@link Flow} into multiple |
37 | * parallel subflows. |
38 | * |
39 | * @author Dave Syer |
40 | * @since 2.0 |
41 | */ |
42 | public class SplitState extends AbstractState { |
43 | |
44 | private final Collection<Flow> flows; |
45 | |
46 | private TaskExecutor taskExecutor = new SyncTaskExecutor(); |
47 | |
48 | private FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator(); |
49 | |
50 | /** |
51 | * @param name |
52 | */ |
53 | public SplitState(Collection<Flow> flows, String name) { |
54 | super(name); |
55 | this.flows = flows; |
56 | } |
57 | |
58 | /** |
59 | * Public setter for the taskExecutor. |
60 | * @param taskExecutor the taskExecutor to set |
61 | */ |
62 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
63 | this.taskExecutor = taskExecutor; |
64 | } |
65 | |
66 | /** |
67 | * Execute the flows in parallel by passing them to the {@link TaskExecutor} |
68 | * and wait for all of them to finish before proceeding. |
69 | * |
70 | * @see State#handle(FlowExecutor) |
71 | */ |
72 | @Override |
73 | public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception { |
74 | |
75 | // TODO: collect the last StepExecution from the flows as well, so they |
76 | // can be abandoned if necessary |
77 | Collection<Future<FlowExecution>> tasks = new ArrayList<Future<FlowExecution>>(); |
78 | |
79 | for (final Flow flow : flows) { |
80 | |
81 | final FutureTask<FlowExecution> task = new FutureTask<FlowExecution>(new Callable<FlowExecution>() { |
82 | public FlowExecution call() throws Exception { |
83 | return flow.start(executor); |
84 | } |
85 | }); |
86 | |
87 | tasks.add(task); |
88 | |
89 | try { |
90 | taskExecutor.execute(task); |
91 | } |
92 | catch (TaskRejectedException e) { |
93 | throw new FlowExecutionException("TaskExecutor rejected task for flow=" + flow.getName()); |
94 | } |
95 | |
96 | } |
97 | |
98 | Collection<FlowExecution> results = new ArrayList<FlowExecution>(); |
99 | |
100 | // Could use a CompletionService here? |
101 | for (Future<FlowExecution> task : tasks) { |
102 | try { |
103 | results.add(task.get()); |
104 | } |
105 | catch (ExecutionException e) { |
106 | // Unwrap the expected exceptions |
107 | Throwable cause = e.getCause(); |
108 | if (cause instanceof Exception) { |
109 | throw (Exception) cause; |
110 | } else { |
111 | throw e; |
112 | } |
113 | } |
114 | } |
115 | |
116 | return aggregator.aggregate(results); |
117 | |
118 | } |
119 | |
120 | /* |
121 | * (non-Javadoc) |
122 | * |
123 | * @see org.springframework.batch.core.job.flow.State#isEndState() |
124 | */ |
125 | public boolean isEndState() { |
126 | return false; |
127 | } |
128 | } |