1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.tasklet; |
17 | |
18 | import java.util.concurrent.Semaphore; |
19 | |
20 | import org.apache.commons.logging.Log; |
21 | import org.apache.commons.logging.LogFactory; |
22 | import org.springframework.batch.core.BatchStatus; |
23 | import org.springframework.batch.core.ChunkListener; |
24 | import org.springframework.batch.core.JobInterruptedException; |
25 | import org.springframework.batch.core.StepContribution; |
26 | import org.springframework.batch.core.StepExecution; |
27 | import org.springframework.batch.core.StepExecutionListener; |
28 | import org.springframework.batch.core.listener.CompositeChunkListener; |
29 | import org.springframework.batch.core.repository.JobRepository; |
30 | import org.springframework.batch.core.scope.context.ChunkContext; |
31 | import org.springframework.batch.core.scope.context.StepContextRepeatCallback; |
32 | import org.springframework.batch.core.step.AbstractStep; |
33 | import org.springframework.batch.core.step.FatalStepExecutionException; |
34 | import org.springframework.batch.core.step.StepInterruptionPolicy; |
35 | import org.springframework.batch.core.step.ThreadStepInterruptionPolicy; |
36 | import org.springframework.batch.item.ExecutionContext; |
37 | import org.springframework.batch.item.ItemReader; |
38 | import org.springframework.batch.item.ItemStream; |
39 | import org.springframework.batch.item.ItemWriter; |
40 | import org.springframework.batch.item.support.CompositeItemStream; |
41 | import org.springframework.batch.repeat.RepeatContext; |
42 | import org.springframework.batch.repeat.RepeatOperations; |
43 | import org.springframework.batch.repeat.RepeatStatus; |
44 | import org.springframework.batch.repeat.support.RepeatTemplate; |
45 | import org.springframework.transaction.PlatformTransactionManager; |
46 | import org.springframework.transaction.TransactionStatus; |
47 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
48 | import org.springframework.transaction.interceptor.TransactionAttribute; |
49 | import org.springframework.transaction.support.TransactionCallback; |
50 | import org.springframework.transaction.support.TransactionSynchronization; |
51 | import org.springframework.transaction.support.TransactionSynchronizationAdapter; |
52 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
53 | import org.springframework.transaction.support.TransactionTemplate; |
54 | import org.springframework.util.Assert; |
55 | |
56 | /** |
57 | * Simple implementation of executing the step as a call to a {@link Tasklet}, |
58 | * possibly repeated, and each call surrounded by a transaction. The structure |
59 | * is therefore that of a loop with transaction boundary inside the loop. The |
60 | * loop is controlled by the step operations ( |
61 | * {@link #setStepOperations(RepeatOperations)}).<br/> |
62 | * <br/> |
63 | * |
64 | * Clients can use interceptors in the step operations to intercept or listen to |
65 | * the iteration on a step-wide basis, for instance to get a callback when the |
66 | * step is complete. Those that want callbacks at the level of an individual |
67 | * tasks, can specify interceptors for the chunk operations. |
68 | * |
69 | * @author Dave Syer |
70 | * @author Lucas Ward |
71 | * @author Ben Hale |
72 | * @author Robert Kasanicky |
73 | * @author Michael Minella |
74 | */ |
75 | @SuppressWarnings("serial") |
76 | public class TaskletStep extends AbstractStep { |
77 | |
78 | private static final Log logger = LogFactory.getLog(TaskletStep.class); |
79 | |
80 | private RepeatOperations stepOperations = new RepeatTemplate(); |
81 | |
82 | private CompositeChunkListener chunkListener = new CompositeChunkListener(); |
83 | |
84 | // default to checking current thread for interruption. |
85 | private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy(); |
86 | |
87 | private CompositeItemStream stream = new CompositeItemStream(); |
88 | |
89 | private PlatformTransactionManager transactionManager; |
90 | |
91 | private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() { |
92 | |
93 | @Override |
94 | public boolean rollbackOn(Throwable ex) { |
95 | return true; |
96 | } |
97 | |
98 | }; |
99 | |
100 | private Tasklet tasklet; |
101 | |
102 | /** |
103 | * Default constructor. |
104 | */ |
105 | public TaskletStep() { |
106 | this(null); |
107 | } |
108 | |
109 | /** |
110 | * @param name |
111 | */ |
112 | public TaskletStep(String name) { |
113 | super(name); |
114 | } |
115 | |
116 | /* |
117 | * (non-Javadoc) |
118 | * |
119 | * @see |
120 | * org.springframework.batch.core.step.AbstractStep#afterPropertiesSet() |
121 | */ |
122 | @Override |
123 | public void afterPropertiesSet() throws Exception { |
124 | super.afterPropertiesSet(); |
125 | Assert.state(transactionManager != null, "A transaction manager must be provided"); |
126 | } |
127 | |
128 | /** |
129 | * Public setter for the {@link PlatformTransactionManager}. |
130 | * |
131 | * @param transactionManager the transaction manager to set |
132 | */ |
133 | public void setTransactionManager(PlatformTransactionManager transactionManager) { |
134 | this.transactionManager = transactionManager; |
135 | } |
136 | |
137 | /** |
138 | * Public setter for the {@link TransactionAttribute}. |
139 | * |
140 | * @param transactionAttribute the {@link TransactionAttribute} to set |
141 | */ |
142 | public void setTransactionAttribute(TransactionAttribute transactionAttribute) { |
143 | this.transactionAttribute = transactionAttribute; |
144 | } |
145 | |
146 | /** |
147 | * Public setter for the {@link Tasklet}. |
148 | * |
149 | * @param tasklet the {@link Tasklet} to set |
150 | */ |
151 | public void setTasklet(Tasklet tasklet) { |
152 | this.tasklet = tasklet; |
153 | if (tasklet instanceof StepExecutionListener) { |
154 | registerStepExecutionListener((StepExecutionListener) tasklet); |
155 | } |
156 | } |
157 | |
158 | /** |
159 | * Register a chunk listener for callbacks at the appropriate stages in a |
160 | * step execution. |
161 | * |
162 | * @param listener a {@link ChunkListener} |
163 | */ |
164 | public void registerChunkListener(ChunkListener listener) { |
165 | this.chunkListener.register(listener); |
166 | } |
167 | |
168 | /** |
169 | * Register each of the objects as listeners. |
170 | * |
171 | * @param listeners an array of listener objects of known types. |
172 | */ |
173 | public void setChunkListeners(ChunkListener[] listeners) { |
174 | for (int i = 0; i < listeners.length; i++) { |
175 | registerChunkListener(listeners[i]); |
176 | } |
177 | } |
178 | |
179 | /** |
180 | * Register each of the streams for callbacks at the appropriate time in the |
181 | * step. The {@link ItemReader} and {@link ItemWriter} are automatically |
182 | * registered, but it doesn't hurt to also register them here. Injected |
183 | * dependencies of the reader and writer are not automatically registered, |
184 | * so if you implement {@link ItemWriter} using delegation to another object |
185 | * which itself is a {@link ItemStream}, you need to register the delegate |
186 | * here. |
187 | * |
188 | * @param streams an array of {@link ItemStream} objects. |
189 | */ |
190 | public void setStreams(ItemStream[] streams) { |
191 | for (int i = 0; i < streams.length; i++) { |
192 | registerStream(streams[i]); |
193 | } |
194 | } |
195 | |
196 | /** |
197 | * Register a single {@link ItemStream} for callbacks to the stream |
198 | * interface. |
199 | * |
200 | * @param stream |
201 | */ |
202 | public void registerStream(ItemStream stream) { |
203 | this.stream.register(stream); |
204 | } |
205 | |
206 | /** |
207 | * The {@link RepeatOperations} to use for the outer loop of the batch |
208 | * processing. Should be set up by the caller through a factory. Defaults to |
209 | * a plain {@link RepeatTemplate}. |
210 | * |
211 | * @param stepOperations a {@link RepeatOperations} instance. |
212 | */ |
213 | public void setStepOperations(RepeatOperations stepOperations) { |
214 | this.stepOperations = stepOperations; |
215 | } |
216 | |
217 | /** |
218 | * Setter for the {@link StepInterruptionPolicy}. The policy is used to |
219 | * check whether an external request has been made to interrupt the job |
220 | * execution. |
221 | * |
222 | * @param interruptionPolicy a {@link StepInterruptionPolicy} |
223 | */ |
224 | public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) { |
225 | this.interruptionPolicy = interruptionPolicy; |
226 | } |
227 | |
228 | /** |
229 | * Process the step and update its context so that progress can be monitored |
230 | * by the caller. The step is broken down into chunks, each one executing in |
231 | * a transaction. The step and its execution and execution context are all |
232 | * given an up to date {@link BatchStatus}, and the {@link JobRepository} is |
233 | * used to store the result. Various reporting information are also added to |
234 | * the current context governing the step execution, which would normally be |
235 | * available to the caller through the step's {@link ExecutionContext}.<br/> |
236 | * |
237 | * @throws JobInterruptedException if the step or a chunk is interrupted |
238 | * @throws RuntimeException if there is an exception during a chunk |
239 | * execution |
240 | * |
241 | */ |
242 | @Override |
243 | @SuppressWarnings("unchecked") |
244 | protected void doExecute(StepExecution stepExecution) throws Exception { |
245 | |
246 | stream.update(stepExecution.getExecutionContext()); |
247 | getJobRepository().updateExecutionContext(stepExecution); |
248 | |
249 | // Shared semaphore per step execution, so other step executions can run |
250 | // in parallel without needing the lock |
251 | final Semaphore semaphore = createSemaphore(); |
252 | |
253 | stepOperations.iterate(new StepContextRepeatCallback(stepExecution) { |
254 | |
255 | @Override |
256 | public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) |
257 | throws Exception { |
258 | |
259 | StepExecution stepExecution = chunkContext.getStepContext().getStepExecution(); |
260 | |
261 | // Before starting a new transaction, check for |
262 | // interruption. |
263 | interruptionPolicy.checkInterrupted(stepExecution); |
264 | |
265 | RepeatStatus result; |
266 | try { |
267 | result = (RepeatStatus) new TransactionTemplate(transactionManager, transactionAttribute) |
268 | .execute(new ChunkTransactionCallback(chunkContext, semaphore)); |
269 | } |
270 | catch (UncheckedTransactionException e) { |
271 | // Allow checked exceptions to be thrown inside callback |
272 | throw (Exception) e.getCause(); |
273 | } |
274 | |
275 | chunkListener.afterChunk(chunkContext); |
276 | |
277 | // Check for interruption after transaction as well, so that |
278 | // the interrupted exception is correctly propagated up to |
279 | // caller |
280 | interruptionPolicy.checkInterrupted(stepExecution); |
281 | |
282 | return result; |
283 | } |
284 | |
285 | }); |
286 | |
287 | } |
288 | |
289 | /** |
290 | * Extension point mainly for test purposes so that the behaviour of the |
291 | * lock can be manipulated to simulate various pathologies. |
292 | * |
293 | * @return a semaphore for locking access to the JobRepository |
294 | */ |
295 | protected Semaphore createSemaphore() { |
296 | return new Semaphore(1); |
297 | } |
298 | |
299 | @Override |
300 | protected void close(ExecutionContext ctx) throws Exception { |
301 | stream.close(); |
302 | } |
303 | |
304 | @Override |
305 | protected void open(ExecutionContext ctx) throws Exception { |
306 | stream.open(ctx); |
307 | } |
308 | |
309 | /** |
310 | * A callback for the transactional work inside a chunk. Also detects |
311 | * failures in the transaction commit and rollback, only panicking if the |
312 | * transaction status is unknown (i.e. if a commit failure leads to a clean |
313 | * rollback then we assume the state is consistent). |
314 | * |
315 | * @author Dave Syer |
316 | * |
317 | */ |
318 | @SuppressWarnings("rawtypes") |
319 | private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback { |
320 | |
321 | private final StepExecution stepExecution; |
322 | |
323 | private final ChunkContext chunkContext; |
324 | |
325 | private boolean rolledBack = false; |
326 | |
327 | private boolean stepExecutionUpdated = false; |
328 | |
329 | private StepExecution oldVersion; |
330 | |
331 | private boolean locked = false; |
332 | |
333 | private final Semaphore semaphore; |
334 | |
335 | public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) { |
336 | this.chunkContext = chunkContext; |
337 | this.stepExecution = chunkContext.getStepContext().getStepExecution(); |
338 | this.semaphore = semaphore; |
339 | } |
340 | |
341 | @Override |
342 | public void afterCompletion(int status) { |
343 | try { |
344 | if (status != TransactionSynchronization.STATUS_COMMITTED) { |
345 | if (stepExecutionUpdated) { |
346 | // Wah! the commit failed. We need to rescue the step |
347 | // execution data. |
348 | logger.info("Commit failed while step execution data was already updated. " |
349 | + "Reverting to old version."); |
350 | copy(oldVersion, stepExecution); |
351 | if (status == TransactionSynchronization.STATUS_ROLLED_BACK) { |
352 | rollback(stepExecution); |
353 | } |
354 | } |
355 | chunkListener.afterChunkError(chunkContext); |
356 | } |
357 | |
358 | if (status == TransactionSynchronization.STATUS_UNKNOWN) { |
359 | logger.error("Rolling back with transaction in unknown state"); |
360 | rollback(stepExecution); |
361 | stepExecution.upgradeStatus(BatchStatus.UNKNOWN); |
362 | stepExecution.setTerminateOnly(); |
363 | } |
364 | } |
365 | finally { |
366 | // Only release the lock if we acquired it, and release as late |
367 | // as possible |
368 | if (locked) { |
369 | semaphore.release(); |
370 | } |
371 | |
372 | locked = false; |
373 | } |
374 | } |
375 | |
376 | @Override |
377 | public Object doInTransaction(TransactionStatus status) { |
378 | TransactionSynchronizationManager.registerSynchronization(this); |
379 | |
380 | RepeatStatus result = RepeatStatus.CONTINUABLE; |
381 | |
382 | StepContribution contribution = stepExecution.createStepContribution(); |
383 | |
384 | chunkListener.beforeChunk(chunkContext); |
385 | |
386 | // In case we need to push it back to its old value |
387 | // after a commit fails... |
388 | oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution()); |
389 | copy(stepExecution, oldVersion); |
390 | |
391 | try { |
392 | |
393 | try { |
394 | try { |
395 | result = tasklet.execute(contribution, chunkContext); |
396 | if (result == null) { |
397 | result = RepeatStatus.FINISHED; |
398 | } |
399 | } |
400 | catch (Exception e) { |
401 | if (transactionAttribute.rollbackOn(e)) { |
402 | chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e); |
403 | throw e; |
404 | } |
405 | } |
406 | } |
407 | finally { |
408 | |
409 | // If the step operations are asynchronous then we need |
410 | // to synchronize changes to the step execution (at a |
411 | // minimum). Take the lock *before* changing the step |
412 | // execution. |
413 | try { |
414 | semaphore.acquire(); |
415 | locked = true; |
416 | } |
417 | catch (InterruptedException e) { |
418 | logger.error("Thread interrupted while locking for repository update"); |
419 | stepExecution.setStatus(BatchStatus.STOPPED); |
420 | stepExecution.setTerminateOnly(); |
421 | Thread.currentThread().interrupt(); |
422 | } |
423 | |
424 | // Apply the contribution to the step |
425 | // even if unsuccessful |
426 | logger.debug("Applying contribution: " + contribution); |
427 | stepExecution.apply(contribution); |
428 | |
429 | } |
430 | |
431 | stepExecutionUpdated = true; |
432 | |
433 | stream.update(stepExecution.getExecutionContext()); |
434 | |
435 | try { |
436 | // Going to attempt a commit. If it fails this flag will |
437 | // stay false and we can use that later. |
438 | getJobRepository().updateExecutionContext(stepExecution); |
439 | stepExecution.incrementCommitCount(); |
440 | logger.debug("Saving step execution before commit: " + stepExecution); |
441 | getJobRepository().update(stepExecution); |
442 | } |
443 | catch (Exception e) { |
444 | // If we get to here there was a problem saving the step |
445 | // execution and we have to fail. |
446 | String msg = "JobRepository failure forcing rollback"; |
447 | logger.error(msg, e); |
448 | throw new FatalStepExecutionException(msg, e); |
449 | } |
450 | } |
451 | catch (Error e) { |
452 | logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage()); |
453 | rollback(stepExecution); |
454 | throw e; |
455 | } |
456 | catch (RuntimeException e) { |
457 | logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage()); |
458 | rollback(stepExecution); |
459 | throw e; |
460 | } |
461 | catch (Exception e) { |
462 | logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage()); |
463 | rollback(stepExecution); |
464 | // Allow checked exceptions |
465 | throw new UncheckedTransactionException(e); |
466 | } |
467 | |
468 | return result; |
469 | |
470 | } |
471 | |
472 | private void rollback(StepExecution stepExecution) { |
473 | if (!rolledBack) { |
474 | stepExecution.incrementRollbackCount(); |
475 | rolledBack = true; |
476 | } |
477 | } |
478 | |
479 | private void copy(final StepExecution source, final StepExecution target) { |
480 | target.setVersion(source.getVersion()); |
481 | target.setWriteCount(source.getWriteCount()); |
482 | target.setFilterCount(source.getFilterCount()); |
483 | target.setCommitCount(source.getCommitCount()); |
484 | target.setExecutionContext(new ExecutionContext(source.getExecutionContext())); |
485 | } |
486 | |
487 | } |
488 | |
489 | /** |
490 | * Convenience wrapper for a checked exception so that it can cause a |
491 | * rollback and be extracted afterwards. |
492 | * |
493 | * @author Dave Syer |
494 | * |
495 | */ |
496 | private static class UncheckedTransactionException extends RuntimeException { |
497 | |
498 | public UncheckedTransactionException(Exception e) { |
499 | super(e); |
500 | } |
501 | |
502 | } |
503 | |
504 | } |