View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.step.tasklet;
17  
18  import java.util.concurrent.Semaphore;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.springframework.batch.core.BatchStatus;
23  import org.springframework.batch.core.ChunkListener;
24  import org.springframework.batch.core.JobInterruptedException;
25  import org.springframework.batch.core.StepContribution;
26  import org.springframework.batch.core.StepExecution;
27  import org.springframework.batch.core.StepExecutionListener;
28  import org.springframework.batch.core.listener.CompositeChunkListener;
29  import org.springframework.batch.core.repository.JobRepository;
30  import org.springframework.batch.core.scope.context.ChunkContext;
31  import org.springframework.batch.core.scope.context.StepContextRepeatCallback;
32  import org.springframework.batch.core.step.AbstractStep;
33  import org.springframework.batch.core.step.FatalStepExecutionException;
34  import org.springframework.batch.core.step.StepInterruptionPolicy;
35  import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
36  import org.springframework.batch.item.ExecutionContext;
37  import org.springframework.batch.item.ItemReader;
38  import org.springframework.batch.item.ItemStream;
39  import org.springframework.batch.item.ItemWriter;
40  import org.springframework.batch.item.support.CompositeItemStream;
41  import org.springframework.batch.repeat.RepeatContext;
42  import org.springframework.batch.repeat.RepeatOperations;
43  import org.springframework.batch.repeat.RepeatStatus;
44  import org.springframework.batch.repeat.support.RepeatTemplate;
45  import org.springframework.transaction.PlatformTransactionManager;
46  import org.springframework.transaction.TransactionStatus;
47  import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
48  import org.springframework.transaction.interceptor.TransactionAttribute;
49  import org.springframework.transaction.support.TransactionCallback;
50  import org.springframework.transaction.support.TransactionSynchronization;
51  import org.springframework.transaction.support.TransactionSynchronizationAdapter;
52  import org.springframework.transaction.support.TransactionSynchronizationManager;
53  import org.springframework.transaction.support.TransactionTemplate;
54  import org.springframework.util.Assert;
55  
56  /**
57   * Simple implementation of executing the step as a call to a {@link Tasklet},
58   * possibly repeated, and each call surrounded by a transaction. The structure
59   * is therefore that of a loop with transaction boundary inside the loop. The
60   * loop is controlled by the step operations (
61   * {@link #setStepOperations(RepeatOperations)}).<br/>
62   * <br/>
63   *
64   * Clients can use interceptors in the step operations to intercept or listen to
65   * the iteration on a step-wide basis, for instance to get a callback when the
66   * step is complete. Those that want callbacks at the level of an individual
67   * tasks, can specify interceptors for the chunk operations.
68   *
69   * @author Dave Syer
70   * @author Lucas Ward
71   * @author Ben Hale
72   * @author Robert Kasanicky
73   * @author Michael Minella
74   */
75  @SuppressWarnings("serial")
76  public class TaskletStep extends AbstractStep {
77  
78  	private static final Log logger = LogFactory.getLog(TaskletStep.class);
79  
80  	private RepeatOperations stepOperations = new RepeatTemplate();
81  
82  	private CompositeChunkListener chunkListener = new CompositeChunkListener();
83  
84  	// default to checking current thread for interruption.
85  	private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
86  
87  	private CompositeItemStream stream = new CompositeItemStream();
88  
89  	private PlatformTransactionManager transactionManager;
90  
91  	private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
92  
93  		@Override
94  		public boolean rollbackOn(Throwable ex) {
95  			return true;
96  		}
97  
98  	};
99  
100 	private Tasklet tasklet;
101 
102 	/**
103 	 * Default constructor.
104 	 */
105 	public TaskletStep() {
106 		this(null);
107 	}
108 
109 	/**
110 	 * @param name
111 	 */
112 	public TaskletStep(String name) {
113 		super(name);
114 	}
115 
116 	/*
117 	 * (non-Javadoc)
118 	 *
119 	 * @see
120 	 * org.springframework.batch.core.step.AbstractStep#afterPropertiesSet()
121 	 */
122 	@Override
123 	public void afterPropertiesSet() throws Exception {
124 		super.afterPropertiesSet();
125 		Assert.state(transactionManager != null, "A transaction manager must be provided");
126 	}
127 
128 	/**
129 	 * Public setter for the {@link PlatformTransactionManager}.
130 	 *
131 	 * @param transactionManager the transaction manager to set
132 	 */
133 	public void setTransactionManager(PlatformTransactionManager transactionManager) {
134 		this.transactionManager = transactionManager;
135 	}
136 
137 	/**
138 	 * Public setter for the {@link TransactionAttribute}.
139 	 *
140 	 * @param transactionAttribute the {@link TransactionAttribute} to set
141 	 */
142 	public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
143 		this.transactionAttribute = transactionAttribute;
144 	}
145 
146 	/**
147 	 * Public setter for the {@link Tasklet}.
148 	 *
149 	 * @param tasklet the {@link Tasklet} to set
150 	 */
151 	public void setTasklet(Tasklet tasklet) {
152 		this.tasklet = tasklet;
153 		if (tasklet instanceof StepExecutionListener) {
154 			registerStepExecutionListener((StepExecutionListener) tasklet);
155 		}
156 	}
157 
158 	/**
159 	 * Register a chunk listener for callbacks at the appropriate stages in a
160 	 * step execution.
161 	 *
162 	 * @param listener a {@link ChunkListener}
163 	 */
164 	public void registerChunkListener(ChunkListener listener) {
165 		this.chunkListener.register(listener);
166 	}
167 
168 	/**
169 	 * Register each of the objects as listeners.
170 	 *
171 	 * @param listeners an array of listener objects of known types.
172 	 */
173 	public void setChunkListeners(ChunkListener[] listeners) {
174 		for (int i = 0; i < listeners.length; i++) {
175 			registerChunkListener(listeners[i]);
176 		}
177 	}
178 
179 	/**
180 	 * Register each of the streams for callbacks at the appropriate time in the
181 	 * step. The {@link ItemReader} and {@link ItemWriter} are automatically
182 	 * registered, but it doesn't hurt to also register them here. Injected
183 	 * dependencies of the reader and writer are not automatically registered,
184 	 * so if you implement {@link ItemWriter} using delegation to another object
185 	 * which itself is a {@link ItemStream}, you need to register the delegate
186 	 * here.
187 	 *
188 	 * @param streams an array of {@link ItemStream} objects.
189 	 */
190 	public void setStreams(ItemStream[] streams) {
191 		for (int i = 0; i < streams.length; i++) {
192 			registerStream(streams[i]);
193 		}
194 	}
195 
196 	/**
197 	 * Register a single {@link ItemStream} for callbacks to the stream
198 	 * interface.
199 	 *
200 	 * @param stream
201 	 */
202 	public void registerStream(ItemStream stream) {
203 		this.stream.register(stream);
204 	}
205 
206 	/**
207 	 * The {@link RepeatOperations} to use for the outer loop of the batch
208 	 * processing. Should be set up by the caller through a factory. Defaults to
209 	 * a plain {@link RepeatTemplate}.
210 	 *
211 	 * @param stepOperations a {@link RepeatOperations} instance.
212 	 */
213 	public void setStepOperations(RepeatOperations stepOperations) {
214 		this.stepOperations = stepOperations;
215 	}
216 
217 	/**
218 	 * Setter for the {@link StepInterruptionPolicy}. The policy is used to
219 	 * check whether an external request has been made to interrupt the job
220 	 * execution.
221 	 *
222 	 * @param interruptionPolicy a {@link StepInterruptionPolicy}
223 	 */
224 	public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
225 		this.interruptionPolicy = interruptionPolicy;
226 	}
227 
228 	/**
229 	 * Process the step and update its context so that progress can be monitored
230 	 * by the caller. The step is broken down into chunks, each one executing in
231 	 * a transaction. The step and its execution and execution context are all
232 	 * given an up to date {@link BatchStatus}, and the {@link JobRepository} is
233 	 * used to store the result. Various reporting information are also added to
234 	 * the current context governing the step execution, which would normally be
235 	 * available to the caller through the step's {@link ExecutionContext}.<br/>
236 	 *
237 	 * @throws JobInterruptedException if the step or a chunk is interrupted
238 	 * @throws RuntimeException if there is an exception during a chunk
239 	 * execution
240 	 *
241 	 */
242 	@Override
243 	@SuppressWarnings("unchecked")
244 	protected void doExecute(StepExecution stepExecution) throws Exception {
245 
246 		stream.update(stepExecution.getExecutionContext());
247 		getJobRepository().updateExecutionContext(stepExecution);
248 
249 		// Shared semaphore per step execution, so other step executions can run
250 		// in parallel without needing the lock
251 		final Semaphore semaphore = createSemaphore();
252 
253 		stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
254 
255 			@Override
256 			public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
257 					throws Exception {
258 
259 				StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
260 
261 				// Before starting a new transaction, check for
262 				// interruption.
263 				interruptionPolicy.checkInterrupted(stepExecution);
264 
265 				RepeatStatus result;
266 				try {
267 					result = (RepeatStatus) new TransactionTemplate(transactionManager, transactionAttribute)
268 					.execute(new ChunkTransactionCallback(chunkContext, semaphore));
269 				}
270 				catch (UncheckedTransactionException e) {
271 					// Allow checked exceptions to be thrown inside callback
272 					throw (Exception) e.getCause();
273 				}
274 
275 				chunkListener.afterChunk(chunkContext);
276 
277 				// Check for interruption after transaction as well, so that
278 				// the interrupted exception is correctly propagated up to
279 				// caller
280 				interruptionPolicy.checkInterrupted(stepExecution);
281 
282 				return result;
283 			}
284 
285 		});
286 
287 	}
288 
289 	/**
290 	 * Extension point mainly for test purposes so that the behaviour of the
291 	 * lock can be manipulated to simulate various pathologies.
292 	 *
293 	 * @return a semaphore for locking access to the JobRepository
294 	 */
295 	protected Semaphore createSemaphore() {
296 		return new Semaphore(1);
297 	}
298 
299 	@Override
300 	protected void close(ExecutionContext ctx) throws Exception {
301 		stream.close();
302 	}
303 
304 	@Override
305 	protected void open(ExecutionContext ctx) throws Exception {
306 		stream.open(ctx);
307 	}
308 
309 	/**
310 	 * A callback for the transactional work inside a chunk. Also detects
311 	 * failures in the transaction commit and rollback, only panicking if the
312 	 * transaction status is unknown (i.e. if a commit failure leads to a clean
313 	 * rollback then we assume the state is consistent).
314 	 *
315 	 * @author Dave Syer
316 	 *
317 	 */
318 	@SuppressWarnings("rawtypes")
319 	private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback {
320 
321 		private final StepExecution stepExecution;
322 
323 		private final ChunkContext chunkContext;
324 
325 		private boolean rolledBack = false;
326 
327 		private boolean stepExecutionUpdated = false;
328 
329 		private StepExecution oldVersion;
330 
331 		private boolean locked = false;
332 
333 		private final Semaphore semaphore;
334 
335 		public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) {
336 			this.chunkContext = chunkContext;
337 			this.stepExecution = chunkContext.getStepContext().getStepExecution();
338 			this.semaphore = semaphore;
339 		}
340 
341 		@Override
342 		public void afterCompletion(int status) {
343 			try {
344 				if (status != TransactionSynchronization.STATUS_COMMITTED) {
345 					if (stepExecutionUpdated) {
346 						// Wah! the commit failed. We need to rescue the step
347 						// execution data.
348 						logger.info("Commit failed while step execution data was already updated. "
349 								+ "Reverting to old version.");
350 						copy(oldVersion, stepExecution);
351 						if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
352 							rollback(stepExecution);
353 						}
354 					}
355 					chunkListener.afterChunkError(chunkContext);
356 				}
357 				if (status == TransactionSynchronization.STATUS_UNKNOWN) {
358 					logger.error("Rolling back with transaction in unknown state");
359 					rollback(stepExecution);
360 					stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
361 					stepExecution.setTerminateOnly();
362 				}
363 			}
364 			finally {
365 				// Only release the lock if we acquired it, and release as late
366 				// as possible
367 				if (locked) {
368 					semaphore.release();
369 				}
370 				locked = false;
371 			}
372 		}
373 
374 		@Override
375 		public Object doInTransaction(TransactionStatus status) {
376 
377 			TransactionSynchronizationManager.registerSynchronization(this);
378 
379 			RepeatStatus result = RepeatStatus.CONTINUABLE;
380 
381 			StepContribution contribution = stepExecution.createStepContribution();
382 
383 			chunkListener.beforeChunk(chunkContext);
384 
385 			// In case we need to push it back to its old value
386 			// after a commit fails...
387 			oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
388 			copy(stepExecution, oldVersion);
389 
390 			try {
391 
392 				try {
393 					try {
394 						result = tasklet.execute(contribution, chunkContext);
395 						if (result == null) {
396 							result = RepeatStatus.FINISHED;
397 						}
398 					}
399 					catch (Exception e) {
400 						if (transactionAttribute.rollbackOn(e)) {
401 							chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
402 							throw e;
403 						}
404 					}
405 				}
406 				finally {
407 
408 					// If the step operations are asynchronous then we need
409 					// to synchronize changes to the step execution (at a
410 					// minimum). Take the lock *before* changing the step
411 					// execution.
412 					try {
413 						semaphore.acquire();
414 						locked = true;
415 					}
416 					catch (InterruptedException e) {
417 						logger.error("Thread interrupted while locking for repository update");
418 						stepExecution.setStatus(BatchStatus.STOPPED);
419 						stepExecution.setTerminateOnly();
420 						Thread.currentThread().interrupt();
421 					}
422 
423 					// Apply the contribution to the step
424 					// even if unsuccessful
425 					logger.debug("Applying contribution: " + contribution);
426 					stepExecution.apply(contribution);
427 
428 				}
429 
430 				stepExecutionUpdated = true;
431 
432 				stream.update(stepExecution.getExecutionContext());
433 
434 				try {
435 					// Going to attempt a commit. If it fails this flag will
436 					// stay false and we can use that later.
437 					getJobRepository().updateExecutionContext(stepExecution);
438 					stepExecution.incrementCommitCount();
439 					logger.debug("Saving step execution before commit: " + stepExecution);
440 					getJobRepository().update(stepExecution);
441 				}
442 				catch (Exception e) {
443 					// If we get to here there was a problem saving the step
444 					// execution and we have to fail.
445 					String msg = "JobRepository failure forcing exit with unknown status";
446 					logger.error(msg, e);
447 					stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
448 					stepExecution.setTerminateOnly();
449 					throw new FatalStepExecutionException(msg, e);
450 				}
451 
452 			}
453 			catch (Error e) {
454 				logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
455 				rollback(stepExecution);
456 				throw e;
457 			}
458 			catch (RuntimeException e) {
459 				logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
460 				rollback(stepExecution);
461 				throw e;
462 			}
463 			catch (Exception e) {
464 				logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
465 				rollback(stepExecution);
466 				// Allow checked exceptions
467 				throw new UncheckedTransactionException(e);
468 			}
469 
470 			return result;
471 
472 		}
473 
474 		private void rollback(StepExecution stepExecution) {
475 			if (!rolledBack) {
476 				stepExecution.incrementRollbackCount();
477 				rolledBack = true;
478 			}
479 		}
480 
481 		private void copy(final StepExecution source, final StepExecution target) {
482 			target.setVersion(source.getVersion());
483 			target.setWriteCount(source.getWriteCount());
484 			target.setFilterCount(source.getFilterCount());
485 			target.setCommitCount(source.getCommitCount());
486 			target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
487 		}
488 
489 	}
490 
491 	/**
492 	 * Convenience wrapper for a checked exception so that it can cause a
493 	 * rollback and be extracted afterwards.
494 	 *
495 	 * @author Dave Syer
496 	 *
497 	 */
498 	private static class UncheckedTransactionException extends RuntimeException {
499 
500 		public UncheckedTransactionException(Exception e) {
501 			super(e);
502 		}
503 
504 	}
505 
506 }