Tasks is a sample demonstrating a parallel task handling within a regions and additionally adds an error handling to either automatically or manually fixing task problems before continuing back to a state where tasks can be run again.
On a high level what happens in this state machine is:
States.
public static enum States { READY, FORK, JOIN, CHOICE, TASKS, T1, T1E, T2, T2E, T3, T3E, ERROR, AUTOMATIC, MANUAL }
Events.
public static enum Events { RUN, FALLBACK, CONTINUE, FIX; }
Configuration - states.
@Override public void configure(StateMachineStateConfigurer<States, Events> states) throws Exception { states .withStates() .initial(States.READY) .fork(States.FORK) .state(States.TASKS) .join(States.JOIN) .choice(States.CHOICE) .state(States.ERROR) .and() .withStates() .parent(States.TASKS) .initial(States.T1) .end(States.T1E) .and() .withStates() .parent(States.TASKS) .initial(States.T2) .end(States.T2E) .and() .withStates() .parent(States.TASKS) .initial(States.T3) .end(States.T3E) .and() .withStates() .parent(States.ERROR) .initial(States.AUTOMATIC) .state(States.AUTOMATIC, automaticAction(), null) .state(States.MANUAL); }
Configuration - transitions.
@Override public void configure(StateMachineTransitionConfigurer<States, Events> transitions) throws Exception { transitions .withExternal() .source(States.READY).target(States.FORK) .event(Events.RUN) .and() .withFork() .source(States.FORK).target(States.TASKS) .and() .withExternal() .source(States.T1).target(States.T1E) .and() .withExternal() .source(States.T2).target(States.T2E) .and() .withExternal() .source(States.T3).target(States.T3E) .and() .withJoin() .source(States.TASKS).target(States.JOIN) .and() .withExternal() .source(States.JOIN).target(States.CHOICE) .and() .withChoice() .source(States.CHOICE) .first(States.ERROR, tasksChoiceGuard()) .last(States.READY) .and() .withExternal() .source(States.ERROR).target(States.READY) .event(Events.CONTINUE) .and() .withExternal() .source(States.AUTOMATIC).target(States.MANUAL) .event(Events.FALLBACK) .and() .withInternal() .source(States.MANUAL) .action(fixAction()) .event(Events.FIX); }
Guard below is guarding choice entry into a ERROR state and needs to return TRUE if error has happened. For this guard simply checks that all extended state variables(T1, T2 and T3) are TRUE.
@Bean public Guard<States, Events> tasksChoiceGuard() { return new Guard<States, Events>() { @Override public boolean evaluate(StateContext<States, Events> context) { Map<Object, Object> variables = context.getExtendedState().getVariables(); return !(ObjectUtils.nullSafeEquals(variables.get("T1"), true) && ObjectUtils.nullSafeEquals(variables.get("T2"), true) && ObjectUtils.nullSafeEquals(variables.get("T3"), true)); } }; }
Actions below will simply send event to a state machine to request next step which would be either fallback or continue back to ready.
@Bean public Action<States, Events> automaticAction() { return new Action<States, Events>() { @Override public void execute(StateContext<States, Events> context) { Map<Object, Object> variables = context.getExtendedState().getVariables(); if (ObjectUtils.nullSafeEquals(variables.get("T1"), true) && ObjectUtils.nullSafeEquals(variables.get("T2"), true) && ObjectUtils.nullSafeEquals(variables.get("T3"), true)) { context.getStateMachine().sendEvent(Events.CONTINUE); } else { context.getStateMachine().sendEvent(Events.FALLBACK); } } }; } @Bean public Action<States, Events> fixAction() { return new Action<States, Events>() { @Override public void execute(StateContext<States, Events> context) { Map<Object, Object> variables = context.getExtendedState().getVariables(); variables.put("T1", true); variables.put("T2", true); variables.put("T3", true); context.getStateMachine().sendEvent(Events.CONTINUE); } }; }
Currently default region execution is synchronous but it can be
changed to asynchronous by changing TaskExecutor
. Task will simulate
work by sleeping 2 seconds so you’ll able to see how actions in
regions are executed parallel.
@Bean(name = StateMachineSystemConstants.TASK_EXECUTOR_BEAN_NAME) public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); return taskExecutor; }
Lets see an examples how this state machine actually works.
sm>sm start State machine started Entry state READY sm>tasks run Entry state TASKS run task on T3 run task on T2 run task on T1 run task on T2 done run task on T1 done run task on T3 done Entry state T2 Entry state T3 Entry state T1 Entry state T1E Entry state T2E Entry state T3E Exit state TASKS Entry state JOIN Exit state JOIN Entry state READY
In above we can execute tasks multiple times.
sm>tasks list Tasks {T1=true, T3=true, T2=true} sm>tasks fail T1 sm>tasks list Tasks {T1=false, T3=true, T2=true} sm>tasks run Entry state TASKS run task on T1 run task on T3 run task on T2 run task on T1 done run task on T3 done run task on T2 done Entry state T1 Entry state T3 Entry state T2 Entry state T1E Entry state T2E Entry state T3E Exit state TASKS Entry state JOIN Exit state JOIN Entry state ERROR Entry state AUTOMATIC Exit state AUTOMATIC Exit state ERROR Entry state READY
In above, if we simulate failure for task T1, it is fixed automatically.
sm>tasks list Tasks {T1=true, T3=true, T2=true} sm>tasks fail T2 sm>tasks run Entry state TASKS run task on T2 run task on T1 run task on T3 run task on T2 done run task on T1 done run task on T3 done Entry state T2 Entry state T1 Entry state T3 Entry state T1E Entry state T2E Entry state T3E Exit state TASKS Entry state JOIN Exit state JOIN Entry state ERROR Entry state AUTOMATIC Exit state AUTOMATIC Entry state MANUAL sm>tasks fix Exit state MANUAL Exit state ERROR Entry state READY
In above if we simulate failure for either task T2 or T3, state machine goes to MANUAL state where problem needs to be fixed manually before we’re able to go back to READY state.