1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.item; |
17 | |
18 | import org.apache.commons.logging.Log; |
19 | import org.apache.commons.logging.LogFactory; |
20 | import org.springframework.batch.core.BatchStatus; |
21 | import org.springframework.batch.core.JobInterruptedException; |
22 | import org.springframework.batch.core.StepContribution; |
23 | import org.springframework.batch.core.StepExecution; |
24 | import org.springframework.batch.core.StepExecutionListener; |
25 | import org.springframework.batch.core.repository.JobRepository; |
26 | import org.springframework.batch.core.step.AbstractStep; |
27 | import org.springframework.batch.core.step.StepExecutionSynchronizer; |
28 | import org.springframework.batch.core.step.StepExecutionSynchronizerFactory; |
29 | import org.springframework.batch.core.step.StepInterruptionPolicy; |
30 | import org.springframework.batch.core.step.ThreadStepInterruptionPolicy; |
31 | import org.springframework.batch.item.ExecutionContext; |
32 | import org.springframework.batch.item.ItemReader; |
33 | import org.springframework.batch.item.ItemStream; |
34 | import org.springframework.batch.item.ItemWriter; |
35 | import org.springframework.batch.item.support.CompositeItemStream; |
36 | import org.springframework.batch.repeat.ExitStatus; |
37 | import org.springframework.batch.repeat.RepeatCallback; |
38 | import org.springframework.batch.repeat.RepeatContext; |
39 | import org.springframework.batch.repeat.RepeatOperations; |
40 | import org.springframework.batch.repeat.support.RepeatTemplate; |
41 | import org.springframework.transaction.PlatformTransactionManager; |
42 | import org.springframework.transaction.TransactionStatus; |
43 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
44 | import org.springframework.transaction.interceptor.TransactionAttribute; |
45 | |
46 | /** |
47 | * Simple implementation of executing the step as a set of chunks, each chunk |
48 | * surrounded by a transaction. The structure is therefore that of two nested |
49 | * loops, with transaction boundary around the whole inner loop. The outer loop |
50 | * is controlled by the step operations ( |
51 | * {@link #setStepOperations(RepeatOperations)}), and the inner loop by the |
52 | * chunk operations ({@link #setChunkOperations(RepeatOperations)}). The inner |
53 | * loop should always be executed in a single thread, so the chunk operations |
54 | * should not do any concurrent execution. N.B. usually that means that the |
55 | * chunk operations should be a {@link RepeatTemplate} (which is the default).<br/> |
56 | * |
57 | * Clients can use interceptors in the step operations to intercept or listen to |
58 | * the iteration on a step-wide basis, for instance to get a callback when the |
59 | * step is complete. Those that want callbacks at the level of an individual |
60 | * tasks, can specify interceptors for the chunk operations. |
61 | * |
62 | * @author Dave Syer |
63 | * @author Lucas Ward |
64 | * @author Ben Hale |
65 | * @author Robert Kasanicky |
66 | */ |
67 | public class ItemOrientedStep extends AbstractStep { |
68 | |
69 | private static final Log logger = LogFactory.getLog(ItemOrientedStep.class); |
70 | |
71 | private RepeatOperations chunkOperations = new RepeatTemplate(); |
72 | |
73 | private RepeatOperations stepOperations = new RepeatTemplate(); |
74 | |
75 | // default to checking current thread for interruption. |
76 | private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy(); |
77 | |
78 | private CompositeItemStream stream = new CompositeItemStream(); |
79 | |
80 | private PlatformTransactionManager transactionManager; |
81 | |
82 | private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() { |
83 | |
84 | public boolean rollbackOn(Throwable ex) { |
85 | return true; |
86 | } |
87 | |
88 | }; |
89 | |
90 | private ItemHandler itemHandler; |
91 | |
92 | private StepExecutionSynchronizer synchronizer; |
93 | |
94 | /** |
95 | * @param name |
96 | */ |
97 | public ItemOrientedStep(String name) { |
98 | super(name); |
99 | synchronizer = new StepExecutionSynchronizerFactory().getStepExecutionSynchronizer(); |
100 | } |
101 | |
102 | /** |
103 | * Public setter for the {@link PlatformTransactionManager}. |
104 | * |
105 | * @param transactionManager the transaction manager to set |
106 | */ |
107 | public void setTransactionManager(PlatformTransactionManager transactionManager) { |
108 | this.transactionManager = transactionManager; |
109 | } |
110 | |
111 | /** |
112 | * Public setter for the {@link TransactionAttribute}. |
113 | * @param transactionAttribute the {@link TransactionAttribute} to set |
114 | */ |
115 | public void setTransactionAttribute(TransactionAttribute transactionAttribute) { |
116 | this.transactionAttribute = transactionAttribute; |
117 | } |
118 | |
119 | /** |
120 | * Public setter for the {@link ItemHandler}. |
121 | * |
122 | * @param itemHandler the {@link ItemHandler} to set |
123 | */ |
124 | public void setItemHandler(ItemHandler itemHandler) { |
125 | this.itemHandler = itemHandler; |
126 | } |
127 | |
128 | /** |
129 | * Register each of the objects as listeners. If the {@link ItemReader} or |
130 | * {@link ItemWriter} themselves implements this interface they will be |
131 | * registered automatically, but their injected dependencies will not be. |
132 | * This is a good way to get access to job parameters and execution context |
133 | * if the tasklet is parameterised. |
134 | * |
135 | * @param listeners an array of listener objects of known types. |
136 | */ |
137 | public void setStepExecutionListeners(StepExecutionListener[] listeners) { |
138 | for (int i = 0; i < listeners.length; i++) { |
139 | registerStepExecutionListener(listeners[i]); |
140 | } |
141 | } |
142 | |
143 | /** |
144 | * Register each of the streams for callbacks at the appropriate time in the |
145 | * step. The {@link ItemReader} and {@link ItemWriter} are automatically |
146 | * registered, but it doesn't hurt to also register them here. Injected |
147 | * dependencies of the reader and writer are not automatically registered, |
148 | * so if you implement {@link ItemWriter} using delegation to another object |
149 | * which itself is a {@link ItemStream}, you need to register the delegate |
150 | * here. |
151 | * |
152 | * @param streams an array of {@link ItemStream} objects. |
153 | */ |
154 | public void setStreams(ItemStream[] streams) { |
155 | for (int i = 0; i < streams.length; i++) { |
156 | registerStream(streams[i]); |
157 | } |
158 | } |
159 | |
160 | /** |
161 | * Register a single {@link ItemStream} for callbacks to the stream |
162 | * interface. |
163 | * |
164 | * @param stream |
165 | */ |
166 | public void registerStream(ItemStream stream) { |
167 | this.stream.register(stream); |
168 | } |
169 | |
170 | /** |
171 | * The {@link RepeatOperations} to use for the outer loop of the batch |
172 | * processing. Should be set up by the caller through a factory. Defaults to |
173 | * a plain {@link RepeatTemplate}. |
174 | * |
175 | * @param stepOperations a {@link RepeatOperations} instance. |
176 | */ |
177 | public void setStepOperations(RepeatOperations stepOperations) { |
178 | this.stepOperations = stepOperations; |
179 | } |
180 | |
181 | /** |
182 | * The {@link RepeatOperations} to use for the inner loop of the batch |
183 | * processing. should be set up by the caller through a factory. defaults to |
184 | * a plain {@link RepeatTemplate}. |
185 | * |
186 | * @param chunkOperations a {@link RepeatOperations} instance. |
187 | */ |
188 | public void setChunkOperations(RepeatOperations chunkOperations) { |
189 | this.chunkOperations = chunkOperations; |
190 | } |
191 | |
192 | /** |
193 | * Setter for the {@link StepInterruptionPolicy}. The policy is used to |
194 | * check whether an external request has been made to interrupt the job |
195 | * execution. |
196 | * |
197 | * @param interruptionPolicy a {@link StepInterruptionPolicy} |
198 | */ |
199 | public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) { |
200 | this.interruptionPolicy = interruptionPolicy; |
201 | } |
202 | |
203 | /** |
204 | * Mostly useful for testing, but could be used to remove dependence on |
205 | * backport concurrency utilities. Public setter for the |
206 | * {@link StepExecutionSynchronizer}. |
207 | * |
208 | * @param synchronizer the {@link StepExecutionSynchronizer} to set |
209 | */ |
210 | public void setSynchronizer(StepExecutionSynchronizer synchronizer) { |
211 | this.synchronizer = synchronizer; |
212 | } |
213 | |
214 | /** |
215 | * Process the step and update its context so that progress can be monitored |
216 | * by the caller. The step is broken down into chunks, each one executing in |
217 | * a transaction. The step and its execution and execution context are all |
218 | * given an up to date {@link BatchStatus}, and the {@link JobRepository} is |
219 | * used to store the result. Various reporting information are also added to |
220 | * the current context (the {@link RepeatContext} governing the step |
221 | * execution, which would normally be available to the caller somehow |
222 | * through the step's {@link ExecutionContext}.<br/> |
223 | * |
224 | * @throws JobInterruptedException if the step or a chunk is interrupted |
225 | * @throws RuntimeException if there is an exception during a chunk |
226 | * execution |
227 | * |
228 | */ |
229 | protected ExitStatus doExecute(final StepExecution stepExecution) throws Exception { |
230 | stream.update(stepExecution.getExecutionContext()); |
231 | getJobRepository().saveOrUpdateExecutionContext(stepExecution); |
232 | itemHandler.mark(); |
233 | |
234 | final ExceptionHolder fatalException = new ExceptionHolder(); |
235 | |
236 | return stepOperations.iterate(new RepeatCallback() { |
237 | |
238 | public ExitStatus doInIteration(RepeatContext context) throws Exception { |
239 | final StepContribution contribution = stepExecution.createStepContribution(); |
240 | // Before starting a new transaction, check for |
241 | // interruption. |
242 | if (stepExecution.isTerminateOnly()) { |
243 | context.setTerminateOnly(); |
244 | } |
245 | interruptionPolicy.checkInterrupted(stepExecution); |
246 | |
247 | ExitStatus exitStatus = ExitStatus.CONTINUABLE; |
248 | |
249 | TransactionStatus transaction = transactionManager.getTransaction(transactionAttribute); |
250 | |
251 | boolean locked = false; |
252 | // Throwable that contains any exceptions that are caught but |
253 | // are propagated up because |
254 | // they shouldn't cause a rollback. |
255 | Throwable nonRollbackException = null; |
256 | |
257 | try { |
258 | |
259 | exitStatus = processChunk(stepExecution, contribution); |
260 | |
261 | contribution.incrementCommitCount(); |
262 | |
263 | // If the step operations are asynchronous then we need |
264 | // to synchronize changes to the step execution (at a |
265 | // minimum). |
266 | try { |
267 | synchronizer.lock(stepExecution); |
268 | locked = true; |
269 | } |
270 | catch (InterruptedException e) { |
271 | stepExecution.setStatus(BatchStatus.STOPPED); |
272 | Thread.currentThread().interrupt(); |
273 | } |
274 | |
275 | // Apply the contribution to the step |
276 | // only if chunk was successful |
277 | stepExecution.apply(contribution); |
278 | |
279 | // Attempt to flush before the step execution and stream |
280 | // state are updated |
281 | itemHandler.flush(); |
282 | |
283 | stream.update(stepExecution.getExecutionContext()); |
284 | |
285 | try { |
286 | getJobRepository().saveOrUpdateExecutionContext(stepExecution); |
287 | } |
288 | catch (Exception e) { |
289 | fatalException.setException(e); |
290 | stepExecution.setStatus(BatchStatus.UNKNOWN); |
291 | throw new FatalException("Fatal error detected during save of step execution context", e); |
292 | } |
293 | |
294 | try { |
295 | itemHandler.mark(); |
296 | transactionManager.commit(transaction); |
297 | } |
298 | catch (Exception e) { |
299 | fatalException.setException(e); |
300 | stepExecution.setStatus(BatchStatus.UNKNOWN); |
301 | logger.error("Fatal error detected during commit."); |
302 | throw new FatalException("Fatal error detected during commit", e); |
303 | } |
304 | |
305 | try { |
306 | getJobRepository().saveOrUpdate(stepExecution); |
307 | } |
308 | catch (Exception e) { |
309 | fatalException.setException(e); |
310 | stepExecution.setStatus(BatchStatus.UNKNOWN); |
311 | throw new FatalException("Fatal error detected during update of step execution", e); |
312 | } |
313 | |
314 | } |
315 | catch (Error e) { |
316 | try { |
317 | processRollback(stepExecution, contribution, fatalException, transaction); |
318 | } |
319 | catch (Exception rollbackException) { |
320 | logger.error("Rollback failed, original exception that caused the rollback is", e); |
321 | throw rollbackException; |
322 | } |
323 | throw e; |
324 | } |
325 | catch (Exception e) { |
326 | try { |
327 | processRollback(stepExecution, contribution, fatalException, transaction); |
328 | } |
329 | catch (Exception rollbackException) { |
330 | logger.error("Rollback failed, original exception that caused the rollback is", e); |
331 | throw rollbackException; |
332 | } |
333 | throw e; |
334 | } |
335 | finally { |
336 | // only release the lock if we acquired it |
337 | if (locked) { |
338 | synchronizer.release(stepExecution); |
339 | } |
340 | locked = false; |
341 | |
342 | // Even if the TransactionAttributes dictate that an |
343 | // exception shouldn't cause a rollback, |
344 | // we still need to throw the exception so the |
345 | // ExceptionHandler can make a determination about |
346 | // whether the exception should cause job failure. |
347 | if (nonRollbackException != null) { |
348 | if (nonRollbackException instanceof Error) { |
349 | throw (Error) nonRollbackException; |
350 | } |
351 | else { |
352 | throw (Exception) nonRollbackException; |
353 | } |
354 | } |
355 | } |
356 | |
357 | // Check for interruption after transaction as well, so that |
358 | // the interrupted exception is correctly propagated up to |
359 | // caller |
360 | interruptionPolicy.checkInterrupted(stepExecution); |
361 | |
362 | return exitStatus; |
363 | } |
364 | |
365 | }); |
366 | |
367 | } |
368 | |
369 | /** |
370 | * Execute a bunch of identical business logic operations all within a |
371 | * transaction. The transaction is programmatically started and stopped |
372 | * outside this method, so subclasses that override do not need to create a |
373 | * transaction. |
374 | * @param execution the current {@link StepExecution} which should be |
375 | * treated as read-only for the purposes of this method. |
376 | * @param contribution the current {@link StepContribution} which can accept |
377 | * changes to be aggregated later into the step execution. |
378 | * |
379 | * @return true if there is more data to process. |
380 | */ |
381 | protected ExitStatus processChunk(final StepExecution execution, final StepContribution contribution) { |
382 | ExitStatus result = chunkOperations.iterate(new RepeatCallback() { |
383 | public ExitStatus doInIteration(final RepeatContext context) throws Exception { |
384 | if (execution.isTerminateOnly()) { |
385 | context.setTerminateOnly(); |
386 | } |
387 | // check for interruption before each item as well |
388 | interruptionPolicy.checkInterrupted(execution); |
389 | ExitStatus exitStatus = ExitStatus.FINISHED; |
390 | |
391 | exitStatus = itemHandler.handle(contribution); |
392 | |
393 | // check for interruption after each item as well |
394 | interruptionPolicy.checkInterrupted(execution); |
395 | return exitStatus; |
396 | } |
397 | }); |
398 | return result; |
399 | } |
400 | |
401 | /** |
402 | * @param stepExecution |
403 | * @param contribution |
404 | * @param fatalException |
405 | * @param transaction |
406 | */ |
407 | private void processRollback(final StepExecution stepExecution, final StepContribution contribution, |
408 | final ExceptionHolder fatalException, TransactionStatus transaction) { |
409 | |
410 | stepExecution.incrementReadSkipCountBy(contribution.getReadSkipCount()); |
411 | stepExecution.incrementWriteSkipCountBy(contribution.getWriteSkipCount()); |
412 | /* |
413 | * Any exception thrown within the transaction should automatically |
414 | * cause the transaction to rollback. |
415 | */ |
416 | stepExecution.rollback(); |
417 | |
418 | try { |
419 | itemHandler.reset(); |
420 | itemHandler.clear(); |
421 | transactionManager.rollback(transaction); |
422 | } |
423 | catch (Exception e) { |
424 | /* |
425 | * If we already failed to commit, it doesn't help to do this again |
426 | * - it's better to allow the CommitFailedException to propagate |
427 | */ |
428 | if (!fatalException.hasException()) { |
429 | fatalException.setException(e); |
430 | throw new FatalException("Failed while processing rollback", e); |
431 | } |
432 | } |
433 | |
434 | if (fatalException.hasException()) { |
435 | // Try and give the system a chance to update the stepExecution with |
436 | // the failure status. |
437 | getJobRepository().saveOrUpdate(stepExecution); |
438 | } |
439 | } |
440 | |
441 | private static class ExceptionHolder { |
442 | |
443 | private Exception exception; |
444 | |
445 | public boolean hasException() { |
446 | return exception != null; |
447 | } |
448 | |
449 | public void setException(Exception exception) { |
450 | this.exception = exception; |
451 | } |
452 | |
453 | public Exception getException() { |
454 | return this.exception; |
455 | } |
456 | |
457 | } |
458 | |
459 | protected void close(ExecutionContext ctx) throws Exception { |
460 | stream.close(ctx); |
461 | } |
462 | |
463 | protected void open(ExecutionContext ctx) throws Exception { |
464 | stream.open(ctx); |
465 | } |
466 | } |