1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.springframework.batch.core.step.tasklet;
17
18 import java.util.concurrent.Semaphore;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.springframework.batch.core.BatchStatus;
23 import org.springframework.batch.core.ChunkListener;
24 import org.springframework.batch.core.JobInterruptedException;
25 import org.springframework.batch.core.StepContribution;
26 import org.springframework.batch.core.StepExecution;
27 import org.springframework.batch.core.StepExecutionListener;
28 import org.springframework.batch.core.listener.CompositeChunkListener;
29 import org.springframework.batch.core.repository.JobRepository;
30 import org.springframework.batch.core.scope.context.ChunkContext;
31 import org.springframework.batch.core.scope.context.StepContextRepeatCallback;
32 import org.springframework.batch.core.step.AbstractStep;
33 import org.springframework.batch.core.step.FatalStepExecutionException;
34 import org.springframework.batch.core.step.StepInterruptionPolicy;
35 import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
36 import org.springframework.batch.item.ExecutionContext;
37 import org.springframework.batch.item.ItemReader;
38 import org.springframework.batch.item.ItemStream;
39 import org.springframework.batch.item.ItemWriter;
40 import org.springframework.batch.item.support.CompositeItemStream;
41 import org.springframework.batch.repeat.RepeatContext;
42 import org.springframework.batch.repeat.RepeatOperations;
43 import org.springframework.batch.repeat.RepeatStatus;
44 import org.springframework.batch.repeat.support.RepeatTemplate;
45 import org.springframework.transaction.PlatformTransactionManager;
46 import org.springframework.transaction.TransactionStatus;
47 import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
48 import org.springframework.transaction.interceptor.TransactionAttribute;
49 import org.springframework.transaction.support.TransactionCallback;
50 import org.springframework.transaction.support.TransactionSynchronization;
51 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
52 import org.springframework.transaction.support.TransactionSynchronizationManager;
53 import org.springframework.transaction.support.TransactionTemplate;
54 import org.springframework.util.Assert;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public class TaskletStep extends AbstractStep {
75
76 private static final Log logger = LogFactory.getLog(TaskletStep.class);
77
78 private RepeatOperations stepOperations = new RepeatTemplate();
79
80 private CompositeChunkListener chunkListener = new CompositeChunkListener();
81
82
83 private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
84
85 private CompositeItemStream stream = new CompositeItemStream();
86
87 private PlatformTransactionManager transactionManager;
88
89 private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
90
91 @Override
92 public boolean rollbackOn(Throwable ex) {
93 return true;
94 }
95
96 };
97
98 private Tasklet tasklet;
99
100
101
102
/**
 * Creates a step without a name by delegating to the name-taking constructor
 * with {@code null}.
 */
public TaskletStep() {
    this(null);
}
106
107
108
109
/**
 * Creates a step and passes the given name on to the superclass constructor.
 *
 * @param name the name handed to the superclass
 */
