1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.item; |
17 | |
18 | import org.apache.commons.logging.Log; |
19 | import org.apache.commons.logging.LogFactory; |
20 | import org.springframework.batch.core.BatchStatus; |
21 | import org.springframework.batch.core.JobInterruptedException; |
22 | import org.springframework.batch.core.StepContribution; |
23 | import org.springframework.batch.core.StepExecution; |
24 | import org.springframework.batch.core.StepExecutionListener; |
25 | import org.springframework.batch.core.repository.JobRepository; |
26 | import org.springframework.batch.core.step.AbstractStep; |
27 | import org.springframework.batch.core.step.StepExecutionSynchronizer; |
28 | import org.springframework.batch.core.step.StepExecutionSynchronizerFactory; |
29 | import org.springframework.batch.core.step.StepInterruptionPolicy; |
30 | import org.springframework.batch.core.step.ThreadStepInterruptionPolicy; |
31 | import org.springframework.batch.item.ExecutionContext; |
32 | import org.springframework.batch.item.ItemReader; |
33 | import org.springframework.batch.item.ItemStream; |
34 | import org.springframework.batch.item.ItemWriter; |
35 | import org.springframework.batch.item.support.CompositeItemStream; |
36 | import org.springframework.batch.repeat.ExitStatus; |
37 | import org.springframework.batch.repeat.RepeatCallback; |
38 | import org.springframework.batch.repeat.RepeatContext; |
39 | import org.springframework.batch.repeat.RepeatOperations; |
40 | import org.springframework.batch.repeat.support.RepeatTemplate; |
41 | import org.springframework.transaction.PlatformTransactionManager; |
42 | import org.springframework.transaction.TransactionStatus; |
43 | import org.springframework.transaction.support.DefaultTransactionDefinition; |
44 | |
45 | /** |
46 | * Simple implementation of executing the step as a set of chunks, each chunk |
47 | * surrounded by a transaction. The structure is therefore that of two nested |
48 | * loops, with transaction boundary around the whole inner loop. The outer loop |
49 | * is controlled by the step operations ({@link #setStepOperations(RepeatOperations)}), |
50 | * and the inner loop by the chunk operations ({@link #setChunkOperations(RepeatOperations)}). |
51 | * The inner loop should always be executed in a single thread, so the chunk |
52 | * operations should not do any concurrent execution. N.B. usually that means |
53 | * that the chunk operations should be a {@link RepeatTemplate} (which is the |
54 | * default).<br/> |
55 | * |
56 | * Clients can use interceptors in the step operations to intercept or listen to |
57 | * the iteration on a step-wide basis, for instance to get a callback when the |
58 | * step is complete. Those that want callbacks at the level of an individual |
59 | * tasks, can specify interceptors for the chunk operations. |
60 | * |
61 | * @author Dave Syer |
62 | * @author Lucas Ward |
63 | * @author Ben Hale |
64 | * @author Robert Kasanicky |
65 | */ |
66 | public class ItemOrientedStep extends AbstractStep { |
67 | |
68 | private static final Log logger = LogFactory.getLog(ItemOrientedStep.class); |
69 | |
70 | private RepeatOperations chunkOperations = new RepeatTemplate(); |
71 | |
72 | private RepeatOperations stepOperations = new RepeatTemplate(); |
73 | |
74 | // default to checking current thread for interruption. |
75 | private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy(); |
76 | |
77 | private CompositeItemStream stream = new CompositeItemStream(); |
78 | |
79 | private PlatformTransactionManager transactionManager; |
80 | |
81 | private ItemHandler itemHandler; |
82 | |
83 | private StepExecutionSynchronizer synchronizer; |
84 | |
85 | /** |
86 | * @param name |
87 | */ |
88 | public ItemOrientedStep(String name) { |
89 | super(name); |
90 | synchronizer = new StepExecutionSynchronizerFactory().getStepExecutionSynchronizer(); |
91 | } |
92 | |
93 | /** |
94 | * Public setter for the {@link PlatformTransactionManager}. |
95 | * |
96 | * @param transactionManager the transaction manager to set |
97 | */ |
98 | public void setTransactionManager(PlatformTransactionManager transactionManager) { |
99 | this.transactionManager = transactionManager; |
100 | } |
101 | |
102 | /** |
103 | * Public setter for the {@link ItemHandler}. |
104 | * |
105 | * @param itemHandler the {@link ItemHandler} to set |
106 | */ |
107 | public void setItemHandler(ItemHandler itemHandler) { |
108 | this.itemHandler = itemHandler; |
109 | } |
110 | |
111 | /** |
112 | * Register each of the objects as listeners. If the {@link ItemReader} or |
113 | * {@link ItemWriter} themselves implements this interface they will be |
114 | * registered automatically, but their injected dependencies will not be. |
115 | * This is a good way to get access to job parameters and execution context |
116 | * if the tasklet is parameterised. |
117 | * |
118 | * @param listeners an array of listener objects of known types. |
119 | */ |
120 | public void setStepExecutionListeners(StepExecutionListener[] listeners) { |
121 | for (int i = 0; i < listeners.length; i++) { |
122 | registerStepExecutionListener(listeners[i]); |
123 | } |
124 | } |
125 | |
126 | /** |
127 | * Register each of the streams for callbacks at the appropriate time in the |
128 | * step. The {@link ItemReader} and {@link ItemWriter} are automatically |
129 | * registered, but it doesn't hurt to also register them here. Injected |
130 | * dependencies of the reader and writer are not automatically registered, |
131 | * so if you implement {@link ItemWriter} using delegation to another object |
132 | * which itself is a {@link ItemStream}, you need to register the delegate |
133 | * here. |
134 | * |
135 | * @param streams an array of {@link ItemStream} objects. |
136 | */ |
137 | public void setStreams(ItemStream[] streams) { |
138 | for (int i = 0; i < streams.length; i++) { |
139 | registerStream(streams[i]); |
140 | } |
141 | } |
142 | |
143 | /** |
144 | * Register a single {@link ItemStream} for callbacks to the stream |
145 | * interface. |
146 | * |
147 | * @param stream |
148 | */ |
149 | public void registerStream(ItemStream stream) { |
150 | this.stream.register(stream); |
151 | } |
152 | |
153 | /** |
154 | * The {@link RepeatOperations} to use for the outer loop of the batch |
155 | * processing. Should be set up by the caller through a factory. Defaults to |
156 | * a plain {@link RepeatTemplate}. |
157 | * |
158 | * @param stepOperations a {@link RepeatOperations} instance. |
159 | */ |
160 | public void setStepOperations(RepeatOperations stepOperations) { |
161 | this.stepOperations = stepOperations; |
162 | } |
163 | |
164 | /** |
165 | * The {@link RepeatOperations} to use for the inner loop of the batch |
166 | * processing. should be set up by the caller through a factory. defaults to |
167 | * a plain {@link RepeatTemplate}. |
168 | * |
169 | * @param chunkOperations a {@link RepeatOperations} instance. |
170 | */ |
171 | public void setChunkOperations(RepeatOperations chunkOperations) { |
172 | this.chunkOperations = chunkOperations; |
173 | } |
174 | |
175 | /** |
176 | * Setter for the {@link StepInterruptionPolicy}. The policy is used to |
177 | * check whether an external request has been made to interrupt the job |
178 | * execution. |
179 | * |
180 | * @param interruptionPolicy a {@link StepInterruptionPolicy} |
181 | */ |
182 | public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) { |
183 | this.interruptionPolicy = interruptionPolicy; |
184 | } |
185 | |
186 | /** |
187 | * Mostly useful for testing, but could be used to remove dependence on |
188 | * backport concurrency utilities. Public setter for the |
189 | * {@link StepExecutionSynchronizer}. |
190 | * |
191 | * @param synchronizer the {@link StepExecutionSynchronizer} to set |
192 | */ |
193 | public void setSynchronizer(StepExecutionSynchronizer synchronizer) { |
194 | this.synchronizer = synchronizer; |
195 | } |
196 | |
197 | /** |
198 | * Process the step and update its context so that progress can be monitored |
199 | * by the caller. The step is broken down into chunks, each one executing in |
200 | * a transaction. The step and its execution and execution context are all |
201 | * given an up to date {@link BatchStatus}, and the {@link JobRepository} |
202 | * is used to store the result. Various reporting information are also added |
203 | * to the current context (the {@link RepeatContext} governing the step |
204 | * execution, which would normally be available to the caller somehow |
205 | * through the step's {@link ExecutionContext}.<br/> |
206 | * |
207 | * @throws JobInterruptedException if the step or a chunk is interrupted |
208 | * @throws RuntimeException if there is an exception during a chunk |
209 | * execution |
210 | * |
211 | */ |
212 | protected ExitStatus doExecute(final StepExecution stepExecution) throws Exception { |
213 | stream.update(stepExecution.getExecutionContext()); |
214 | getJobRepository().saveOrUpdateExecutionContext(stepExecution); |
215 | itemHandler.mark(); |
216 | |
217 | final ExceptionHolder fatalException = new ExceptionHolder(); |
218 | |
219 | return stepOperations.iterate(new RepeatCallback() { |
220 | |
221 | public ExitStatus doInIteration(RepeatContext context) throws Exception { |
222 | final StepContribution contribution = stepExecution.createStepContribution(); |
223 | // Before starting a new transaction, check for |
224 | // interruption. |
225 | if (stepExecution.isTerminateOnly()) { |
226 | context.setTerminateOnly(); |
227 | } |
228 | interruptionPolicy.checkInterrupted(stepExecution); |
229 | |
230 | ExitStatus exitStatus = ExitStatus.CONTINUABLE; |
231 | |
232 | TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition()); |
233 | |
234 | try { |
235 | |
236 | exitStatus = processChunk(stepExecution, contribution); |
237 | contribution.incrementCommitCount(); |
238 | |
239 | // If the step operations are asynchronous then we need |
240 | // to synchronize changes to the step execution (at a |
241 | // minimum). |
242 | try { |
243 | synchronizer.lock(stepExecution); |
244 | } |
245 | catch (InterruptedException e) { |
246 | stepExecution.setStatus(BatchStatus.STOPPED); |
247 | Thread.currentThread().interrupt(); |
248 | } |
249 | |
250 | // Apply the contribution to the step |
251 | // only if chunk was successful |
252 | stepExecution.apply(contribution); |
253 | |
254 | // Attempt to flush before the step execution and stream |
255 | // state are updated |
256 | itemHandler.flush(); |
257 | |
258 | stream.update(stepExecution.getExecutionContext()); |
259 | try { |
260 | getJobRepository().saveOrUpdateExecutionContext(stepExecution); |
261 | } |
262 | catch (Exception e) { |
263 | fatalException.setException(e); |
264 | stepExecution.setStatus(BatchStatus.UNKNOWN); |
265 | throw new FatalException("Fatal error detected during save of step execution context", e); |
266 | } |
267 | |
268 | try { |
269 | itemHandler.mark(); |
270 | transactionManager.commit(transaction); |
271 | } |
272 | catch (Exception e) { |
273 | fatalException.setException(e); |
274 | stepExecution.setStatus(BatchStatus.UNKNOWN); |
275 | logger.error("Fatal error detected during commit."); |
276 | throw new FatalException("Fatal error detected during commit", e); |
277 | } |
278 | |
279 | } |
280 | catch (Error e) { |
281 | processRollback(stepExecution, contribution, fatalException, transaction); |
282 | throw e; |
283 | } |
284 | catch (Exception e) { |
285 | processRollback(stepExecution, contribution, fatalException, transaction); |
286 | throw e; |
287 | } |
288 | finally { |
289 | synchronizer.release(stepExecution); |
290 | } |
291 | |
292 | // Check for interruption after transaction as well, so that |
293 | // the interrupted exception is correctly propagated up to |
294 | // caller |
295 | interruptionPolicy.checkInterrupted(stepExecution); |
296 | |
297 | return exitStatus; |
298 | } |
299 | |
300 | }); |
301 | |
302 | } |
303 | |
304 | /** |
305 | * Execute a bunch of identical business logic operations all within a |
306 | * transaction. The transaction is programmatically started and stopped |
307 | * outside this method, so subclasses that override do not need to create a |
308 | * transaction. |
309 | * @param execution the current {@link StepExecution} which should be |
310 | * treated as read-only for the purposes of this method. |
311 | * @param contribution the current {@link StepContribution} which can accept |
312 | * changes to be aggregated later into the step execution. |
313 | * |
314 | * @return true if there is more data to process. |
315 | */ |
316 | protected ExitStatus processChunk(final StepExecution execution, final StepContribution contribution) { |
317 | ExitStatus result = chunkOperations.iterate(new RepeatCallback() { |
318 | public ExitStatus doInIteration(final RepeatContext context) throws Exception { |
319 | if (execution.isTerminateOnly()) { |
320 | context.setTerminateOnly(); |
321 | } |
322 | // check for interruption before each item as well |
323 | interruptionPolicy.checkInterrupted(execution); |
324 | ExitStatus exitStatus = itemHandler.handle(contribution); |
325 | // check for interruption after each item as well |
326 | interruptionPolicy.checkInterrupted(execution); |
327 | return exitStatus; |
328 | } |
329 | }); |
330 | return result; |
331 | } |
332 | |
333 | /** |
334 | * @param stepExecution |
335 | * @param contribution |
336 | * @param fatalException |
337 | * @param transaction |
338 | */ |
339 | private void processRollback(final StepExecution stepExecution, final StepContribution contribution, |
340 | final ExceptionHolder fatalException, TransactionStatus transaction) { |
341 | |
342 | stepExecution.incrementSkipCountBy(contribution.getSkipCount()); |
343 | /* |
344 | * Any exception thrown within the transaction should automatically |
345 | * cause the transaction to rollback. |
346 | */ |
347 | stepExecution.rollback(); |
348 | |
349 | try { |
350 | itemHandler.reset(); |
351 | itemHandler.clear(); |
352 | transactionManager.rollback(transaction); |
353 | } |
354 | catch (Exception e) { |
355 | /* |
356 | * If we already failed to commit, it doesn't help to do this again - |
357 | * it's better to allow the CommitFailedException to propagate |
358 | */ |
359 | if (!fatalException.hasException()) { |
360 | fatalException.setException(e); |
361 | throw new FatalException("Failed while processing rollback", e); |
362 | } |
363 | } |
364 | } |
365 | |
366 | private static class ExceptionHolder { |
367 | |
368 | private Exception exception; |
369 | |
370 | public boolean hasException() { |
371 | return exception != null; |
372 | } |
373 | |
374 | public void setException(Exception exception) { |
375 | this.exception = exception; |
376 | } |
377 | |
378 | public Exception getException() { |
379 | return this.exception; |
380 | } |
381 | |
382 | } |
383 | |
384 | protected void close(ExecutionContext ctx) throws Exception { |
385 | stream.close(ctx); |
386 | } |
387 | |
388 | protected void open(ExecutionContext ctx) throws Exception { |
389 | stream.open(ctx); |
390 | } |
391 | } |