EMMA Coverage Report (generated Tue May 06 07:29:23 PDT 2008)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [ItemOrientedStep.java]

name | class, % | method, % | block, % | line, %
ItemOrientedStep.java | 100% (4/4) | 96%  (23/24) | 98%  (400/408) | 97%  (99.9/103)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class ItemOrientedStep$ExceptionHolder100% (1/1)75%  (3/4)82%  (14/17)80%  (4/5)
getException (): Exception 0%   (0/1)0%   (0/3)0%   (0/1)
ItemOrientedStep$ExceptionHolder (): void 100% (1/1)100% (3/3)100% (1/1)
hasException (): boolean 100% (1/1)100% (7/7)100% (1/1)
setException (Exception): void 100% (1/1)100% (4/4)100% (2/2)
     
class ItemOrientedStep$2100% (1/1)100% (2/2)95%  (37/39)86%  (6/7)
doInIteration (RepeatContext): ExitStatus 100% (1/1)93%  (25/27)83%  (5/6)
ItemOrientedStep$2 (ItemOrientedStep, StepExecution, StepContribution): void 100% (1/1)100% (12/12)100% (1/1)
     
class ItemOrientedStep$1100% (1/1)100% (2/2)99%  (172/174)98%  (40/41)
doInIteration (RepeatContext): ExitStatus 100% (1/1)99%  (160/162)98%  (39/40)
ItemOrientedStep$1 (ItemOrientedStep, StepExecution, ItemOrientedStep$Excepti... 100% (1/1)100% (12/12)100% (1/1)
     
class ItemOrientedStep100% (1/1)100% (16/16)99%  (177/178)100% (49.9/50)
<static initializer> 100% (1/1)91%  (10/11)90%  (0.9/1)
ItemOrientedStep (String): void 100% (1/1)100% (30/30)100% (7/7)
close (ExecutionContext): void 100% (1/1)100% (5/5)100% (2/2)
doExecute (StepExecution): ExitStatus 100% (1/1)100% (27/27)100% (5/5)
open (ExecutionContext): void 100% (1/1)100% (5/5)100% (2/2)
processChunk (StepExecution, StepContribution): ExitStatus 100% (1/1)100% (12/12)100% (2/2)
processRollback (StepExecution, StepContribution, ItemOrientedStep$ExceptionH... 100% (1/1)100% (31/31)100% (11/11)
registerStream (ItemStream): void 100% (1/1)100% (5/5)100% (2/2)
setChunkOperations (RepeatOperations): void 100% (1/1)100% (4/4)100% (2/2)
setInterruptionPolicy (StepInterruptionPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setItemHandler (ItemHandler): void 100% (1/1)100% (4/4)100% (2/2)
setStepExecutionListeners (StepExecutionListener []): void 100% (1/1)100% (14/14)100% (3/3)
setStepOperations (RepeatOperations): void 100% (1/1)100% (4/4)100% (2/2)
setStreams (ItemStream []): void 100% (1/1)100% (14/14)100% (3/3)
setSynchronizer (StepExecutionSynchronizer): void 100% (1/1)100% (4/4)100% (2/2)
setTransactionManager (PlatformTransactionManager): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.item;
17 
18import org.apache.commons.logging.Log;
19import org.apache.commons.logging.LogFactory;
20import org.springframework.batch.core.BatchStatus;
21import org.springframework.batch.core.JobInterruptedException;
22import org.springframework.batch.core.StepContribution;
23import org.springframework.batch.core.StepExecution;
24import org.springframework.batch.core.StepExecutionListener;
25import org.springframework.batch.core.repository.JobRepository;
26import org.springframework.batch.core.step.AbstractStep;
27import org.springframework.batch.core.step.StepExecutionSynchronizer;
28import org.springframework.batch.core.step.StepExecutionSynchronizerFactory;
29import org.springframework.batch.core.step.StepInterruptionPolicy;
30import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
31import org.springframework.batch.item.ExecutionContext;
32import org.springframework.batch.item.ItemReader;
33import org.springframework.batch.item.ItemStream;
34import org.springframework.batch.item.ItemWriter;
35import org.springframework.batch.item.support.CompositeItemStream;
36import org.springframework.batch.repeat.ExitStatus;
37import org.springframework.batch.repeat.RepeatCallback;
38import org.springframework.batch.repeat.RepeatContext;
39import org.springframework.batch.repeat.RepeatOperations;
40import org.springframework.batch.repeat.support.RepeatTemplate;
41import org.springframework.transaction.PlatformTransactionManager;
42import org.springframework.transaction.TransactionStatus;
43import org.springframework.transaction.support.DefaultTransactionDefinition;
44 
45/**
46 * Simple implementation of executing the step as a set of chunks, each chunk
47 * surrounded by a transaction. The structure is therefore that of two nested
48 * loops, with transaction boundary around the whole inner loop. The outer loop
49 * is controlled by the step operations ({@link #setStepOperations(RepeatOperations)}),
50 * and the inner loop by the chunk operations ({@link #setChunkOperations(RepeatOperations)}).
51 * The inner loop should always be executed in a single thread, so the chunk
52 * operations should not do any concurrent execution. N.B. usually that means
53 * that the chunk operations should be a {@link RepeatTemplate} (which is the
54 * default).<br/>
55 * 
56 * Clients can use interceptors in the step operations to intercept or listen to
57 * the iteration on a step-wide basis, for instance to get a callback when the
58 * step is complete. Those that want callbacks at the level of an individual
59 * tasks, can specify interceptors for the chunk operations.
60 * 
61 * @author Dave Syer
62 * @author Lucas Ward
63 * @author Ben Hale
64 * @author Robert Kasanicky
65 */
66public class ItemOrientedStep extends AbstractStep {
67 
68        private static final Log logger = LogFactory.getLog(ItemOrientedStep.class);
69 
70        private RepeatOperations chunkOperations = new RepeatTemplate();
71 
72        private RepeatOperations stepOperations = new RepeatTemplate();
73 
74        // default to checking current thread for interruption.
75        private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
76 
77        private CompositeItemStream stream = new CompositeItemStream();
78 
79        private PlatformTransactionManager transactionManager;
80 
81        private ItemHandler itemHandler;
82 
83        private StepExecutionSynchronizer synchronizer;
84 
85        /**
86         * @param name
87         */
88        public ItemOrientedStep(String name) {
89                super(name);
90                synchronizer = new StepExecutionSynchronizerFactory().getStepExecutionSynchronizer();
91        }
92 
93        /**
94         * Public setter for the {@link PlatformTransactionManager}.
95         * 
96         * @param transactionManager the transaction manager to set
97         */
98        public void setTransactionManager(PlatformTransactionManager transactionManager) {
99                this.transactionManager = transactionManager;
100        }
101 
102        /**
103         * Public setter for the {@link ItemHandler}.
104         * 
105         * @param itemHandler the {@link ItemHandler} to set
106         */
107        public void setItemHandler(ItemHandler itemHandler) {
108                this.itemHandler = itemHandler;
109        }
110 
111        /**
112         * Register each of the objects as listeners. If the {@link ItemReader} or
113         * {@link ItemWriter} themselves implements this interface they will be
114         * registered automatically, but their injected dependencies will not be.
115         * This is a good way to get access to job parameters and execution context
116         * if the tasklet is parameterised.
117         * 
118         * @param listeners an array of listener objects of known types.
119         */
120        public void setStepExecutionListeners(StepExecutionListener[] listeners) {
121                for (int i = 0; i < listeners.length; i++) {
122                        registerStepExecutionListener(listeners[i]);
123                }
124        }
125 
126        /**
127         * Register each of the streams for callbacks at the appropriate time in the
128         * step. The {@link ItemReader} and {@link ItemWriter} are automatically
129         * registered, but it doesn't hurt to also register them here. Injected
130         * dependencies of the reader and writer are not automatically registered,
131         * so if you implement {@link ItemWriter} using delegation to another object
132         * which itself is a {@link ItemStream}, you need to register the delegate
133         * here.
134         * 
135         * @param streams an array of {@link ItemStream} objects.
136         */
137        public void setStreams(ItemStream[] streams) {
138                for (int i = 0; i < streams.length; i++) {
139                        registerStream(streams[i]);
140                }
141        }
142 
143        /**
144         * Register a single {@link ItemStream} for callbacks to the stream
145         * interface.
146         * 
147         * @param stream
148         */
149        public void registerStream(ItemStream stream) {
150                this.stream.register(stream);
151        }
152 
153        /**
154         * The {@link RepeatOperations} to use for the outer loop of the batch
155         * processing. Should be set up by the caller through a factory. Defaults to
156         * a plain {@link RepeatTemplate}.
157         * 
158         * @param stepOperations a {@link RepeatOperations} instance.
159         */
160        public void setStepOperations(RepeatOperations stepOperations) {
161                this.stepOperations = stepOperations;
162        }
163 
164        /**
165         * The {@link RepeatOperations} to use for the inner loop of the batch
166         * processing. should be set up by the caller through a factory. defaults to
167         * a plain {@link RepeatTemplate}.
168         * 
169         * @param chunkOperations a {@link RepeatOperations} instance.
170         */
171        public void setChunkOperations(RepeatOperations chunkOperations) {
172                this.chunkOperations = chunkOperations;
173        }
174 
175        /**
176         * Setter for the {@link StepInterruptionPolicy}. The policy is used to
177         * check whether an external request has been made to interrupt the job
178         * execution.
179         * 
180         * @param interruptionPolicy a {@link StepInterruptionPolicy}
181         */
182        public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
183                this.interruptionPolicy = interruptionPolicy;
184        }
185 
186        /**
187         * Mostly useful for testing, but could be used to remove dependence on
188         * backport concurrency utilities. Public setter for the
189         * {@link StepExecutionSynchronizer}.
190         * 
191         * @param synchronizer the {@link StepExecutionSynchronizer} to set
192         */
193        public void setSynchronizer(StepExecutionSynchronizer synchronizer) {
194                this.synchronizer = synchronizer;
195        }
196 
197        /**
198         * Process the step and update its context so that progress can be monitored
199         * by the caller. The step is broken down into chunks, each one executing in
200         * a transaction. The step and its execution and execution context are all
201         * given an up to date {@link BatchStatus}, and the {@link JobRepository}
202         * is used to store the result. Various reporting information are also added
203         * to the current context (the {@link RepeatContext} governing the step
204         * execution, which would normally be available to the caller somehow
205         * through the step's {@link ExecutionContext}.<br/>
206         * 
207         * @throws JobInterruptedException if the step or a chunk is interrupted
208         * @throws RuntimeException if there is an exception during a chunk
209         * execution
210         * 
211         */
212        protected ExitStatus doExecute(final StepExecution stepExecution) throws Exception {
213                stream.update(stepExecution.getExecutionContext());
214                getJobRepository().saveOrUpdateExecutionContext(stepExecution);
215                itemHandler.mark();
216 
217                final ExceptionHolder fatalException = new ExceptionHolder();
218 
219                return stepOperations.iterate(new RepeatCallback() {
220 
221                        public ExitStatus doInIteration(RepeatContext context) throws Exception {
222                                final StepContribution contribution = stepExecution.createStepContribution();
223                                // Before starting a new transaction, check for
224                                // interruption.
225                                if (stepExecution.isTerminateOnly()) {
226                                        context.setTerminateOnly();
227                                }
228                                interruptionPolicy.checkInterrupted(stepExecution);
229 
230                                ExitStatus exitStatus = ExitStatus.CONTINUABLE;
231 
232                                TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionDefinition());
233 
234                                try {
235 
236                                        exitStatus = processChunk(stepExecution, contribution);
237                                        contribution.incrementCommitCount();
238 
239                                        // If the step operations are asynchronous then we need
240                                        // to synchronize changes to the step execution (at a
241                                        // minimum).
242                                        try {
243                                                synchronizer.lock(stepExecution);
244                                        }
245                                        catch (InterruptedException e) {
246                                                stepExecution.setStatus(BatchStatus.STOPPED);
247                                                Thread.currentThread().interrupt();
248                                        }
249 
250                                        // Apply the contribution to the step
251                                        // only if chunk was successful
252                                        stepExecution.apply(contribution);
253 
254                                        // Attempt to flush before the step execution and stream
255                                        // state are updated
256                                        itemHandler.flush();
257 
258                                        stream.update(stepExecution.getExecutionContext());
259                                        try {
260                                                getJobRepository().saveOrUpdateExecutionContext(stepExecution);
261                                        }
262                                        catch (Exception e) {
263                                                fatalException.setException(e);
264                                                stepExecution.setStatus(BatchStatus.UNKNOWN);
265                                                throw new FatalException("Fatal error detected during save of step execution context", e);
266                                        }
267 
268                                        try {
269                                                itemHandler.mark();
270                                                transactionManager.commit(transaction);
271                                        }
272                                        catch (Exception e) {
273                                                fatalException.setException(e);
274                                                stepExecution.setStatus(BatchStatus.UNKNOWN);
275                                                logger.error("Fatal error detected during commit.");
276                                                throw new FatalException("Fatal error detected during commit", e);
277                                        }
278 
279                                }
280                                catch (Error e) {
281                                        processRollback(stepExecution, contribution, fatalException, transaction);
282                                        throw e;
283                                }
284                                catch (Exception e) {
285                                        processRollback(stepExecution, contribution, fatalException, transaction);
286                                        throw e;
287                                }
288                                finally {
289                                        synchronizer.release(stepExecution);
290                                }
291 
292                                // Check for interruption after transaction as well, so that
293                                // the interrupted exception is correctly propagated up to
294                                // caller
295                                interruptionPolicy.checkInterrupted(stepExecution);
296 
297                                return exitStatus;
298                        }
299 
300                });
301 
302        }
303 
304        /**
305         * Execute a bunch of identical business logic operations all within a
306         * transaction. The transaction is programmatically started and stopped
307         * outside this method, so subclasses that override do not need to create a
308         * transaction.
309         * @param execution the current {@link StepExecution} which should be
310         * treated as read-only for the purposes of this method.
311         * @param contribution the current {@link StepContribution} which can accept
312         * changes to be aggregated later into the step execution.
313         * 
314         * @return true if there is more data to process.
315         */
316        protected ExitStatus processChunk(final StepExecution execution, final StepContribution contribution) {
317                ExitStatus result = chunkOperations.iterate(new RepeatCallback() {
318                        public ExitStatus doInIteration(final RepeatContext context) throws Exception {
319                                if (execution.isTerminateOnly()) {
320                                        context.setTerminateOnly();
321                                }
322                                // check for interruption before each item as well
323                                interruptionPolicy.checkInterrupted(execution);
324                                ExitStatus exitStatus = itemHandler.handle(contribution);
325                                // check for interruption after each item as well
326                                interruptionPolicy.checkInterrupted(execution);
327                                return exitStatus;
328                        }
329                });
330                return result;
331        }
332 
333        /**
334         * @param stepExecution
335         * @param contribution
336         * @param fatalException
337         * @param transaction
338         */
339        private void processRollback(final StepExecution stepExecution, final StepContribution contribution,
340                        final ExceptionHolder fatalException, TransactionStatus transaction) {
341 
342                stepExecution.incrementSkipCountBy(contribution.getSkipCount());
343                /*
344                 * Any exception thrown within the transaction should automatically
345                 * cause the transaction to rollback.
346                 */
347                stepExecution.rollback();
348 
349                try {
350                        itemHandler.reset();
351                        itemHandler.clear();
352                        transactionManager.rollback(transaction);
353                }
354                catch (Exception e) {
355                        /*
356                         * If we already failed to commit, it doesn't help to do this again -
357                         * it's better to allow the CommitFailedException to propagate
358                         */
359                        if (!fatalException.hasException()) {
360                                fatalException.setException(e);
361                                throw new FatalException("Failed while processing rollback", e);
362                        }
363                }
364        }
365 
366        private static class ExceptionHolder {
367 
368                private Exception exception;
369 
370                public boolean hasException() {
371                        return exception != null;
372                }
373 
374                public void setException(Exception exception) {
375                        this.exception = exception;
376                }
377 
378                public Exception getException() {
379                        return this.exception;
380                }
381 
382        }
383 
384        protected void close(ExecutionContext ctx) throws Exception {
385                stream.close(ctx);
386        }
387 
388        protected void open(ExecutionContext ctx) throws Exception {
389                stream.open(ctx);
390        }
391}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov