1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.tasklet; |
17 | |
18 | import java.util.concurrent.Semaphore; |
19 | |
20 | import org.apache.commons.logging.Log; |
21 | import org.apache.commons.logging.LogFactory; |
22 | import org.springframework.batch.core.BatchStatus; |
23 | import org.springframework.batch.core.ChunkListener; |
24 | import org.springframework.batch.core.JobInterruptedException; |
25 | import org.springframework.batch.core.StepContribution; |
26 | import org.springframework.batch.core.StepExecution; |
27 | import org.springframework.batch.core.StepExecutionListener; |
28 | import org.springframework.batch.core.listener.CompositeChunkListener; |
29 | import org.springframework.batch.core.repository.JobRepository; |
30 | import org.springframework.batch.core.scope.context.ChunkContext; |
31 | import org.springframework.batch.core.scope.context.StepContextRepeatCallback; |
32 | import org.springframework.batch.core.step.AbstractStep; |
33 | import org.springframework.batch.core.step.StepInterruptionPolicy; |
34 | import org.springframework.batch.core.step.ThreadStepInterruptionPolicy; |
35 | import org.springframework.batch.item.ExecutionContext; |
36 | import org.springframework.batch.item.ItemReader; |
37 | import org.springframework.batch.item.ItemStream; |
38 | import org.springframework.batch.item.ItemWriter; |
39 | import org.springframework.batch.item.support.CompositeItemStream; |
40 | import org.springframework.batch.repeat.RepeatContext; |
41 | import org.springframework.batch.repeat.RepeatOperations; |
42 | import org.springframework.batch.repeat.RepeatStatus; |
43 | import org.springframework.batch.repeat.support.RepeatTemplate; |
44 | import org.springframework.transaction.PlatformTransactionManager; |
45 | import org.springframework.transaction.TransactionStatus; |
46 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
47 | import org.springframework.transaction.interceptor.TransactionAttribute; |
48 | import org.springframework.util.Assert; |
49 | |
50 | /** |
51 | * Simple implementation of executing the step as a call to a {@link Tasklet}, |
52 | * possibly repeated, and each call surrounded by a transaction. The structure |
53 | * is therefore that of a loop with transaction boundary inside the loop. The |
54 | * loop is controlled by the step operations ( |
55 | * {@link #setStepOperations(RepeatOperations)}).<br/> |
56 | * <br/> |
57 | * |
58 | * Clients can use interceptors in the step operations to intercept or listen to |
59 | * the iteration on a step-wide basis, for instance to get a callback when the |
60 | * step is complete. Those that want callbacks at the level of an individual |
61 | * tasks, can specify interceptors for the chunk operations. |
62 | * |
63 | * @author Dave Syer |
64 | * @author Lucas Ward |
65 | * @author Ben Hale |
66 | * @author Robert Kasanicky |
67 | */ |
68 | public class TaskletStep extends AbstractStep { |
69 | |
70 | private static final Log logger = LogFactory.getLog(TaskletStep.class); |
71 | |
72 | private RepeatOperations stepOperations = new RepeatTemplate(); |
73 | |
74 | private CompositeChunkListener chunkListener = new CompositeChunkListener(); |
75 | |
76 | // default to checking current thread for interruption. |
77 | private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy(); |
78 | |
79 | private CompositeItemStream stream = new CompositeItemStream(); |
80 | |
81 | private PlatformTransactionManager transactionManager; |
82 | |
83 | private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() { |
84 | |
85 | @Override |
86 | public boolean rollbackOn(Throwable ex) { |
87 | return true; |
88 | } |
89 | |
90 | }; |
91 | |
92 | private Tasklet tasklet; |
93 | |
94 | private Semaphore semaphore = new Semaphore(1); |
95 | |
96 | /** |
97 | * Default constructor. |
98 | */ |
99 | public TaskletStep() { |
100 | this(null); |
101 | } |
102 | |
103 | /** |
104 | * @param name |
105 | */ |
106 | public TaskletStep(String name) { |
107 | super(name); |
108 | } |
109 | |
110 | /* |
111 | * (non-Javadoc) |
112 | * |
113 | * @see |
114 | * org.springframework.batch.core.step.AbstractStep#afterPropertiesSet() |
115 | */ |
116 | @Override |
117 | public void afterPropertiesSet() throws Exception { |
118 | super.afterPropertiesSet(); |
119 | Assert.notNull(transactionManager, "TransactionManager is mandatory"); |
120 | } |
121 | |
122 | /** |
123 | * Public setter for the {@link PlatformTransactionManager}. |
124 | * |
125 | * @param transactionManager the transaction manager to set |
126 | */ |
127 | public void setTransactionManager(PlatformTransactionManager transactionManager) { |
128 | this.transactionManager = transactionManager; |
129 | } |
130 | |
131 | /** |
132 | * Public setter for the {@link TransactionAttribute}. |
133 | * |
134 | * @param transactionAttribute the {@link TransactionAttribute} to set |
135 | */ |
136 | public void setTransactionAttribute(TransactionAttribute transactionAttribute) { |
137 | this.transactionAttribute = transactionAttribute; |
138 | } |
139 | |
140 | /** |
141 | * Public setter for the {@link Tasklet}. |
142 | * |
143 | * @param tasklet the {@link Tasklet} to set |
144 | */ |
145 | public void setTasklet(Tasklet tasklet) { |
146 | this.tasklet = tasklet; |
147 | if (tasklet instanceof StepExecutionListener) { |
148 | registerStepExecutionListener((StepExecutionListener) tasklet); |
149 | } |
150 | } |
151 | |
152 | /** |
153 | * Register a chunk listener for callbacks at the appropriate stages in a |
154 | * step execution. |
155 | * |
156 | * @param listener a {@link ChunkListener} |
157 | */ |
158 | public void registerChunkListener(ChunkListener listener) { |
159 | this.chunkListener.register(listener); |
160 | } |
161 | |
162 | /** |
163 | * Register each of the objects as listeners. |
164 | * |
165 | * @param listeners an array of listener objects of known types. |
166 | */ |
167 | public void setChunkListeners(ChunkListener[] listeners) { |
168 | for (int i = 0; i < listeners.length; i++) { |
169 | registerChunkListener(listeners[i]); |
170 | } |
171 | } |
172 | |
173 | /** |
174 | * Register each of the streams for callbacks at the appropriate time in the |
175 | * step. The {@link ItemReader} and {@link ItemWriter} are automatically |
176 | * registered, but it doesn't hurt to also register them here. Injected |
177 | * dependencies of the reader and writer are not automatically registered, |
178 | * so if you implement {@link ItemWriter} using delegation to another object |
179 | * which itself is a {@link ItemStream}, you need to register the delegate |
180 | * here. |
181 | * |
182 | * @param streams an array of {@link ItemStream} objects. |
183 | */ |
184 | public void setStreams(ItemStream[] streams) { |
185 | for (int i = 0; i < streams.length; i++) { |
186 | registerStream(streams[i]); |
187 | } |
188 | } |
189 | |
190 | /** |
191 | * Register a single {@link ItemStream} for callbacks to the stream |
192 | * interface. |
193 | * |
194 | * @param stream |
195 | */ |
196 | public void registerStream(ItemStream stream) { |
197 | this.stream.register(stream); |
198 | } |
199 | |
200 | /** |
201 | * The {@link RepeatOperations} to use for the outer loop of the batch |
202 | * processing. Should be set up by the caller through a factory. Defaults to |
203 | * a plain {@link RepeatTemplate}. |
204 | * |
205 | * @param stepOperations a {@link RepeatOperations} instance. |
206 | */ |
207 | public void setStepOperations(RepeatOperations stepOperations) { |
208 | this.stepOperations = stepOperations; |
209 | } |
210 | |
211 | /** |
212 | * Setter for the {@link StepInterruptionPolicy}. The policy is used to |
213 | * check whether an external request has been made to interrupt the job |
214 | * execution. |
215 | * |
216 | * @param interruptionPolicy a {@link StepInterruptionPolicy} |
217 | */ |
218 | public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) { |
219 | this.interruptionPolicy = interruptionPolicy; |
220 | } |
221 | |
222 | /** |
223 | * Process the step and update its context so that progress can be monitored |
224 | * by the caller. The step is broken down into chunks, each one executing in |
225 | * a transaction. The step and its execution and execution context are all |
226 | * given an up to date {@link BatchStatus}, and the {@link JobRepository} is |
227 | * used to store the result. Various reporting information are also added to |
228 | * the current context governing the step execution, which would normally be |
229 | * available to the caller through the step's {@link ExecutionContext}.<br/> |
230 | * |
231 | * @throws JobInterruptedException if the step or a chunk is interrupted |
232 | * @throws RuntimeException if there is an exception during a chunk |
233 | * execution |
234 | * |
235 | */ |
236 | @Override |
237 | protected void doExecute(StepExecution stepExecution) throws Exception { |
238 | |
239 | stream.update(stepExecution.getExecutionContext()); |
240 | getJobRepository().updateExecutionContext(stepExecution); |
241 | |
242 | stepOperations.iterate(new StepContextRepeatCallback(stepExecution) { |
243 | |
244 | @Override |
245 | public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) |
246 | throws Exception { |
247 | |
248 | StepExecution stepExecution = chunkContext.getStepContext().getStepExecution(); |
249 | |
250 | StepContribution contribution = stepExecution.createStepContribution(); |
251 | |
252 | // Before starting a new transaction, check for |
253 | // interruption. |
254 | interruptionPolicy.checkInterrupted(stepExecution); |
255 | |
256 | RepeatStatus result = RepeatStatus.CONTINUABLE; |
257 | |
258 | TransactionStatus transaction = transactionManager.getTransaction(transactionAttribute); |
259 | |
260 | chunkListener.beforeChunk(); |
261 | |
262 | boolean locked = false; |
263 | |
264 | try { |
265 | |
266 | try { |
267 | try { |
268 | result = tasklet.execute(contribution, chunkContext); |
269 | if(result == null) { |
270 | result = RepeatStatus.FINISHED; |
271 | } |
272 | } |
273 | catch (Exception e) { |
274 | if (transactionAttribute.rollbackOn(e)) { |
275 | throw e; |
276 | } |
277 | } |
278 | chunkListener.afterChunk(); |
279 | } |
280 | finally { |
281 | // Apply the contribution to the step |
282 | // even if unsuccessful |
283 | logger.debug("Applying contribution: " + contribution); |
284 | stepExecution.apply(contribution); |
285 | |
286 | } |
287 | |
288 | // If the step operations are asynchronous then we need |
289 | // to synchronize changes to the step execution (at a |
290 | // minimum). |
291 | try { |
292 | semaphore.acquire(); |
293 | locked = true; |
294 | } |
295 | catch (InterruptedException e) { |
296 | stepExecution.setStatus(BatchStatus.STOPPED); |
297 | Thread.currentThread().interrupt(); |
298 | } |
299 | |
300 | stream.update(stepExecution.getExecutionContext()); |
301 | |
302 | try { |
303 | getJobRepository().updateExecutionContext(stepExecution); |
304 | transactionManager.commit(transaction); |
305 | stepExecution.incrementCommitCount(); |
306 | logger.debug("Saving step execution after commit: " + stepExecution); |
307 | getJobRepository().update(stepExecution); |
308 | } |
309 | catch (Exception e) { |
310 | throw new FatalException("Fatal failure detected", e); |
311 | } |
312 | |
313 | } |
314 | catch (FatalException e) { |
315 | try { |
316 | logger.debug("Rollback for FatalException: " + e.getClass().getName() + ": " + e.getMessage()); |
317 | rollback(stepExecution, transaction); |
318 | } |
319 | catch (Exception rollbackException) { |
320 | /* |
321 | * Propagate the original fatal failure; only log the |
322 | * failed rollback. The failure can be caused by |
323 | * attempting a rollback when the commit has already |
324 | * succeeded (which is normal so only logged at debug |
325 | * level) |
326 | */ |
327 | logger.debug("Rollback caused by fatal failure failed", rollbackException); |
328 | } |
329 | throw e; |
330 | } |
331 | catch (Error e) { |
332 | try { |
333 | logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage()); |
334 | rollback(stepExecution, transaction); |
335 | } |
336 | catch (Exception rollbackException) { |
337 | logger.error("Fatal rollback failure, original exception that caused the rollback is", e); |
338 | throw new FatalException("Failed while processing rollback", rollbackException); |
339 | } |
340 | throw e; |
341 | |
342 | } |
343 | catch (Exception e) { |
344 | try { |
345 | logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage()); |
346 | rollback(stepExecution, transaction); |
347 | } |
348 | catch (Exception rollbackException) { |
349 | logger.error("Fatal rollback failure, original exception that caused the rollback is", e); |
350 | throw new FatalException("Failed while processing rollback", rollbackException); |
351 | } |
352 | throw e; |
353 | } |
354 | finally { |
355 | // only release the lock if we acquired it |
356 | if (locked) { |
357 | semaphore.release(); |
358 | } |
359 | locked = false; |
360 | } |
361 | |
362 | // Check for interruption after transaction as well, so that |
363 | // the interrupted exception is correctly propagated up to |
364 | // caller |
365 | interruptionPolicy.checkInterrupted(stepExecution); |
366 | |
367 | return result; |
368 | } |
369 | |
370 | }); |
371 | |
372 | } |
373 | |
374 | protected void close(ExecutionContext ctx) throws Exception { |
375 | stream.close(); |
376 | } |
377 | |
378 | protected void open(ExecutionContext ctx) throws Exception { |
379 | stream.open(ctx); |
380 | } |
381 | |
382 | private void rollback(StepExecution stepExecution, TransactionStatus transaction) { |
383 | transactionManager.rollback(transaction); |
384 | stepExecution.incrementRollbackCount(); |
385 | } |
386 | } |