1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.tasklet; |
17 | |
18 | import java.util.concurrent.Semaphore; |
19 | |
20 | import org.apache.commons.logging.Log; |
21 | import org.apache.commons.logging.LogFactory; |
22 | import org.springframework.batch.core.BatchStatus; |
23 | import org.springframework.batch.core.ChunkListener; |
24 | import org.springframework.batch.core.JobInterruptedException; |
25 | import org.springframework.batch.core.StepContribution; |
26 | import org.springframework.batch.core.StepExecution; |
27 | import org.springframework.batch.core.StepExecutionListener; |
28 | import org.springframework.batch.core.listener.CompositeChunkListener; |
29 | import org.springframework.batch.core.repository.JobRepository; |
30 | import org.springframework.batch.core.scope.context.ChunkContext; |
31 | import org.springframework.batch.core.scope.context.StepContextRepeatCallback; |
32 | import org.springframework.batch.core.step.AbstractStep; |
33 | import org.springframework.batch.core.step.FatalStepExecutionException; |
34 | import org.springframework.batch.core.step.StepInterruptionPolicy; |
35 | import org.springframework.batch.core.step.ThreadStepInterruptionPolicy; |
36 | import org.springframework.batch.item.ExecutionContext; |
37 | import org.springframework.batch.item.ItemReader; |
38 | import org.springframework.batch.item.ItemStream; |
39 | import org.springframework.batch.item.ItemWriter; |
40 | import org.springframework.batch.item.support.CompositeItemStream; |
41 | import org.springframework.batch.repeat.RepeatContext; |
42 | import org.springframework.batch.repeat.RepeatOperations; |
43 | import org.springframework.batch.repeat.RepeatStatus; |
44 | import org.springframework.batch.repeat.support.RepeatTemplate; |
45 | import org.springframework.transaction.PlatformTransactionManager; |
46 | import org.springframework.transaction.TransactionStatus; |
47 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
48 | import org.springframework.transaction.interceptor.TransactionAttribute; |
49 | import org.springframework.transaction.support.TransactionCallback; |
50 | import org.springframework.transaction.support.TransactionSynchronization; |
51 | import org.springframework.transaction.support.TransactionSynchronizationAdapter; |
52 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
53 | import org.springframework.transaction.support.TransactionTemplate; |
54 | import org.springframework.util.Assert; |
55 | |
56 | /** |
57 | * Simple implementation of executing the step as a call to a {@link Tasklet}, |
58 | * possibly repeated, and each call surrounded by a transaction. The structure |
59 | * is therefore that of a loop with transaction boundary inside the loop. The |
60 | * loop is controlled by the step operations ( |
61 | * {@link #setStepOperations(RepeatOperations)}).<br/> |
62 | * <br/> |
63 | * |
64 | * Clients can use interceptors in the step operations to intercept or listen to |
65 | * the iteration on a step-wide basis, for instance to get a callback when the |
66 | * step is complete. Those that want callbacks at the level of an individual |
67 | * tasks, can specify interceptors for the chunk operations. |
68 | * |
69 | * @author Dave Syer |
70 | * @author Lucas Ward |
71 | * @author Ben Hale |
72 | * @author Robert Kasanicky |
73 | */ |
74 | public class TaskletStep extends AbstractStep { |
75 | |
76 | private static final Log logger = LogFactory.getLog(TaskletStep.class); |
77 | |
78 | private RepeatOperations stepOperations = new RepeatTemplate(); |
79 | |
80 | private CompositeChunkListener chunkListener = new CompositeChunkListener(); |
81 | |
82 | // default to checking current thread for interruption. |
83 | private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy(); |
84 | |
85 | private CompositeItemStream stream = new CompositeItemStream(); |
86 | |
87 | private PlatformTransactionManager transactionManager; |
88 | |
89 | private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() { |
90 | |
91 | @Override |
92 | public boolean rollbackOn(Throwable ex) { |
93 | return true; |
94 | } |
95 | |
96 | }; |
97 | |
98 | private Tasklet tasklet; |
99 | |
100 | /** |
101 | * Default constructor. |
102 | */ |
103 | public TaskletStep() { |
104 | this(null); |
105 | } |
106 | |
107 | /** |
108 | * @param name |
109 | */ |
110 | public TaskletStep(String name) { |
111 | super(name); |
112 | } |
113 | |
114 | /* |
115 | * (non-Javadoc) |
116 | * |
117 | * @see |
118 | * org.springframework.batch.core.step.AbstractStep#afterPropertiesSet() |
119 | */ |
120 | @Override |
121 | public void afterPropertiesSet() throws Exception { |
122 | super.afterPropertiesSet(); |
123 | Assert.state(transactionManager != null, "A transaction manager must be provided"); |
124 | } |
125 | |
126 | /** |
127 | * Public setter for the {@link PlatformTransactionManager}. |
128 | * |
129 | * @param transactionManager the transaction manager to set |
130 | */ |
131 | public void setTransactionManager(PlatformTransactionManager transactionManager) { |
132 | this.transactionManager = transactionManager; |
133 | } |
134 | |
135 | /** |
136 | * Public setter for the {@link TransactionAttribute}. |
137 | * |
138 | * @param transactionAttribute the {@link TransactionAttribute} to set |
139 | */ |
140 | public void setTransactionAttribute(TransactionAttribute transactionAttribute) { |
141 | this.transactionAttribute = transactionAttribute; |
142 | } |
143 | |
144 | /** |
145 | * Public setter for the {@link Tasklet}. |
146 | * |
147 | * @param tasklet the {@link Tasklet} to set |
148 | */ |
149 | public void setTasklet(Tasklet tasklet) { |
150 | this.tasklet = tasklet; |
151 | if (tasklet instanceof StepExecutionListener) { |
152 | registerStepExecutionListener((StepExecutionListener) tasklet); |
153 | } |
154 | } |
155 | |
156 | /** |
157 | * Register a chunk listener for callbacks at the appropriate stages in a |
158 | * step execution. |
159 | * |
160 | * @param listener a {@link ChunkListener} |
161 | */ |
162 | public void registerChunkListener(ChunkListener listener) { |
163 | this.chunkListener.register(listener); |
164 | } |
165 | |
166 | /** |
167 | * Register each of the objects as listeners. |
168 | * |
169 | * @param listeners an array of listener objects of known types. |
170 | */ |
171 | public void setChunkListeners(ChunkListener[] listeners) { |
172 | for (int i = 0; i < listeners.length; i++) { |
173 | registerChunkListener(listeners[i]); |
174 | } |
175 | } |
176 | |
177 | /** |
178 | * Register each of the streams for callbacks at the appropriate time in the |
179 | * step. The {@link ItemReader} and {@link ItemWriter} are automatically |
180 | * registered, but it doesn't hurt to also register them here. Injected |
181 | * dependencies of the reader and writer are not automatically registered, |
182 | * so if you implement {@link ItemWriter} using delegation to another object |
183 | * which itself is a {@link ItemStream}, you need to register the delegate |
184 | * here. |
185 | * |
186 | * @param streams an array of {@link ItemStream} objects. |
187 | */ |
188 | public void setStreams(ItemStream[] streams) { |
189 | for (int i = 0; i < streams.length; i++) { |
190 | registerStream(streams[i]); |
191 | } |
192 | } |
193 | |
194 | /** |
195 | * Register a single {@link ItemStream} for callbacks to the stream |
196 | * interface. |
197 | * |
198 | * @param stream |
199 | */ |
200 | public void registerStream(ItemStream stream) { |
201 | this.stream.register(stream); |
202 | } |
203 | |
204 | /** |
205 | * The {@link RepeatOperations} to use for the outer loop of the batch |
206 | * processing. Should be set up by the caller through a factory. Defaults to |
207 | * a plain {@link RepeatTemplate}. |
208 | * |
209 | * @param stepOperations a {@link RepeatOperations} instance. |
210 | */ |
211 | public void setStepOperations(RepeatOperations stepOperations) { |
212 | this.stepOperations = stepOperations; |
213 | } |
214 | |
215 | /** |
216 | * Setter for the {@link StepInterruptionPolicy}. The policy is used to |
217 | * check whether an external request has been made to interrupt the job |
218 | * execution. |
219 | * |
220 | * @param interruptionPolicy a {@link StepInterruptionPolicy} |
221 | */ |
222 | public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) { |
223 | this.interruptionPolicy = interruptionPolicy; |
224 | } |
225 | |
226 | /** |
227 | * Process the step and update its context so that progress can be monitored |
228 | * by the caller. The step is broken down into chunks, each one executing in |
229 | * a transaction. The step and its execution and execution context are all |
230 | * given an up to date {@link BatchStatus}, and the {@link JobRepository} is |
231 | * used to store the result. Various reporting information are also added to |
232 | * the current context governing the step execution, which would normally be |
233 | * available to the caller through the step's {@link ExecutionContext}.<br/> |
234 | * |
235 | * @throws JobInterruptedException if the step or a chunk is interrupted |
236 | * @throws RuntimeException if there is an exception during a chunk |
237 | * execution |
238 | * |
239 | */ |
240 | @Override |
241 | protected void doExecute(StepExecution stepExecution) throws Exception { |
242 | |
243 | stream.update(stepExecution.getExecutionContext()); |
244 | getJobRepository().updateExecutionContext(stepExecution); |
245 | |
246 | // Shared semaphore per step execution, so other step executions can run |
247 | // in parallel without needing the lock |
248 | final Semaphore semaphore = createSemaphore(); |
249 | |
250 | stepOperations.iterate(new StepContextRepeatCallback(stepExecution) { |
251 | |
252 | @Override |
253 | public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) |
254 | throws Exception { |
255 | |
256 | StepExecution stepExecution = chunkContext.getStepContext().getStepExecution(); |
257 | |
258 | // Before starting a new transaction, check for |
259 | // interruption. |
260 | interruptionPolicy.checkInterrupted(stepExecution); |
261 | |
262 | RepeatStatus result; |
263 | try { |
264 | result = (RepeatStatus) new TransactionTemplate(transactionManager, transactionAttribute) |
265 | .execute(new ChunkTransactionCallback(chunkContext, semaphore)); |
266 | } |
267 | catch (UncheckedTransactionException e) { |
268 | // Allow checked exceptions to be thrown inside callback |
269 | throw (Exception) e.getCause(); |
270 | } |
271 | |
272 | chunkListener.afterChunk(); |
273 | |
274 | // Check for interruption after transaction as well, so that |
275 | // the interrupted exception is correctly propagated up to |
276 | // caller |
277 | interruptionPolicy.checkInterrupted(stepExecution); |
278 | |
279 | return result; |
280 | } |
281 | |
282 | }); |
283 | |
284 | } |
285 | |
286 | /** |
287 | * Extension point mainly for test purposes so that the behaviour of the |
288 | * lock can be manipulated to simulate various pathologies. |
289 | * |
290 | * @return a semaphore for locking access to the JobRepository |
291 | */ |
292 | protected Semaphore createSemaphore() { |
293 | return new Semaphore(1); |
294 | } |
295 | |
296 | protected void close(ExecutionContext ctx) throws Exception { |
297 | stream.close(); |
298 | } |
299 | |
300 | protected void open(ExecutionContext ctx) throws Exception { |
301 | stream.open(ctx); |
302 | } |
303 | |
304 | /** |
305 | * A callback for the transactional work inside a chunk. Also detects |
306 | * failures in the transaction commit and rollback, only panicking if the |
307 | * transaction status is unknown (i.e. if a commit failure leads to a clean |
308 | * rollback then we assume the state is consistent). |
309 | * |
310 | * @author Dave Syer |
311 | * |
312 | */ |
313 | private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback { |
314 | |
315 | private final StepExecution stepExecution; |
316 | |
317 | private final ChunkContext chunkContext; |
318 | |
319 | private boolean rolledBack = false; |
320 | |
321 | private boolean stepExecutionUpdated = false; |
322 | |
323 | private StepExecution oldVersion; |
324 | |
325 | private boolean locked = false; |
326 | |
327 | private final Semaphore semaphore; |
328 | |
329 | public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) { |
330 | this.chunkContext = chunkContext; |
331 | this.stepExecution = chunkContext.getStepContext().getStepExecution(); |
332 | this.semaphore = semaphore; |
333 | } |
334 | |
335 | @Override |
336 | public void afterCompletion(int status) { |
337 | try { |
338 | if (status != TransactionSynchronization.STATUS_COMMITTED) { |
339 | if (stepExecutionUpdated) { |
340 | // Wah! the commit failed. We need to rescue the step |
341 | // execution data. |
342 | logger.info("Commit failed while step execution data was already updated. " |
343 | + "Reverting to old version."); |
344 | copy(oldVersion, stepExecution); |
345 | if (status == TransactionSynchronization.STATUS_ROLLED_BACK) { |
346 | rollback(stepExecution); |
347 | } |
348 | } |
349 | } |
350 | if (status == TransactionSynchronization.STATUS_UNKNOWN) { |
351 | logger.error("Rolling back with transaction in unknown state"); |
352 | rollback(stepExecution); |
353 | stepExecution.upgradeStatus(BatchStatus.UNKNOWN); |
354 | stepExecution.setTerminateOnly(); |
355 | } |
356 | } |
357 | finally { |
358 | // Only release the lock if we acquired it, and release as late |
359 | // as possible |
360 | if (locked) { |
361 | semaphore.release(); |
362 | } |
363 | locked = false; |
364 | } |
365 | } |
366 | |
367 | public Object doInTransaction(TransactionStatus status) { |
368 | |
369 | TransactionSynchronizationManager.registerSynchronization(this); |
370 | |
371 | RepeatStatus result = RepeatStatus.CONTINUABLE; |
372 | |
373 | StepContribution contribution = stepExecution.createStepContribution(); |
374 | |
375 | chunkListener.beforeChunk(); |
376 | |
377 | // In case we need to push it back to its old value |
378 | // after a commit fails... |
379 | oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution()); |
380 | copy(stepExecution, oldVersion); |
381 | |
382 | try { |
383 | |
384 | try { |
385 | try { |
386 | result = tasklet.execute(contribution, chunkContext); |
387 | if (result == null) { |
388 | result = RepeatStatus.FINISHED; |
389 | } |
390 | } |
391 | catch (Exception e) { |
392 | if (transactionAttribute.rollbackOn(e)) { |
393 | throw e; |
394 | } |
395 | } |
396 | |
397 | } |
398 | finally { |
399 | |
400 | // If the step operations are asynchronous then we need |
401 | // to synchronize changes to the step execution (at a |
402 | // minimum). Take the lock *before* changing the step |
403 | // execution. |
404 | try { |
405 | semaphore.acquire(); |
406 | locked = true; |
407 | } |
408 | catch (InterruptedException e) { |
409 | logger.error("Thread interrupted while locking for repository update"); |
410 | stepExecution.setStatus(BatchStatus.STOPPED); |
411 | stepExecution.setTerminateOnly(); |
412 | Thread.currentThread().interrupt(); |
413 | } |
414 | |
415 | // Apply the contribution to the step |
416 | // even if unsuccessful |
417 | logger.debug("Applying contribution: " + contribution); |
418 | stepExecution.apply(contribution); |
419 | |
420 | } |
421 | |
422 | stepExecutionUpdated = true; |
423 | |
424 | stream.update(stepExecution.getExecutionContext()); |
425 | |
426 | try { |
427 | // Going to attempt a commit. If it fails this flag will |
428 | // stay false and we can use that later. |
429 | getJobRepository().updateExecutionContext(stepExecution); |
430 | stepExecution.incrementCommitCount(); |
431 | logger.debug("Saving step execution before commit: " + stepExecution); |
432 | getJobRepository().update(stepExecution); |
433 | } |
434 | catch (Exception e) { |
435 | // If we get to here there was a problem saving the step |
436 | // execution and we have to fail. |
437 | String msg = "JobRepository failure forcing exit with unknown status"; |
438 | logger.error(msg, e); |
439 | stepExecution.upgradeStatus(BatchStatus.UNKNOWN); |
440 | stepExecution.setTerminateOnly(); |
441 | throw new FatalStepExecutionException(msg, e); |
442 | } |
443 | |
444 | } |
445 | catch (Error e) { |
446 | logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage()); |
447 | rollback(stepExecution); |
448 | throw e; |
449 | } |
450 | catch (RuntimeException e) { |
451 | logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage()); |
452 | rollback(stepExecution); |
453 | throw e; |
454 | } |
455 | catch (Exception e) { |
456 | logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage()); |
457 | rollback(stepExecution); |
458 | // Allow checked exceptions |
459 | throw new UncheckedTransactionException(e); |
460 | } |
461 | |
462 | return result; |
463 | |
464 | } |
465 | |
466 | private void rollback(StepExecution stepExecution) { |
467 | if (!rolledBack) { |
468 | stepExecution.incrementRollbackCount(); |
469 | rolledBack = true; |
470 | } |
471 | } |
472 | |
473 | private void copy(final StepExecution source, final StepExecution target) { |
474 | target.setVersion(source.getVersion()); |
475 | target.setWriteCount(source.getWriteCount()); |
476 | target.setFilterCount(source.getFilterCount()); |
477 | target.setCommitCount(source.getCommitCount()); |
478 | target.setExecutionContext(new ExecutionContext(source.getExecutionContext())); |
479 | } |
480 | |
481 | } |
482 | |
483 | /** |
484 | * Convenience wrapper for a checked exception so that it can cause a |
485 | * rollback and be extracted afterwards. |
486 | * |
487 | * @author Dave Syer |
488 | * |
489 | */ |
490 | private static class UncheckedTransactionException extends RuntimeException { |
491 | |
492 | public UncheckedTransactionException(Exception e) { |
493 | super(e); |
494 | } |
495 | |
496 | } |
497 | |
498 | } |