1 | /* |
2 | * Copyright 2012-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.job.builder; |
17 | |
18 | import java.util.ArrayList; |
19 | import java.util.Arrays; |
20 | import java.util.Collection; |
21 | import java.util.HashMap; |
22 | import java.util.HashSet; |
23 | import java.util.List; |
24 | import java.util.Map; |
25 | import java.util.Set; |
26 | |
27 | import org.springframework.batch.core.ExitStatus; |
28 | import org.springframework.batch.core.Step; |
29 | import org.springframework.batch.core.job.flow.Flow; |
30 | import org.springframework.batch.core.job.flow.FlowExecutionStatus; |
31 | import org.springframework.batch.core.job.flow.JobExecutionDecider; |
32 | import org.springframework.batch.core.job.flow.State; |
33 | import org.springframework.batch.core.job.flow.support.SimpleFlow; |
34 | import org.springframework.batch.core.job.flow.support.StateTransition; |
35 | import org.springframework.batch.core.job.flow.support.state.DecisionState; |
36 | import org.springframework.batch.core.job.flow.support.state.EndState; |
37 | import org.springframework.batch.core.job.flow.support.state.FlowState; |
38 | import org.springframework.batch.core.job.flow.support.state.SplitState; |
39 | import org.springframework.batch.core.job.flow.support.state.StepState; |
40 | import org.springframework.core.task.TaskExecutor; |
41 | |
42 | /** |
43 | * A builder for a flow of steps that can be executed as a job or as part of a job. Steps can be linked together with |
44 | * conditional transitions that depend on the exit status of the previous step. |
45 | * |
46 | * @author Dave Syer |
47 | * |
48 | * @since 2.2 |
49 | * |
50 | * @param <Q> the type of object returned by the builder (by default a Flow) |
51 | * |
52 | */ |
53 | public class FlowBuilder<Q> { |
54 | |
55 | private String name; |
56 | |
57 | private String prefix; |
58 | |
59 | private List<StateTransition> transitions = new ArrayList<StateTransition>(); |
60 | |
61 | private Map<String, State> tos = new HashMap<String, State>(); |
62 | |
63 | private State currentState; |
64 | |
65 | private EndState failedState; |
66 | |
67 | private EndState completedState; |
68 | |
69 | private EndState stoppedState; |
70 | |
71 | private int decisionCounter = 0; |
72 | |
73 | private int splitCounter = 0; |
74 | |
75 | private int endCounter = 0; |
76 | |
77 | private Map<Object, State> states = new HashMap<Object, State>(); |
78 | |
79 | private SimpleFlow flow; |
80 | |
81 | private boolean dirty = true; |
82 | |
83 | public FlowBuilder(String name) { |
84 | this.name = name; |
85 | this.prefix = name + "."; |
86 | this.failedState = new EndState(FlowExecutionStatus.FAILED, prefix + "FAILED"); |
87 | this.completedState = new EndState(FlowExecutionStatus.COMPLETED, prefix + "COMPLETED"); |
88 | this.stoppedState = new EndState(FlowExecutionStatus.STOPPED, prefix + "STOPPED"); |
89 | } |
90 | |
91 | /** |
92 | * Validate the current state of the builder and build a flow. Subclasses may override this to build an object of a |
93 | * different type that itself depends on the flow. |
94 | * |
95 | * @return a flow |
96 | */ |
97 | public Q build() { |
98 | @SuppressWarnings("unchecked") |
99 | Q result = (Q) flow(); |
100 | return result; |
101 | } |
102 | |
103 | /** |
104 | * Transition to the next step on successful completion of the current step. All other outcomes are treated as |
105 | * failures. |
106 | * |
107 | * @param step the next step |
108 | * @return this to enable chaining |
109 | */ |
110 | public FlowBuilder<Q> next(Step step) { |
111 | doNext(step); |
112 | return this; |
113 | } |
114 | |
115 | /** |
116 | * Start a flow. If some steps are already registered, just a synonym for {@link #from(Step)}. |
117 | * |
118 | * @param step the step to start with |
119 | * @return this to enable chaining |
120 | */ |
121 | public FlowBuilder<Q> start(Step step) { |
122 | doStart(step); |
123 | return this; |
124 | } |
125 | |
126 | /** |
127 | * Go back to a previously registered step and start a new path. If no steps are registered yet just a synonym for |
128 | * {@link #start(Step)}. |
129 | * |
130 | * @param step the step to start from (already registered) |
131 | * @return this to enable chaining |
132 | */ |
133 | public FlowBuilder<Q> from(Step step) { |
134 | doFrom(step); |
135 | return this; |
136 | } |
137 | |
138 | /** |
139 | * Transition to the decider on successful completion of the current step. All other outcomes are treated as |
140 | * failures. |
141 | * |
142 | * @param decider the JobExecutionDecider to determine the next step to execute |
143 | * @return this to enable chaining |
144 | */ |
145 | public UnterminatedFlowBuilder<Q> next(JobExecutionDecider decider) { |
146 | doNext(decider); |
147 | return new UnterminatedFlowBuilder<Q>(this); |
148 | } |
149 | |
150 | /** |
151 | * If a flow should start with a decision use this as the first state. |
152 | * |
153 | * @param decider the to start from |
154 | * @return a builder to enable chaining |
155 | */ |
156 | public UnterminatedFlowBuilder<Q> start(JobExecutionDecider decider) { |
157 | doStart(decider); |
158 | return new UnterminatedFlowBuilder<Q>(this); |
159 | } |
160 | |
161 | /** |
162 | * Start again from a decision that was already registered. |
163 | * |
164 | * @param decider the decider to start from (already registered) |
165 | * @return a builder to enable chaining |
166 | */ |
167 | public UnterminatedFlowBuilder<Q> from(JobExecutionDecider decider) { |
168 | doFrom(decider); |
169 | return new UnterminatedFlowBuilder<Q>(this); |
170 | } |
171 | |
172 | /** |
173 | * Go next on successful completion to a subflow. |
174 | * |
175 | * @param flow the flow to go to |
176 | * @return a builder to enable chaining |
177 | */ |
178 | public FlowBuilder<Q> next(Flow flow) { |
179 | doNext(flow); |
180 | return this; |
181 | } |
182 | |
183 | /** |
184 | * Start again from a subflow that was already registered. |
185 | * |
186 | * @param flow the flow to start from (already registered) |
187 | * @return a builder to enable chaining |
188 | */ |
189 | public FlowBuilder<Q> from(Flow flow) { |
190 | doFrom(flow); |
191 | return this; |
192 | } |
193 | |
194 | /** |
195 | * If a flow should start with a subflow use this as the first state. |
196 | * |
197 | * @param flow the flow to start from |
198 | * @return a builder to enable chaining |
199 | */ |
200 | public FlowBuilder<Q> start(Flow flow) { |
201 | doStart(flow); |
202 | return this; |
203 | } |
204 | |
205 | /** |
206 | * @param executor a task executor to execute the split flows |
207 | * @return a builder to enable fluent chaining |
208 | */ |
209 | public SplitBuilder<Q> split(TaskExecutor executor) { |
210 | return new SplitBuilder<Q>(this, executor); |
211 | } |
212 | |
213 | /** |
214 | * Start a transition to a new state if the exit status from the previous state matches the pattern given. |
215 | * Successful completion normally results in an exit status equal to (or starting with by convention) "COMPLETED". |
216 | * See {@link ExitStatus} for commonly used values. |
217 | * |
218 | * @param pattern the pattern of exit status on which to take this transition |
219 | * @return a builder to enable fluent chaining |
220 | */ |
221 | public TransitionBuilder<Q> on(String pattern) { |
222 | return new TransitionBuilder<Q>(this, pattern); |
223 | } |
224 | |
225 | /** |
226 | * A synonym for {@link #build()} which callers might find useful. Subclasses can override build to create an object |
227 | * of the desired type (e.g. a parent builder or an actual flow). |
228 | * |
229 | * @return the result of the builder |
230 | */ |
231 | public final Q end() { |
232 | return build(); |
233 | } |
234 | |
235 | protected Flow flow() { |
236 | if (!dirty) { |
237 | // optimization in case this method is called consecutively |
238 | return flow; |
239 | } |
240 | flow = new SimpleFlow(name); |
241 | // optimization for flows that only have one state that itself is a flow: |
242 | if (currentState instanceof FlowState && states.size() == 1) { |
243 | return ((FlowState) currentState).getFlows().iterator().next(); |
244 | } |
245 | addDanglingEndStates(); |
246 | flow.setStateTransitions(transitions); |
247 | dirty = false; |
248 | return flow; |
249 | } |
250 | |
251 | private void doNext(Object input) { |
252 | if (this.currentState == null) { |
253 | doStart(input); |
254 | } |
255 | State next = createState(input); |
256 | addTransition("COMPLETED", next); |
257 | addTransition("*", failedState); |
258 | this.currentState = next; |
259 | } |
260 | |
261 | private void doStart(Object input) { |
262 | if (this.currentState != null) { |
263 | doFrom(input); |
264 | } |
265 | this.currentState = createState(input); |
266 | } |
267 | |
268 | private void doFrom(Object input) { |
269 | if (currentState == null) { |
270 | doStart(input); |
271 | } |
272 | State state = createState(input); |
273 | tos.put(currentState.getName(), currentState); |
274 | this.currentState = state; |
275 | } |
276 | |
277 | private State createState(Object input) { |
278 | State result; |
279 | if (input instanceof Step) { |
280 | if (!states.containsKey(input)) { |
281 | Step step = (Step) input; |
282 | states.put(input, new StepState(prefix + step.getName(), step)); |
283 | } |
284 | result = states.get(input); |
285 | } |
286 | else if (input instanceof JobExecutionDecider) { |
287 | if (!states.containsKey(input)) { |
288 | states.put(input, new DecisionState((JobExecutionDecider) input, prefix + "decision" |
289 | + (decisionCounter++))); |
290 | } |
291 | result = states.get(input); |
292 | } |
293 | else if (input instanceof Flow) { |
294 | if (!states.containsKey(input)) { |
295 | states.put(input, new FlowState((Flow) input, prefix + ((Flow) input).getName())); |
296 | } |
297 | result = states.get(input); |
298 | } |
299 | else { |
300 | throw new FlowBuilderException("No state can be created for: " + input); |
301 | } |
302 | dirty = true; |
303 | return result; |
304 | } |
305 | |
306 | private SplitState createState(Collection<Flow> flows, TaskExecutor executor) { |
307 | if (!states.containsKey(flows)) { |
308 | states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++))); |
309 | } |
310 | SplitState result = (SplitState) states.get(flows); |
311 | if (executor != null) { |
312 | result.setTaskExecutor(executor); |
313 | } |
314 | dirty = true; |
315 | return result; |
316 | } |
317 | |
318 | private void addDanglingEndStates() { |
319 | Set<String> froms = new HashSet<String>(); |
320 | for (StateTransition transition : transitions) { |
321 | froms.add(transition.getState().getName()); |
322 | } |
323 | if (tos.isEmpty() && currentState != null) { |
324 | tos.put(currentState.getName(), currentState); |
325 | } |
326 | Map<String, State> copy = new HashMap<String, State>(tos); |
327 | // Find all the states that are really end states but not explicitly declared as such |
328 | for (String to : copy.keySet()) { |
329 | if (!froms.contains(to)) { |
330 | currentState = copy.get(to); |
331 | if (!currentState.isEndState()) { |
332 | addTransition("COMPLETED", completedState); |
333 | addTransition("*", failedState); |
334 | } |
335 | } |
336 | } |
337 | copy = new HashMap<String, State>(tos); |
338 | // Then find the states that do not have a default transition |
339 | for (String from : copy.keySet()) { |
340 | currentState = copy.get(from); |
341 | if (!currentState.isEndState()) { |
342 | if (!hasFail(from)) { |
343 | addTransition("*", failedState); |
344 | } |
345 | if (!hasCompleted(from)) { |
346 | addTransition("*", completedState); |
347 | } |
348 | } |
349 | } |
350 | } |
351 | |
352 | private boolean hasFail(String from) { |
353 | return matches(from, "FAILED"); |
354 | } |
355 | |
356 | private boolean hasCompleted(String from) { |
357 | return matches(from, "COMPLETED"); |
358 | } |
359 | |
360 | private boolean matches(String from, String status) { |
361 | for (StateTransition transition : transitions) { |
362 | if (from.equals(transition.getState().getName()) && transition.matches(status)) { |
363 | return true; |
364 | } |
365 | } |
366 | return false; |
367 | } |
368 | |
369 | private void addTransition(String pattern, State next) { |
370 | tos.put(next.getName(), next); |
371 | transitions.add(StateTransition.createStateTransition(currentState, pattern, next.getName())); |
372 | if (transitions.size() == 1) { |
373 | transitions.add(StateTransition.createEndStateTransition(failedState)); |
374 | transitions.add(StateTransition.createEndStateTransition(completedState)); |
375 | transitions.add(StateTransition.createEndStateTransition(stoppedState)); |
376 | } |
377 | if (next.isEndState()) { |
378 | transitions.add(StateTransition.createEndStateTransition(next)); |
379 | } |
380 | dirty = true; |
381 | } |
382 | |
383 | private void stop(String pattern) { |
384 | addTransition(pattern, stoppedState); |
385 | } |
386 | |
387 | private void stop(String pattern, State restart) { |
388 | EndState next = new EndState(FlowExecutionStatus.STOPPED, "STOPPED", prefix + "stop" + (endCounter++), true); |
389 | addTransition(pattern, next); |
390 | currentState = next; |
391 | addTransition("*", restart); |
392 | } |
393 | |
394 | private void end(String pattern) { |
395 | addTransition(pattern, completedState); |
396 | } |
397 | |
398 | private void end(String pattern, String code) { |
399 | addTransition(pattern, new EndState(FlowExecutionStatus.COMPLETED, code, prefix + "end" + (endCounter++))); |
400 | } |
401 | |
402 | private void fail(String pattern) { |
403 | addTransition(pattern, failedState); |
404 | } |
405 | |
406 | /** |
407 | * A builder for continuing a flow from a decision state. |
408 | * |
409 | * @author Dave Syer |
410 | * |
411 | * @param <Q> the result of the builder's build() |
412 | */ |
413 | public static class UnterminatedFlowBuilder<Q> { |
414 | |
415 | private final FlowBuilder<Q> parent; |
416 | |
417 | public UnterminatedFlowBuilder(FlowBuilder<Q> parent) { |
418 | this.parent = parent; |
419 | } |
420 | |
421 | /** |
422 | * Start a transition to a new state if the exit status from the previous state matches the pattern given. |
423 | * Successful completion normally results in an exit status equal to (or starting with by convention) |
424 | * "COMPLETED". See {@link ExitStatus} for commonly used values. |
425 | * |
426 | * @param pattern the pattern of exit status on which to take this transition |
427 | * @return a TransitionBuilder |
428 | */ |
429 | public TransitionBuilder<Q> on(String pattern) { |
430 | return new TransitionBuilder<Q>(parent, pattern); |
431 | } |
432 | |
433 | } |
434 | |
435 | /** |
436 | * A builder for transitions within a flow. |
437 | * |
438 | * @author Dave Syer |
439 | * |
440 | * @param <Q> the result of the parent builder's build() |
441 | */ |
442 | public static class TransitionBuilder<Q> { |
443 | |
444 | private final FlowBuilder<Q> parent; |
445 | |
446 | private final String pattern; |
447 | |
448 | public TransitionBuilder(FlowBuilder<Q> parent, String pattern) { |
449 | this.parent = parent; |
450 | this.pattern = pattern; |
451 | } |
452 | |
453 | /** |
454 | * Specify the next step. |
455 | * |
456 | * @param step the next step after this transition |
457 | * @return a FlowBuilder |
458 | */ |
459 | public FlowBuilder<Q> to(Step step) { |
460 | State next = parent.createState(step); |
461 | parent.addTransition(pattern, next); |
462 | parent.currentState = next; |
463 | return parent; |
464 | } |
465 | |
466 | /** |
467 | * Specify the next state as a complete flow. |
468 | * |
469 | * @param flow the next flow after this transition |
470 | * @return a FlowBuilder |
471 | */ |
472 | public FlowBuilder<Q> to(Flow flow) { |
473 | State next = parent.createState(flow); |
474 | parent.addTransition(pattern, next); |
475 | parent.currentState = next; |
476 | return parent; |
477 | } |
478 | |
479 | /** |
480 | * Specify the next state as a decision. |
481 | * |
482 | * @param decider the decider to determine the next step |
483 | * @return a FlowBuilder |
484 | */ |
485 | public FlowBuilder<Q> to(JobExecutionDecider decider) { |
486 | State next = parent.createState(decider); |
487 | parent.addTransition(pattern, next); |
488 | parent.currentState = next; |
489 | return parent; |
490 | } |
491 | |
492 | /** |
493 | * Signal the successful end of the flow. |
494 | * |
495 | * @return a FlowBuilder |
496 | */ |
497 | public FlowBuilder<Q> stop() { |
498 | parent.stop(pattern); |
499 | return parent; |
500 | } |
501 | |
502 | /** |
503 | * Stop the flow and provide a flow to start with if the flow is restarted. |
504 | * |
505 | * @param flow the flow to restart with |
506 | * @return a FlowBuilder |
507 | */ |
508 | public FlowBuilder<Q> stopAndRestart(Flow flow) { |
509 | State next = parent.createState(flow); |
510 | parent.stop(pattern, next); |
511 | return parent; |
512 | } |
513 | |
514 | /** |
515 | * Stop the flow and provide a decider to start with if the flow is restarted. |
516 | * |
517 | * @param decider a decider to restart with |
518 | * @return a FlowBuilder |
519 | */ |
520 | public FlowBuilder<Q> stopAndRestart(JobExecutionDecider decider) { |
521 | State next = parent.createState(decider); |
522 | parent.stop(pattern, next); |
523 | return parent; |
524 | } |
525 | |
526 | /** |
527 | * Stop the flow and provide a step to start with if the flow is restarted. |
528 | * |
529 | * @param restart the step to restart with |
530 | * @return a FlowBuilder |
531 | */ |
532 | public FlowBuilder<Q> stopAndRestart(Step restart) { |
533 | State next = parent.createState(restart); |
534 | parent.stop(pattern, next); |
535 | return parent; |
536 | } |
537 | |
538 | /** |
539 | * Signal the successful end of the flow. |
540 | * |
541 | * @return a FlowBuilder |
542 | */ |
543 | public FlowBuilder<Q> end() { |
544 | parent.end(pattern); |
545 | return parent; |
546 | } |
547 | |
548 | /** |
549 | * Signal the end of the flow with the status provided. |
550 | * |
551 | * @return a FlowBuilder |
552 | */ |
553 | public FlowBuilder<Q> end(String status) { |
554 | parent.end(pattern, status); |
555 | return parent; |
556 | } |
557 | |
558 | /** |
559 | * Signal the end of the flow with an error condition. |
560 | * |
561 | * @return a FlowBuilder |
562 | */ |
563 | public FlowBuilder<Q> fail() { |
564 | parent.fail(pattern); |
565 | return parent; |
566 | } |
567 | } |
568 | |
569 | /** |
570 | * A builder for building a split state. Example (<code>builder</code> is a {@link FlowBuilder}): |
571 | * |
572 | * <pre> |
573 | * Flow splitFlow = builder.start(flow1).split(new SyncTaskExecutor()).add(flow2).build(); |
574 | * </pre> |
575 | * |
576 | * where <code>flow1</code> and <code>flow2</code> will be executed (one after the other because of the task |
577 | * executor that was added). Another example |
578 | * |
579 | * <pre> |
580 | * Flow splitFlow = builder.start(step1).split(new SimpleAsyncTaskExecutor()).add(flow).build(); |
581 | * </pre> |
582 | * |
583 | * In this example, a flow consisting of <code>step1</code> will be executed in parallel with <code>flow</code>. |
584 | * |
585 | * @author Dave Syer |
586 | * |
587 | * @param <Q> the result of the parent builder's build() |
588 | */ |
589 | public static class SplitBuilder<Q> { |
590 | |
591 | private final FlowBuilder<Q> parent; |
592 | |
593 | private TaskExecutor executor; |
594 | |
595 | /** |
596 | * @param parent the parent builder |
597 | * @param executor the task executor to use in the split |
598 | */ |
599 | public SplitBuilder(FlowBuilder<Q> parent, TaskExecutor executor) { |
600 | this.parent = parent; |
601 | this.executor = executor; |
602 | } |
603 | |
604 | /** |
605 | * Add flows to the split, in addition to the current state already present in the parent builder. |
606 | * |
607 | * @param flows more flows to add to the split |
608 | * @return the parent builder |
609 | */ |
610 | public FlowBuilder<Q> add(Flow... flows) { |
611 | Collection<Flow> list = new ArrayList<Flow>(Arrays.asList(flows)); |
612 | String name = "split" + (parent.splitCounter++); |
613 | int counter = 0; |
614 | State one = parent.currentState; |
615 | Flow flow = null; |
616 | if (!(one instanceof FlowState)) { |
617 | FlowBuilder<Flow> stateBuilder = new FlowBuilder<Flow>(name + "_" + (counter++)); |
618 | stateBuilder.currentState = one; |
619 | flow = stateBuilder.build(); |
620 | } |
621 | if (flow != null) { |
622 | list.add(flow); |
623 | } |
624 | State next = parent.createState(list, executor); |
625 | parent.currentState = next; |
626 | return parent; |
627 | } |
628 | |
629 | } |
630 | |
631 | } |