public TaskletStep(String name) {
    super(name);
}
113
114
115
116
117
118
119
120 @Override
121 public void afterPropertiesSet() throws Exception {
122 super.afterPropertiesSet();
123 Assert.state(transactionManager != null, "A transaction manager must be provided");
124 }
125
126
127
128
129
130
131 public void setTransactionManager(PlatformTransactionManager transactionManager) {
132 this.transactionManager = transactionManager;
133 }
134
135
136
137
138
139
140 public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
141 this.transactionAttribute = transactionAttribute;
142 }
143
144
145
146
147
148
149 public void setTasklet(Tasklet tasklet) {
150 this.tasklet = tasklet;
151 if (tasklet instanceof StepExecutionListener) {
152 registerStepExecutionListener((StepExecutionListener) tasklet);
153 }
154 }
155
156
157
158
159
160
161
162 public void registerChunkListener(ChunkListener listener) {
163 this.chunkListener.register(listener);
164 }
165
166
167
168
169
170
171 public void setChunkListeners(ChunkListener[] listeners) {
172 for (int i = 0; i < listeners.length; i++) {
173 registerChunkListener(listeners[i]);
174 }
175 }
176
177
178
179
180
181
182
183
184
185
186
187
188 public void setStreams(ItemStream[] streams) {
189 for (int i = 0; i < streams.length; i++) {
190 registerStream(streams[i]);
191 }
192 }
193
194
195
196
197
198
199
200 public void registerStream(ItemStream stream) {
201 this.stream.register(stream);
202 }
203
204
205
206
207
208
209
210
211 public void setStepOperations(RepeatOperations stepOperations) {
212 this.stepOperations = stepOperations;
213 }
214
215
216
217
218
219
220
221
222 public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
223 this.interruptionPolicy = interruptionPolicy;
224 }
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240 @Override
241 protected void doExecute(StepExecution stepExecution) throws Exception {
242
243 stream.update(stepExecution.getExecutionContext());
244 getJobRepository().updateExecutionContext(stepExecution);
245
246
247
248 final Semaphore semaphore = createSemaphore();
249
250 stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
251
252 @Override
253 public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
254 throws Exception {
255
256 StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
257
258
259
260 interruptionPolicy.checkInterrupted(stepExecution);
261
262 RepeatStatus result;
263 try {
264 result = (RepeatStatus) new TransactionTemplate(transactionManager, transactionAttribute)
265 .execute(new ChunkTransactionCallback(chunkContext, semaphore));
266 }
267 catch (UncheckedTransactionException e) {
268
269 throw (Exception) e.getCause();
270 }
271
272 chunkListener.afterChunk();
273
274
275
276
277 interruptionPolicy.checkInterrupted(stepExecution);
278
279 return result;
280 }
281
282 });
283
284 }
285
286
287
288
289
290
291
292 protected Semaphore createSemaphore() {
293 return new Semaphore(1);
294 }
295
296 protected void close(ExecutionContext ctx) throws Exception {
297 stream.close();
298 }
299
300 protected void open(ExecutionContext ctx) throws Exception {
301 stream.open(ctx);
302 }
303
304
305
306
307
308
309
310
311
312
313 private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback {
314
315 private final StepExecution stepExecution;
316
317 private final ChunkContext chunkContext;
318
319 private boolean rolledBack = false;
320
321 private boolean stepExecutionUpdated = false;
322
323 private StepExecution oldVersion;
324
325 private boolean locked = false;
326
327 private final Semaphore semaphore;
328
329 public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) {
330 this.chunkContext = chunkContext;
331 this.stepExecution = chunkContext.getStepContext().getStepExecution();
332 this.semaphore = semaphore;
333 }
334
335 @Override
336 public void afterCompletion(int status) {
337 try {
338 if (status != TransactionSynchronization.STATUS_COMMITTED) {
339 if (stepExecutionUpdated) {
340
341
342 logger.info("Commit failed while step execution data was already updated. "
343 + "Reverting to old version.");
344 copy(oldVersion, stepExecution);
345 if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
346 rollback(stepExecution);
347 }
348 }
349 }
350 if (status == TransactionSynchronization.STATUS_UNKNOWN) {
351 logger.error("Rolling back with transaction in unknown state");
352 rollback(stepExecution);
353 stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
354 stepExecution.setTerminateOnly();
355 }
356 }
357 finally {
358
359
360 if (locked) {
361 semaphore.release();
362 }
363 locked = false;
364 }
365 }
366
367 public Object doInTransaction(TransactionStatus status) {
368
369 TransactionSynchronizationManager.registerSynchronization(this);
370
371 RepeatStatus result = RepeatStatus.CONTINUABLE;
372
373 StepContribution contribution = stepExecution.createStepContribution();
374
375 chunkListener.beforeChunk();
376
377
378
379 oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
380 copy(stepExecution, oldVersion);
381
382 try {
383
384 try {
385 try {
386 result = tasklet.execute(contribution, chunkContext);
387 if (result == null) {
388 result = RepeatStatus.FINISHED;
389 }
390 }
391 catch (Exception e) {
392 if (transactionAttribute.rollbackOn(e)) {
393 throw e;
394 }
395 }
396
397 }
398 finally {
399
400
401
402
403
404 try {
405 semaphore.acquire();
406 locked = true;
407 }
408 catch (InterruptedException e) {
409 logger.error("Thread interrupted while locking for repository update");
410 stepExecution.setStatus(BatchStatus.STOPPED);
411 stepExecution.setTerminateOnly();
412 Thread.currentThread().interrupt();
413 }
414
415
416
417 logger.debug("Applying contribution: " + contribution);
418 stepExecution.apply(contribution);
419
420 }
421
422 stepExecutionUpdated = true;
423
424 stream.update(stepExecution.getExecutionContext());
425
426 try {
427
428
429 getJobRepository().updateExecutionContext(stepExecution);
430 stepExecution.incrementCommitCount();
431 logger.debug("Saving step execution before commit: " + stepExecution);
432 getJobRepository().update(stepExecution);
433 }
434 catch (Exception e) {
435
436
437 String msg = "JobRepository failure forcing exit with unknown status";
438 logger.error(msg, e);
439 stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
440 stepExecution.setTerminateOnly();
441 throw new FatalStepExecutionException(msg, e);
442 }
443
444 }
445 catch (Error e) {
446 logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
447 rollback(stepExecution);
448 throw e;
449 }
450 catch (RuntimeException e) {
451 logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
452 rollback(stepExecution);
453 throw e;
454 }
455 catch (Exception e) {
456 logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
457 rollback(stepExecution);
458
459 throw new UncheckedTransactionException(e);
460 }
461
462 return result;
463
464 }
465
466 private void rollback(StepExecution stepExecution) {
467 if (!rolledBack) {
468 stepExecution.incrementRollbackCount();
469 rolledBack = true;
470 }
471 }
472
473 private void copy(final StepExecution source, final StepExecution target) {
474 target.setVersion(source.getVersion());
475 target.setWriteCount(source.getWriteCount());
476 target.setFilterCount(source.getFilterCount());
477 target.setCommitCount(source.getCommitCount());
478 target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
479 }
480
481 }
482
483
484
485
486
487
488
489
490 private static class UncheckedTransactionException extends RuntimeException {
491
492 public UncheckedTransactionException(Exception e) {
493 super(e);
494 }
495
496 }
497
498 }