/*
 * Copyright 2006-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16  package org.springframework.batch.core.job.flow.support;
17  
18  import java.util.ArrayList;
19  import java.util.Collection;
20  import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Set;
25  import java.util.SortedSet;
26  import java.util.TreeSet;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.springframework.batch.core.JobExecutionException;
31  import org.springframework.batch.core.Step;
32  import org.springframework.batch.core.job.flow.Flow;
33  import org.springframework.batch.core.job.flow.FlowExecution;
34  import org.springframework.batch.core.job.flow.FlowExecutionException;
35  import org.springframework.batch.core.job.flow.FlowExecutionStatus;
36  import org.springframework.batch.core.job.flow.FlowExecutor;
37  import org.springframework.batch.core.job.flow.State;
38  import org.springframework.beans.factory.InitializingBean;
39  
40  /**
41   * A {@link Flow} that branches conditionally depending on the exit status of
42   * the last {@link State}. The input parameters are the state transitions (in no
43   * particular order). The start state name can be specified explicitly (and must
44   * exist in the set of transitions), or computed from the existing transitions,
45   * if unambiguous.
46   *
47   * @author Dave Syer
48   * @since 2.0
49   */
50  public class SimpleFlow implements Flow, InitializingBean {
51  
52  	private static final Log logger = LogFactory.getLog(SimpleFlow.class);
53  
54  	private State startState;
55  
56  	private Map<String, SortedSet<StateTransition>> transitionMap = new HashMap<String, SortedSet<StateTransition>>();
57  
58  	private Map<String, State> stateMap = new HashMap<String, State>();
59  
60  	private List<StateTransition> stateTransitions = new ArrayList<StateTransition>();
61  
62  	private final String name;
63  
64  	/**
65  	 * Create a flow with the given name.
66  	 *
67  	 * @param name the name of the flow
68  	 */
69  	public SimpleFlow(String name) {
70  		this.name = name;
71  	}
72  
73  	/**
74  	 * Get the name for this flow.
75  	 *
76  	 * @see Flow#getName()
77  	 */
78  	@Override
79  	public String getName() {
80  		return name;
81  	}
82  
83  	/**
84  	 * Public setter for the stateTransitions.
85  	 *
86  	 * @param stateTransitions the stateTransitions to set
87  	 */
88  	public void setStateTransitions(List<StateTransition> stateTransitions) {
89  
90  		this.stateTransitions = stateTransitions;
91  	}
92  
93  	/**
94  	 * {@inheritDoc}
95  	 */
96  	@Override
97  	public State getState(String stateName) {
98  		return stateMap.get(stateName);
99  	}
100 
101 	/**
102 	 * {@inheritDoc}
103 	 */
104 	@Override
105 	public Collection<State> getStates() {
106 		return new HashSet<State>(stateMap.values());
107 	}
108 
109 	/**
110 	 * Locate start state and pre-populate data structures needed for execution.
111 	 *
112 	 * @see InitializingBean#afterPropertiesSet()
113 	 */
114 	@Override
115 	public void afterPropertiesSet() throws Exception {
116 		initializeTransitions();
117 	}
118 
119 	/**
120 	 * @see Flow#start(FlowExecutor)
121 	 */
122 	@Override
123 	public FlowExecution start(FlowExecutor executor) throws FlowExecutionException {
124 		if (startState == null) {
125 			initializeTransitions();
126 		}
127 		State state = startState;
128 		String stateName = state.getName();
129 		return resume(stateName, executor);
130 	}
131 
132 	/**
133 	 * @see Flow#resume(String, FlowExecutor)
134 	 */
135 	@Override
136 	public FlowExecution resume(String stateName, FlowExecutor executor) throws FlowExecutionException {
137 
138 		FlowExecutionStatus status = FlowExecutionStatus.UNKNOWN;
139 		State state = stateMap.get(stateName);
140 
141 		logger.debug("Resuming state="+stateName+" with status="+status);
142 
143 		// Terminate if there are no more states
144 		while (state != null && status!=FlowExecutionStatus.STOPPED) {
145 
146 			stateName = state.getName();
147 
148 			try {
149 				logger.debug("Handling state="+stateName);
150 				status = state.handle(executor);
151 			}
152 			catch (FlowExecutionException e) {
153 				executor.close(new FlowExecution(stateName, status));
154 				throw e;
155 			}
156 			catch (Exception e) {
157 				executor.close(new FlowExecution(stateName, status));
158 				throw new FlowExecutionException(String.format("Ended flow=%s at state=%s with exception", name,
159 						stateName), e);
160 			}
161 
162 			logger.debug("Completed state="+stateName+" with status="+status);
163 
164 			state = nextState(stateName, status);
165 
166 		}
167 
168 		FlowExecution result = new FlowExecution(stateName, status);
169 		executor.close(result);
170 		return result;
171 
172 	}
173 
174 	/**
175 	 * @return the next {@link Step} (or null if this is the end)
176 	 * @throws JobExecutionException
177 	 */
178 	private State nextState(String stateName, FlowExecutionStatus status) throws FlowExecutionException {
179 
180 		Set<StateTransition> set = transitionMap.get(stateName);
181 
182 		if (set == null) {
183 			throw new FlowExecutionException(String.format("No transitions found in flow=%s for state=%s", getName(),
184 					stateName));
185 		}
186 
187 		String next = null;
188 		String exitCode = status.getName();
189 		for (StateTransition stateTransition : set) {
190 			if (stateTransition.matches(exitCode)) {
191 				if (stateTransition.isEnd()) {
192 					// End of job
193 					return null;
194 				}
195 				next = stateTransition.getNext();
196 				break;
197 			}
198 		}
199 
200 		if (next == null) {
201 			throw new FlowExecutionException(String.format(
202 					"Next state not found in flow=%s for state=%s with exit status=%s", getName(), stateName, status.getName()));
203 		}
204 
205 		if (!stateMap.containsKey(next)) {
206 			throw new FlowExecutionException(String.format("Next state not specified in flow=%s for next=%s",
207 					getName(), next));
208 		}
209 
210 		return stateMap.get(next);
211 
212 	}
213 
214 	/**
215 	 * Analyse the transitions provided and generate all the information needed
216 	 * to execute the flow.
217 	 */
218 	private void initializeTransitions() {
219 		startState = null;
220 		transitionMap.clear();
221 		stateMap.clear();
222 		boolean hasEndStep = false;
223 
224 		if (stateTransitions.isEmpty()) {
225 			throw new IllegalArgumentException("No start state was found. You must specify at least one step in a job.");
226 		}
227 
228 		for (StateTransition stateTransition : stateTransitions) {
229 			State state = stateTransition.getState();
230 			String stateName = state.getName();
231 			stateMap.put(stateName, state);
232 		}
233 
234 		for (StateTransition stateTransition : stateTransitions) {
235 
236 			State state = stateTransition.getState();
237 
238 			if (!stateTransition.isEnd()) {
239 
240 				String next = stateTransition.getNext();
241 
242 				if (!stateMap.containsKey(next)) {
243 					throw new IllegalArgumentException("Missing state for [" + stateTransition + "]");
244 				}
245 
246 			}
247 			else {
248 				hasEndStep = true;
249 			}
250 
251 			String name = state.getName();
252 
253 			SortedSet<StateTransition> set = transitionMap.get(name);
254 			if (set == null) {
255 				set = new TreeSet<StateTransition>();
256 				transitionMap.put(name, set);
257 			}
258 			set.add(stateTransition);
259 
260 		}
261 
262 		if (!hasEndStep) {
263 			throw new IllegalArgumentException(
264 					"No end state was found.  You must specify at least one transition with no next state.");
265 		}
266 
267 		startState = stateTransitions.get(0).getState();
268 
269 	}
270 }