30. Tasks

Tasks is a sample demonstrating a parallel task handling within a regions and additionally adds an error handling to either automatically or manually fixing task problems before continuing back to a state where tasks can be run again.

statechart5

On a high level what happens in this state machine is:

States. 

public enum States {
    READY,
    FORK, JOIN, CHOICE,
    TASKS, T1, T1E, T2, T2E, T3, T3E,
    ERROR, AUTOMATIC, MANUAL
}

Events. 

public enum Events {
    RUN, FALLBACK, CONTINUE, FIX;
}

Configuration - states. 

@Override
public void configure(StateMachineStateConfigurer<States, Events> states)
        throws Exception {
    states
        .withStates()
            .initial(States.READY)
            .fork(States.FORK)
            .state(States.TASKS)
            .join(States.JOIN)
            .choice(States.CHOICE)
            .state(States.ERROR)
            .and()
            .withStates()
                .parent(States.TASKS)
                .initial(States.T1)
                .end(States.T1E)
                .and()
            .withStates()
                .parent(States.TASKS)
                .initial(States.T2)
                .end(States.T2E)
                .and()
            .withStates()
                .parent(States.TASKS)
                .initial(States.T3)
                .end(States.T3E)
                .and()
            .withStates()
                .parent(States.ERROR)
                .initial(States.AUTOMATIC)
                .state(States.AUTOMATIC, automaticAction(), null)
                .state(States.MANUAL);
}

Configuration - transitions. 

@Override
public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
        throws Exception {
    transitions
        .withExternal()
            .source(States.READY).target(States.FORK)
            .event(Events.RUN)
            .and()
        .withFork()
            .source(States.FORK).target(States.TASKS)
            .and()
        .withExternal()
            .source(States.T1).target(States.T1E)
            .and()
        .withExternal()
            .source(States.T2).target(States.T2E)
            .and()
        .withExternal()
            .source(States.T3).target(States.T3E)
            .and()
        .withJoin()
            .source(States.TASKS).target(States.JOIN)
            .and()
        .withExternal()
            .source(States.JOIN).target(States.CHOICE)
            .and()
        .withChoice()
            .source(States.CHOICE)
            .first(States.ERROR, tasksChoiceGuard())
            .last(States.READY)
            .and()
        .withExternal()
            .source(States.ERROR).target(States.READY)
            .event(Events.CONTINUE)
            .and()
        .withExternal()
            .source(States.AUTOMATIC).target(States.MANUAL)
            .event(Events.FALLBACK)
            .and()
        .withInternal()
            .source(States.MANUAL)
            .action(fixAction())
            .event(Events.FIX);
}

Guard below is guarding choice entry into a ERROR state and needs to return TRUE if error has happened. For this guard simply checks that all extended state variables(T1, T2 and T3) are TRUE.

@Bean
public Guard<States, Events> tasksChoiceGuard() {
    return new Guard<States, Events>() {

        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            Map<Object, Object> variables = context.getExtendedState().getVariables();
            return !(ObjectUtils.nullSafeEquals(variables.get("T1"), true)
                    && ObjectUtils.nullSafeEquals(variables.get("T2"), true)
                    && ObjectUtils.nullSafeEquals(variables.get("T3"), true));
        }
    };
}

Actions below will simply send event to a state machine to request next step which would be either fallback or continue back to ready.

@Bean
public Action<States, Events> automaticAction() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            Map<Object, Object> variables = context.getExtendedState().getVariables();
            if (ObjectUtils.nullSafeEquals(variables.get("T1"), true)
                    && ObjectUtils.nullSafeEquals(variables.get("T2"), true)
                    && ObjectUtils.nullSafeEquals(variables.get("T3"), true)) {
                context.getStateMachine().sendEvent(Events.CONTINUE);
            } else {
                context.getStateMachine().sendEvent(Events.FALLBACK);
            }
        }
    };
}

@Bean
public Action<States, Events> fixAction() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            Map<Object, Object> variables = context.getExtendedState().getVariables();
            variables.put("T1", true);
            variables.put("T2", true);
            variables.put("T3", true);
            context.getStateMachine().sendEvent(Events.CONTINUE);
        }
    };
}

Currently default region execution is synchronous but it can be changed to asynchronous by changing TaskExecutor. Task will simulate work by sleeping 2 seconds so you’ll able to see how actions in regions are executed parallel.

@Bean(name = StateMachineSystemConstants.TASK_EXECUTOR_BEAN_NAME)
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    return taskExecutor;
}

Let’s see an examples how this state machine actually works.

sm>sm start
State machine started
Entry state READY

sm>tasks run
Entry state TASKS
run task on T3
run task on T2
run task on T1
run task on T2 done
run task on T1 done
run task on T3 done
Entry state T2
Entry state T3
Entry state T1
Entry state T1E
Entry state T2E
Entry state T3E
Exit state TASKS
Entry state JOIN
Exit state JOIN
Entry state READY

In above we can execute tasks multiple times.

sm>tasks list
Tasks {T1=true, T3=true, T2=true}

sm>tasks fail T1

sm>tasks list
Tasks {T1=false, T3=true, T2=true}

sm>tasks run
Entry state TASKS
run task on T1
run task on T3
run task on T2
run task on T1 done
run task on T3 done
run task on T2 done
Entry state T1
Entry state T3
Entry state T2
Entry state T1E
Entry state T2E
Entry state T3E
Exit state TASKS
Entry state JOIN
Exit state JOIN
Entry state ERROR
Entry state AUTOMATIC
Exit state AUTOMATIC
Exit state ERROR
Entry state READY

In above, if we simulate failure for task T1, it is fixed automatically.

sm>tasks list
Tasks {T1=true, T3=true, T2=true}

sm>tasks fail T2

sm>tasks run
Entry state TASKS
run task on T2
run task on T1
run task on T3
run task on T2 done
run task on T1 done
run task on T3 done
Entry state T2
Entry state T1
Entry state T3
Entry state T1E
Entry state T2E
Entry state T3E
Exit state TASKS
Entry state JOIN
Exit state JOIN
Entry state ERROR
Entry state AUTOMATIC
Exit state AUTOMATIC
Entry state MANUAL

sm>tasks fix
Exit state MANUAL
Exit state ERROR
Entry state READY

In above if we simulate failure for either task T2 or T3, state machine goes to MANUAL state where problem needs to be fixed manually before we’re able to go back to READY state.