View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.step.tasklet;
17  
18  import java.util.concurrent.Semaphore;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.springframework.batch.core.BatchStatus;
23  import org.springframework.batch.core.ChunkListener;
24  import org.springframework.batch.core.JobInterruptedException;
25  import org.springframework.batch.core.StepContribution;
26  import org.springframework.batch.core.StepExecution;
27  import org.springframework.batch.core.StepExecutionListener;
28  import org.springframework.batch.core.listener.CompositeChunkListener;
29  import org.springframework.batch.core.repository.JobRepository;
30  import org.springframework.batch.core.scope.context.ChunkContext;
31  import org.springframework.batch.core.scope.context.StepContextRepeatCallback;
32  import org.springframework.batch.core.step.AbstractStep;
33  import org.springframework.batch.core.step.FatalStepExecutionException;
34  import org.springframework.batch.core.step.StepInterruptionPolicy;
35  import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
36  import org.springframework.batch.item.ExecutionContext;
37  import org.springframework.batch.item.ItemReader;
38  import org.springframework.batch.item.ItemStream;
39  import org.springframework.batch.item.ItemWriter;
40  import org.springframework.batch.item.support.CompositeItemStream;
41  import org.springframework.batch.repeat.RepeatContext;
42  import org.springframework.batch.repeat.RepeatOperations;
43  import org.springframework.batch.repeat.RepeatStatus;
44  import org.springframework.batch.repeat.support.RepeatTemplate;
45  import org.springframework.transaction.PlatformTransactionManager;
46  import org.springframework.transaction.TransactionStatus;
47  import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
48  import org.springframework.transaction.interceptor.TransactionAttribute;
49  import org.springframework.transaction.support.TransactionCallback;
50  import org.springframework.transaction.support.TransactionSynchronization;
51  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
52  import org.springframework.transaction.support.TransactionSynchronizationManager;
53  import org.springframework.transaction.support.TransactionTemplate;
54  import org.springframework.util.Assert;
55  
56  /**
57   * Simple implementation of executing the step as a call to a {@link Tasklet},
58   * possibly repeated, and each call surrounded by a transaction. The structure
59   * is therefore that of a loop with transaction boundary inside the loop. The
60   * loop is controlled by the step operations (
61   * {@link #setStepOperations(RepeatOperations)}).<br/>
62   * <br/>
63   * 
64   * Clients can use interceptors in the step operations to intercept or listen to
65   * the iteration on a step-wide basis, for instance to get a callback when the
66   * step is complete. Those that want callbacks at the level of an individual
67   * tasks, can specify interceptors for the chunk operations.
68   * 
69   * @author Dave Syer
70   * @author Lucas Ward
71   * @author Ben Hale
72   * @author Robert Kasanicky
73   */
74  public class TaskletStep extends AbstractStep {
75  
76  	private static final Log logger = LogFactory.getLog(TaskletStep.class);
77  
78  	private RepeatOperations stepOperations = new RepeatTemplate();
79  
80  	private CompositeChunkListener chunkListener = new CompositeChunkListener();
81  
82  	// default to checking current thread for interruption.
83  	private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
84  
85  	private CompositeItemStream stream = new CompositeItemStream();
86  
87  	private PlatformTransactionManager transactionManager;
88  
89  	private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
90  
91  		@Override
92  		public boolean rollbackOn(Throwable ex) {
93  			return true;
94  		}
95  
96  	};
97  
98  	private Tasklet tasklet;
99  
100 	/**
101 	 * Default constructor.
102 	 */
103 	public TaskletStep() {
104 		this(null);
105 	}
106 
107 	/**
108 	 * @param name
109 	 */
110 	public TaskletStep(String name) {
111 		super(name);
112 	}
113 
114 	/*
115 	 * (non-Javadoc)
116 	 * 
117 	 * @see
118 	 * org.springframework.batch.core.step.AbstractStep#afterPropertiesSet()
119 	 */
120 	@Override
121 	public void afterPropertiesSet() throws Exception {
122 		super.afterPropertiesSet();
123 		Assert.state(transactionManager != null, "A transaction manager must be provided");
124 	}
125 
126 	/**
127 	 * Public setter for the {@link PlatformTransactionManager}.
128 	 * 
129 	 * @param transactionManager the transaction manager to set
130 	 */
131 	public void setTransactionManager(PlatformTransactionManager transactionManager) {
132 		this.transactionManager = transactionManager;
133 	}
134 
135 	/**
136 	 * Public setter for the {@link TransactionAttribute}.
137 	 * 
138 	 * @param transactionAttribute the {@link TransactionAttribute} to set
139 	 */
140 	public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
141 		this.transactionAttribute = transactionAttribute;
142 	}
143 
144 	/**
145 	 * Public setter for the {@link Tasklet}.
146 	 * 
147 	 * @param tasklet the {@link Tasklet} to set
148 	 */
149 	public void setTasklet(Tasklet tasklet) {
150 		this.tasklet = tasklet;
151 		if (tasklet instanceof StepExecutionListener) {
152 			registerStepExecutionListener((StepExecutionListener) tasklet);
153 		}
154 	}
155 
156 	/**
157 	 * Register a chunk listener for callbacks at the appropriate stages in a
158 	 * step execution.
159 	 * 
160 	 * @param listener a {@link ChunkListener}
161 	 */
162 	public void registerChunkListener(ChunkListener listener) {
163 		this.chunkListener.register(listener);
164 	}
165 
166 	/**
167 	 * Register each of the objects as listeners.
168 	 * 
169 	 * @param listeners an array of listener objects of known types.
170 	 */
171 	public void setChunkListeners(ChunkListener[] listeners) {
172 		for (int i = 0; i < listeners.length; i++) {
173 			registerChunkListener(listeners[i]);
174 		}
175 	}
176 
177 	/**
178 	 * Register each of the streams for callbacks at the appropriate time in the
179 	 * step. The {@link ItemReader} and {@link ItemWriter} are automatically
180 	 * registered, but it doesn't hurt to also register them here. Injected
181 	 * dependencies of the reader and writer are not automatically registered,
182 	 * so if you implement {@link ItemWriter} using delegation to another object
183 	 * which itself is a {@link ItemStream}, you need to register the delegate
184 	 * here.
185 	 * 
186 	 * @param streams an array of {@link ItemStream} objects.
187 	 */
188 	public void setStreams(ItemStream[] streams) {
189 		for (int i = 0; i < streams.length; i++) {
190 			registerStream(streams[i]);
191 		}
192 	}
193 
194 	/**
195 	 * Register a single {@link ItemStream} for callbacks to the stream
196 	 * interface.
197 	 * 
198 	 * @param stream
199 	 */
200 	public void registerStream(ItemStream stream) {
201 		this.stream.register(stream);
202 	}
203 
204 	/**
205 	 * The {@link RepeatOperations} to use for the outer loop of the batch
206 	 * processing. Should be set up by the caller through a factory. Defaults to
207 	 * a plain {@link RepeatTemplate}.
208 	 * 
209 	 * @param stepOperations a {@link RepeatOperations} instance.
210 	 */
211 	public void setStepOperations(RepeatOperations stepOperations) {
212 		this.stepOperations = stepOperations;
213 	}
214 
215 	/**
216 	 * Setter for the {@link StepInterruptionPolicy}. The policy is used to
217 	 * check whether an external request has been made to interrupt the job
218 	 * execution.
219 	 * 
220 	 * @param interruptionPolicy a {@link StepInterruptionPolicy}
221 	 */
222 	public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
223 		this.interruptionPolicy = interruptionPolicy;
224 	}
225 
226 	/**
227 	 * Process the step and update its context so that progress can be monitored
228 	 * by the caller. The step is broken down into chunks, each one executing in
229 	 * a transaction. The step and its execution and execution context are all
230 	 * given an up to date {@link BatchStatus}, and the {@link JobRepository} is
231 	 * used to store the result. Various reporting information are also added to
232 	 * the current context governing the step execution, which would normally be
233 	 * available to the caller through the step's {@link ExecutionContext}.<br/>
234 	 * 
235 	 * @throws JobInterruptedException if the step or a chunk is interrupted
236 	 * @throws RuntimeException if there is an exception during a chunk
237 	 * execution
238 	 * 
239 	 */
240 	@Override
241 	protected void doExecute(StepExecution stepExecution) throws Exception {
242 
243 		stream.update(stepExecution.getExecutionContext());
244 		getJobRepository().updateExecutionContext(stepExecution);
245 
246 		// Shared semaphore per step execution, so other step executions can run
247 		// in parallel without needing the lock
248 		final Semaphore semaphore = createSemaphore();
249 
250 		stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
251 
252 			@Override
253 			public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
254 					throws Exception {
255 
256 				StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
257 
258 				// Before starting a new transaction, check for
259 				// interruption.
260 				interruptionPolicy.checkInterrupted(stepExecution);
261 
262 				RepeatStatus result;
263 				try {
264 					result = (RepeatStatus) new TransactionTemplate(transactionManager, transactionAttribute)
265 							.execute(new ChunkTransactionCallback(chunkContext, semaphore));
266 				}
267 				catch (UncheckedTransactionException e) {
268 					// Allow checked exceptions to be thrown inside callback
269 					throw (Exception) e.getCause();
270 				}
271 
272 				chunkListener.afterChunk();
273 
274 				// Check for interruption after transaction as well, so that
275 				// the interrupted exception is correctly propagated up to
276 				// caller
277 				interruptionPolicy.checkInterrupted(stepExecution);
278 
279 				return result;
280 			}
281 
282 		});
283 
284 	}
285 
286 	/**
287 	 * Extension point mainly for test purposes so that the behaviour of the
288 	 * lock can be manipulated to simulate various pathologies.
289 	 * 
290 	 * @return a semaphore for locking access to the JobRepository
291 	 */
292 	protected Semaphore createSemaphore() {
293 		return new Semaphore(1);
294 	}
295 
296 	protected void close(ExecutionContext ctx) throws Exception {
297 		stream.close();
298 	}
299 
300 	protected void open(ExecutionContext ctx) throws Exception {
301 		stream.open(ctx);
302 	}
303 
304 	/**
305 	 * A callback for the transactional work inside a chunk. Also detects
306 	 * failures in the transaction commit and rollback, only panicking if the
307 	 * transaction status is unknown (i.e. if a commit failure leads to a clean
308 	 * rollback then we assume the state is consistent).
309 	 * 
310 	 * @author Dave Syer
311 	 * 
312 	 */
313 	private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback {
314 
315 		private final StepExecution stepExecution;
316 
317 		private final ChunkContext chunkContext;
318 
319 		private boolean rolledBack = false;
320 
321 		private boolean stepExecutionUpdated = false;
322 
323 		private StepExecution oldVersion;
324 
325 		private boolean locked = false;
326 
327 		private final Semaphore semaphore;
328 
329 		public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) {
330 			this.chunkContext = chunkContext;
331 			this.stepExecution = chunkContext.getStepContext().getStepExecution();
332 			this.semaphore = semaphore;
333 		}
334 
335 		@Override
336 		public void afterCompletion(int status) {
337 			try {
338 				if (status != TransactionSynchronization.STATUS_COMMITTED) {
339 					if (stepExecutionUpdated) {
340 						// Wah! the commit failed. We need to rescue the step
341 						// execution data.
342 						logger.info("Commit failed while step execution data was already updated. "
343 								+ "Reverting to old version.");
344 						copy(oldVersion, stepExecution);
345 						if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
346 							rollback(stepExecution);
347 						}
348 					}
349 				}
350 				if (status == TransactionSynchronization.STATUS_UNKNOWN) {
351 					logger.error("Rolling back with transaction in unknown state");
352 					rollback(stepExecution);
353 					stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
354 					stepExecution.setTerminateOnly();
355 				}
356 			}
357 			finally {
358 				// Only release the lock if we acquired it, and release as late
359 				// as possible
360 				if (locked) {
361 					semaphore.release();
362 				}
363 				locked = false;
364 			}
365 		}
366 
367 		public Object doInTransaction(TransactionStatus status) {
368 
369 			TransactionSynchronizationManager.registerSynchronization(this);
370 
371 			RepeatStatus result = RepeatStatus.CONTINUABLE;
372 
373 			StepContribution contribution = stepExecution.createStepContribution();
374 
375 			chunkListener.beforeChunk();
376 
377 			// In case we need to push it back to its old value
378 			// after a commit fails...
379 			oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
380 			copy(stepExecution, oldVersion);
381 
382 			try {
383 
384 				try {
385 					try {
386 						result = tasklet.execute(contribution, chunkContext);
387 						if (result == null) {
388 							result = RepeatStatus.FINISHED;
389 						}
390 					}
391 					catch (Exception e) {
392 						if (transactionAttribute.rollbackOn(e)) {
393 							throw e;
394 						}
395 					}
396 
397 				}
398 				finally {
399 
400 					// If the step operations are asynchronous then we need
401 					// to synchronize changes to the step execution (at a
402 					// minimum). Take the lock *before* changing the step
403 					// execution.
404 					try {
405 						semaphore.acquire();
406 						locked = true;
407 					}
408 					catch (InterruptedException e) {
409 						logger.error("Thread interrupted while locking for repository update");
410 						stepExecution.setStatus(BatchStatus.STOPPED);
411 						stepExecution.setTerminateOnly();
412 						Thread.currentThread().interrupt();
413 					}
414 
415 					// Apply the contribution to the step
416 					// even if unsuccessful
417 					logger.debug("Applying contribution: " + contribution);
418 					stepExecution.apply(contribution);
419 
420 				}
421 
422 				stepExecutionUpdated = true;
423 
424 				stream.update(stepExecution.getExecutionContext());
425 
426 				try {
427 					// Going to attempt a commit. If it fails this flag will
428 					// stay false and we can use that later.
429 					getJobRepository().updateExecutionContext(stepExecution);
430 					stepExecution.incrementCommitCount();
431 					logger.debug("Saving step execution before commit: " + stepExecution);
432 					getJobRepository().update(stepExecution);
433 				}
434 				catch (Exception e) {
435 					// If we get to here there was a problem saving the step
436 					// execution and we have to fail.
437 					String msg = "JobRepository failure forcing exit with unknown status";
438 					logger.error(msg, e);
439 					stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
440 					stepExecution.setTerminateOnly();
441 					throw new FatalStepExecutionException(msg, e);
442 				}
443 
444 			}
445 			catch (Error e) {
446 				logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
447 				rollback(stepExecution);
448 				throw e;
449 			}
450 			catch (RuntimeException e) {
451 				logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
452 				rollback(stepExecution);
453 				throw e;
454 			}
455 			catch (Exception e) {
456 				logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
457 				rollback(stepExecution);
458 				// Allow checked exceptions
459 				throw new UncheckedTransactionException(e);
460 			}
461 
462 			return result;
463 
464 		}
465 
466 		private void rollback(StepExecution stepExecution) {
467 			if (!rolledBack) {
468 				stepExecution.incrementRollbackCount();
469 				rolledBack = true;
470 			}
471 		}
472 
473 		private void copy(final StepExecution source, final StepExecution target) {
474 			target.setVersion(source.getVersion());
475 			target.setWriteCount(source.getWriteCount());
476 			target.setFilterCount(source.getFilterCount());
477 			target.setCommitCount(source.getCommitCount());
478 			target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
479 		}
480 
481 	}
482 
483 	/**
484 	 * Convenience wrapper for a checked exception so that it can cause a
485 	 * rollback and be extracted afterwards.
486 	 * 
487 	 * @author Dave Syer
488 	 * 
489 	 */
490 	private static class UncheckedTransactionException extends RuntimeException {
491 
492 		public UncheckedTransactionException(Exception e) {
493 			super(e);
494 		}
495 
496 	}
497 
498 }