View Javadoc

1   /*
2    * Copyright 2012-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.job.builder;
17  
18  import java.util.ArrayList;
19  import java.util.Arrays;
20  import java.util.Collection;
21  import java.util.HashMap;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  
27  import org.springframework.batch.core.ExitStatus;
28  import org.springframework.batch.core.Step;
29  import org.springframework.batch.core.job.flow.Flow;
30  import org.springframework.batch.core.job.flow.FlowExecutionStatus;
31  import org.springframework.batch.core.job.flow.JobExecutionDecider;
32  import org.springframework.batch.core.job.flow.State;
33  import org.springframework.batch.core.job.flow.support.SimpleFlow;
34  import org.springframework.batch.core.job.flow.support.StateTransition;
35  import org.springframework.batch.core.job.flow.support.state.DecisionState;
36  import org.springframework.batch.core.job.flow.support.state.EndState;
37  import org.springframework.batch.core.job.flow.support.state.FlowState;
38  import org.springframework.batch.core.job.flow.support.state.SplitState;
39  import org.springframework.batch.core.job.flow.support.state.StepState;
40  import org.springframework.core.task.TaskExecutor;
41  
42  /**
43   * A builder for a flow of steps that can be executed as a job or as part of a job. Steps can be linked together with
44   * conditional transitions that depend on the exit status of the previous step.
45   *
46   * @author Dave Syer
47   *
48   * @since 2.2
49   *
50   * @param <Q> the type of object returned by the builder (by default a Flow)
51   *
52   */
53  public class FlowBuilder<Q> {
54  
55  	private String name;
56  
57  	private String prefix;
58  
59  	private List<StateTransition> transitions = new ArrayList<StateTransition>();
60  
61  	private Map<String, State> tos = new HashMap<String, State>();
62  
63  	private State currentState;
64  
65  	private EndState failedState;
66  
67  	private EndState completedState;
68  
69  	private EndState stoppedState;
70  
71  	private int decisionCounter = 0;
72  
73  	private int splitCounter = 0;
74  
75  	private int endCounter = 0;
76  
77  	private Map<Object, State> states = new HashMap<Object, State>();
78  
79  	private SimpleFlow flow;
80  
81  	private boolean dirty = true;
82  
83  	public FlowBuilder(String name) {
84  		this.name = name;
85  		this.prefix = name + ".";
86  		this.failedState = new EndState(FlowExecutionStatus.FAILED, prefix + "FAILED");
87  		this.completedState = new EndState(FlowExecutionStatus.COMPLETED, prefix + "COMPLETED");
88  		this.stoppedState = new EndState(FlowExecutionStatus.STOPPED, prefix + "STOPPED");
89  	}
90  
91  	/**
92  	 * Validate the current state of the builder and build a flow. Subclasses may override this to build an object of a
93  	 * different type that itself depends on the flow.
94  	 *
95  	 * @return a flow
96  	 */
97  	public Q build() {
98  		@SuppressWarnings("unchecked")
99  		Q result = (Q) flow();
100 		return result;
101 	}
102 
103 	/**
104 	 * Transition to the next step on successful completion of the current step. All other outcomes are treated as
105 	 * failures.
106 	 *
107 	 * @param step the next step
108 	 * @return this to enable chaining
109 	 */
110 	public FlowBuilder<Q> next(Step step) {
111 		doNext(step);
112 		return this;
113 	}
114 
115 	/**
116 	 * Start a flow. If some steps are already registered, just a synonym for {@link #from(Step)}.
117 	 *
118 	 * @param step the step to start with
119 	 * @return this to enable chaining
120 	 */
121 	public FlowBuilder<Q> start(Step step) {
122 		doStart(step);
123 		return this;
124 	}
125 
126 	/**
127 	 * Go back to a previously registered step and start a new path. If no steps are registered yet just a synonym for
128 	 * {@link #start(Step)}.
129 	 *
130 	 * @param step the step to start from (already registered)
131 	 * @return this to enable chaining
132 	 */
133 	public FlowBuilder<Q> from(Step step) {
134 		doFrom(step);
135 		return this;
136 	}
137 
138 	/**
139 	 * Transition to the decider on successful completion of the current step. All other outcomes are treated as
140 	 * failures.
141 	 *
142 	 * @param decider the JobExecutionDecider to determine the next step to execute
143 	 * @return this to enable chaining
144 	 */
145 	public UnterminatedFlowBuilder<Q> next(JobExecutionDecider decider) {
146 		doNext(decider);
147 		return new UnterminatedFlowBuilder<Q>(this);
148 	}
149 
150 	/**
151 	 * If a flow should start with a decision use this as the first state.
152 	 *
153 	 * @param decider the to start from
154 	 * @return a builder to enable chaining
155 	 */
156 	public UnterminatedFlowBuilder<Q> start(JobExecutionDecider decider) {
157 		doStart(decider);
158 		return new UnterminatedFlowBuilder<Q>(this);
159 	}
160 
161 	/**
162 	 * Start again from a decision that was already registered.
163 	 *
164 	 * @param decider the decider to start from (already registered)
165 	 * @return a builder to enable chaining
166 	 */
167 	public UnterminatedFlowBuilder<Q> from(JobExecutionDecider decider) {
168 		doFrom(decider);
169 		return new UnterminatedFlowBuilder<Q>(this);
170 	}
171 
172 	/**
173 	 * Go next on successful completion to a subflow.
174 	 *
175 	 * @param flow the flow to go to
176 	 * @return a builder to enable chaining
177 	 */
178 	public FlowBuilder<Q> next(Flow flow) {
179 		doNext(flow);
180 		return this;
181 	}
182 
183 	/**
184 	 * Start again from a subflow that was already registered.
185 	 *
186 	 * @param flow the flow to start from (already registered)
187 	 * @return a builder to enable chaining
188 	 */
189 	public FlowBuilder<Q> from(Flow flow) {
190 		doFrom(flow);
191 		return this;
192 	}
193 
194 	/**
195 	 * If a flow should start with a subflow use this as the first state.
196 	 *
197 	 * @param flow the flow to start from
198 	 * @return a builder to enable chaining
199 	 */
200 	public FlowBuilder<Q> start(Flow flow) {
201 		doStart(flow);
202 		return this;
203 	}
204 
205 	/**
206 	 * @param executor a task executor to execute the split flows
207 	 * @return a builder to enable fluent chaining
208 	 */
209 	public SplitBuilder<Q> split(TaskExecutor executor) {
210 		return new SplitBuilder<Q>(this, executor);
211 	}
212 
213 	/**
214 	 * Start a transition to a new state if the exit status from the previous state matches the pattern given.
215 	 * Successful completion normally results in an exit status equal to (or starting with by convention) "COMPLETED".
216 	 * See {@link ExitStatus} for commonly used values.
217 	 *
218 	 * @param pattern the pattern of exit status on which to take this transition
219 	 * @return a builder to enable fluent chaining
220 	 */
221 	public TransitionBuilder<Q> on(String pattern) {
222 		return new TransitionBuilder<Q>(this, pattern);
223 	}
224 
225 	/**
226 	 * A synonym for {@link #build()} which callers might find useful. Subclasses can override build to create an object
227 	 * of the desired type (e.g. a parent builder or an actual flow).
228 	 *
229 	 * @return the result of the builder
230 	 */
231 	public final Q end() {
232 		return build();
233 	}
234 
235 	protected Flow flow() {
236 		if (!dirty) {
237 			// optimization in case this method is called consecutively
238 			return flow;
239 		}
240 		flow = new SimpleFlow(name);
241 		// optimization for flows that only have one state that itself is a flow:
242 		if (currentState instanceof FlowState && states.size() == 1) {
243 			return ((FlowState) currentState).getFlows().iterator().next();
244 		}
245 		addDanglingEndStates();
246 		flow.setStateTransitions(transitions);
247 		dirty = false;
248 		return flow;
249 	}
250 
251 	private void doNext(Object input) {
252 		if (this.currentState == null) {
253 			doStart(input);
254 		}
255 		State next = createState(input);
256 		addTransition("COMPLETED", next);
257 		addTransition("*", failedState);
258 		this.currentState = next;
259 	}
260 
261 	private void doStart(Object input) {
262 		if (this.currentState != null) {
263 			doFrom(input);
264 		}
265 		this.currentState = createState(input);
266 	}
267 
268 	private void doFrom(Object input) {
269 		if (currentState == null) {
270 			doStart(input);
271 		}
272 		State state = createState(input);
273 		tos.put(currentState.getName(), currentState);
274 		this.currentState = state;
275 	}
276 
277 	private State createState(Object input) {
278 		State result;
279 		if (input instanceof Step) {
280 			if (!states.containsKey(input)) {
281 				Step step = (Step) input;
282 				states.put(input, new StepState(prefix + step.getName(), step));
283 			}
284 			result = states.get(input);
285 		}
286 		else if (input instanceof JobExecutionDecider) {
287 			if (!states.containsKey(input)) {
288 				states.put(input, new DecisionState((JobExecutionDecider) input, prefix + "decision"
289 						+ (decisionCounter++)));
290 			}
291 			result = states.get(input);
292 		}
293 		else if (input instanceof Flow) {
294 			if (!states.containsKey(input)) {
295 				states.put(input, new FlowState((Flow) input, prefix + ((Flow) input).getName()));
296 			}
297 			result = states.get(input);
298 		}
299 		else {
300 			throw new FlowBuilderException("No state can be created for: " + input);
301 		}
302 		dirty = true;
303 		return result;
304 	}
305 
306 	private SplitState createState(Collection<Flow> flows, TaskExecutor executor) {
307 		if (!states.containsKey(flows)) {
308 			states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++)));
309 		}
310 		SplitState result = (SplitState) states.get(flows);
311 		if (executor != null) {
312 			result.setTaskExecutor(executor);
313 		}
314 		dirty = true;
315 		return result;
316 	}
317 
318 	private void addDanglingEndStates() {
319 		Set<String> froms = new HashSet<String>();
320 		for (StateTransition transition : transitions) {
321 			froms.add(transition.getState().getName());
322 		}
323 		if (tos.isEmpty() && currentState != null) {
324 			tos.put(currentState.getName(), currentState);
325 		}
326 		Map<String, State> copy = new HashMap<String, State>(tos);
327 		// Find all the states that are really end states but not explicitly declared as such
328 		for (String to : copy.keySet()) {
329 			if (!froms.contains(to)) {
330 				currentState = copy.get(to);
331 				if (!currentState.isEndState()) {
332 					addTransition("COMPLETED", completedState);
333 					addTransition("*", failedState);
334 				}
335 			}
336 		}
337 		copy = new HashMap<String, State>(tos);
338 		// Then find the states that do not have a default transition
339 		for (String from : copy.keySet()) {
340 			currentState = copy.get(from);
341 			if (!currentState.isEndState()) {
342 				if (!hasFail(from)) {
343 					addTransition("*", failedState);
344 				}
345 				if (!hasCompleted(from)) {
346 					addTransition("*", completedState);
347 				}
348 			}
349 		}
350 	}
351 
352 	private boolean hasFail(String from) {
353 		return matches(from, "FAILED");
354 	}
355 
356 	private boolean hasCompleted(String from) {
357 		return matches(from, "COMPLETED");
358 	}
359 
360 	private boolean matches(String from, String status) {
361 		for (StateTransition transition : transitions) {
362 			if (from.equals(transition.getState().getName()) && transition.matches(status)) {
363 				return true;
364 			}
365 		}
366 		return false;
367 	}
368 
369 	private void addTransition(String pattern, State next) {
370 		tos.put(next.getName(), next);
371 		transitions.add(StateTransition.createStateTransition(currentState, pattern, next.getName()));
372 		if (transitions.size() == 1) {
373 			transitions.add(StateTransition.createEndStateTransition(failedState));
374 			transitions.add(StateTransition.createEndStateTransition(completedState));
375 			transitions.add(StateTransition.createEndStateTransition(stoppedState));
376 		}
377 		if (next.isEndState()) {
378 			transitions.add(StateTransition.createEndStateTransition(next));
379 		}
380 		dirty = true;
381 	}
382 
383 	private void stop(String pattern) {
384 		addTransition(pattern, stoppedState);
385 	}
386 
387 	private void stop(String pattern, State restart) {
388 		EndState next = new EndState(FlowExecutionStatus.STOPPED, "STOPPED", prefix + "stop" + (endCounter++), true);
389 		addTransition(pattern, next);
390 		currentState = next;
391 		addTransition("*", restart);
392 	}
393 
394 	private void end(String pattern) {
395 		addTransition(pattern, completedState);
396 	}
397 
398 	private void end(String pattern, String code) {
399 		addTransition(pattern, new EndState(FlowExecutionStatus.COMPLETED, code, prefix + "end" + (endCounter++)));
400 	}
401 
402 	private void fail(String pattern) {
403 		addTransition(pattern, failedState);
404 	}
405 
406 	/**
407 	 * A builder for continuing a flow from a decision state.
408 	 *
409 	 * @author Dave Syer
410 	 *
411 	 * @param <Q> the result of the builder's build()
412 	 */
413 	public static class UnterminatedFlowBuilder<Q> {
414 
415 		private final FlowBuilder<Q> parent;
416 
417 		public UnterminatedFlowBuilder(FlowBuilder<Q> parent) {
418 			this.parent = parent;
419 		}
420 
421 		/**
422 		 * Start a transition to a new state if the exit status from the previous state matches the pattern given.
423 		 * Successful completion normally results in an exit status equal to (or starting with by convention)
424 		 * "COMPLETED". See {@link ExitStatus} for commonly used values.
425 		 *
426 		 * @param pattern the pattern of exit status on which to take this transition
427 		 * @return a TransitionBuilder
428 		 */
429 		public TransitionBuilder<Q> on(String pattern) {
430 			return new TransitionBuilder<Q>(parent, pattern);
431 		}
432 
433 	}
434 
435 	/**
436 	 * A builder for transitions within a flow.
437 	 *
438 	 * @author Dave Syer
439 	 *
440 	 * @param <Q> the result of the parent builder's build()
441 	 */
442 	public static class TransitionBuilder<Q> {
443 
444 		private final FlowBuilder<Q> parent;
445 
446 		private final String pattern;
447 
448 		public TransitionBuilder(FlowBuilder<Q> parent, String pattern) {
449 			this.parent = parent;
450 			this.pattern = pattern;
451 		}
452 
453 		/**
454 		 * Specify the next step.
455 		 *
456 		 * @param step the next step after this transition
457 		 * @return a FlowBuilder
458 		 */
459 		public FlowBuilder<Q> to(Step step) {
460 			State next = parent.createState(step);
461 			parent.addTransition(pattern, next);
462 			parent.currentState = next;
463 			return parent;
464 		}
465 
466 		/**
467 		 * Specify the next state as a complete flow.
468 		 *
469 		 * @param flow the next flow after this transition
470 		 * @return a FlowBuilder
471 		 */
472 		public FlowBuilder<Q> to(Flow flow) {
473 			State next = parent.createState(flow);
474 			parent.addTransition(pattern, next);
475 			parent.currentState = next;
476 			return parent;
477 		}
478 
479 		/**
480 		 * Specify the next state as a decision.
481 		 *
482 		 * @param decider the decider to determine the next step
483 		 * @return a FlowBuilder
484 		 */
485 		public FlowBuilder<Q> to(JobExecutionDecider decider) {
486 			State next = parent.createState(decider);
487 			parent.addTransition(pattern, next);
488 			parent.currentState = next;
489 			return parent;
490 		}
491 
492 		/**
493 		 * Signal the successful end of the flow.
494 		 *
495 		 * @return a FlowBuilder
496 		 */
497 		public FlowBuilder<Q> stop() {
498 			parent.stop(pattern);
499 			return parent;
500 		}
501 
502 		/**
503 		 * Stop the flow and provide a flow to start with if the flow is restarted.
504 		 *
505 		 * @param flow the flow to restart with
506 		 * @return a FlowBuilder
507 		 */
508 		public FlowBuilder<Q> stopAndRestart(Flow flow) {
509 			State next = parent.createState(flow);
510 			parent.stop(pattern, next);
511 			return parent;
512 		}
513 
514 		/**
515 		 * Stop the flow and provide a decider to start with if the flow is restarted.
516 		 *
517 		 * @param decider a decider to restart with
518 		 * @return a FlowBuilder
519 		 */
520 		public FlowBuilder<Q> stopAndRestart(JobExecutionDecider decider) {
521 			State next = parent.createState(decider);
522 			parent.stop(pattern, next);
523 			return parent;
524 		}
525 
526 		/**
527 		 * Stop the flow and provide a step to start with if the flow is restarted.
528 		 *
529 		 * @param restart the step to restart with
530 		 * @return a FlowBuilder
531 		 */
532 		public FlowBuilder<Q> stopAndRestart(Step restart) {
533 			State next = parent.createState(restart);
534 			parent.stop(pattern, next);
535 			return parent;
536 		}
537 
538 		/**
539 		 * Signal the successful end of the flow.
540 		 *
541 		 * @return a FlowBuilder
542 		 */
543 		public FlowBuilder<Q> end() {
544 			parent.end(pattern);
545 			return parent;
546 		}
547 
548 		/**
549 		 * Signal the end of the flow with the status provided.
550 		 *
551 		 * @return a FlowBuilder
552 		 */
553 		public FlowBuilder<Q> end(String status) {
554 			parent.end(pattern, status);
555 			return parent;
556 		}
557 
558 		/**
559 		 * Signal the end of the flow with an error condition.
560 		 *
561 		 * @return a FlowBuilder
562 		 */
563 		public FlowBuilder<Q> fail() {
564 			parent.fail(pattern);
565 			return parent;
566 		}
567 	}
568 
569 	/**
570 	 * A builder for building a split state. Example (<code>builder</code> is a {@link FlowBuilder}):
571 	 *
572 	 * <pre>
573 	 * Flow splitFlow = builder.start(flow1).split(new SyncTaskExecutor()).add(flow2).build();
574 	 * </pre>
575 	 *
576 	 * where <code>flow1</code> and <code>flow2</code> will be executed (one after the other because of the task
577 	 * executor that was added). Another example
578 	 *
579 	 * <pre>
580 	 * Flow splitFlow = builder.start(step1).split(new SimpleAsyncTaskExecutor()).add(flow).build();
581 	 * </pre>
582 	 *
583 	 * In this example, a flow consisting of <code>step1</code> will be executed in parallel with <code>flow</code>.
584 	 *
585 	 * @author Dave Syer
586 	 *
587 	 * @param <Q> the result of the parent builder's build()
588 	 */
589 	public static class SplitBuilder<Q> {
590 
591 		private final FlowBuilder<Q> parent;
592 
593 		private TaskExecutor executor;
594 
595 		/**
596 		 * @param parent the parent builder
597 		 * @param executor the task executor to use in the split
598 		 */
599 		public SplitBuilder(FlowBuilder<Q> parent, TaskExecutor executor) {
600 			this.parent = parent;
601 			this.executor = executor;
602 		}
603 
604 		/**
605 		 * Add flows to the split, in addition to the current state already present in the parent builder.
606 		 *
607 		 * @param flows more flows to add to the split
608 		 * @return the parent builder
609 		 */
610 		public FlowBuilder<Q> add(Flow... flows) {
611 			Collection<Flow> list = new ArrayList<Flow>(Arrays.asList(flows));
612 			String name = "split" + (parent.splitCounter++);
613 			int counter = 0;
614 			State one = parent.currentState;
615 			Flow flow = null;
616 			if (!(one instanceof FlowState)) {
617 				FlowBuilder<Flow> stateBuilder = new FlowBuilder<Flow>(name + "_" + (counter++));
618 				stateBuilder.currentState = one;
619 				flow = stateBuilder.build();
620 			}
621 			if (flow != null) {
622 				list.add(flow);
623 			}
624 			State next = parent.createState(list, executor);
625 			parent.currentState = next;
626 			return parent;
627 		}
628 
629 	}
630 
631 }