EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [SimpleChunkProcessor.java]

nameclass, %method, %block, %line, %
SimpleChunkProcessor.java100% (1/1)90%  (18/20)93%  (213/228)92%  (73/79)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class SimpleChunkProcessor100% (1/1)90%  (18/20)93%  (213/228)92%  (73/79)
SimpleChunkProcessor (): void 0%   (0/1)0%   (0/5)0%   (0/2)
afterPropertiesSet (): void 0%   (0/1)0%   (0/9)0%   (0/3)
doWrite (List): void 100% (1/1)96%  (22/23)90%  (9/10)
SimpleChunkProcessor (ItemProcessor, ItemWriter): void 100% (1/1)100% (14/14)100% (5/5)
doAfterWrite (List): void 100% (1/1)100% (5/5)100% (2/2)
doOnWriteError (Exception, List): void 100% (1/1)100% (6/6)100% (2/2)
doProcess (Object): Object 100% (1/1)100% (31/31)100% (10/10)
getAdjustedOutputs (Chunk, Chunk): Chunk 100% (1/1)100% (2/2)100% (1/1)
getFilterCount (Chunk, Chunk): int 100% (1/1)100% (8/8)100% (1/1)
getListener (): MulticasterBatchListener 100% (1/1)100% (3/3)100% (1/1)
initializeUserData (Chunk): void 100% (1/1)100% (6/6)100% (2/2)
isComplete (Chunk): boolean 100% (1/1)100% (3/3)100% (1/1)
process (StepContribution, Chunk): void 100% (1/1)100% (28/28)100% (7/7)
registerListener (StepListener): void 100% (1/1)100% (5/5)100% (2/2)
setItemProcessor (ItemProcessor): void 100% (1/1)100% (4/4)100% (2/2)
setItemWriter (ItemWriter): void 100% (1/1)100% (4/4)100% (2/2)
setListeners (List): void 100% (1/1)100% (15/15)100% (3/3)
transform (StepContribution, Chunk): Chunk 100% (1/1)100% (34/34)100% (13/13)
write (StepContribution, Chunk, Chunk): void 100% (1/1)100% (15/15)100% (7/7)
writeItems (List): void 100% (1/1)100% (8/8)100% (3/3)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.core.step.item;
18 
19import java.util.List;
20 
21import org.springframework.batch.core.StepContribution;
22import org.springframework.batch.core.StepListener;
23import org.springframework.batch.core.listener.MulticasterBatchListener;
24import org.springframework.batch.item.ItemProcessor;
25import org.springframework.batch.item.ItemWriter;
26import org.springframework.beans.factory.InitializingBean;
27import org.springframework.util.Assert;
28 
29/**
30 * Simple implementation of the {@link ChunkProcessor} interface that handles
31 * basic item writing and processing. Any exceptions encountered will be
32 * rethrown.
33 * 
34 * @see ChunkOrientedTasklet
35 */
36public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
37 
38        private ItemProcessor<? super I, ? extends O> itemProcessor;
39 
40        private ItemWriter<? super O> itemWriter;
41 
42        private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>();
43 
44        /**
45         * Default constructor for ease of configuration (both itemWriter and
46         * itemProcessor are mandatory).
47         */
48        @SuppressWarnings("unused")
49        private SimpleChunkProcessor() {
50                this(null, null);
51        }
52 
53        public SimpleChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
54                this.itemProcessor = itemProcessor;
55                this.itemWriter = itemWriter;
56        }
57 
58        /**
59         * @param itemProcessor the {@link ItemProcessor} to set
60         */
61        public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
62                this.itemProcessor = itemProcessor;
63        }
64 
65        /**
66         * @param itemWriter the {@link ItemWriter} to set
67         */
68        public void setItemWriter(ItemWriter<? super O> itemWriter) {
69                this.itemWriter = itemWriter;
70        }
71 
72        /**
73         * Check mandatory properties.
74         * 
75         * @see InitializingBean#afterPropertiesSet()
76         */
77        public void afterPropertiesSet() throws Exception {
78                Assert.notNull(itemWriter, "ItemWriter must be set");
79                Assert.notNull(itemProcessor, "ItemProcessor must be set");
80        }
81 
82        /**
83         * Register some {@link StepListener}s with the handler. Each will get the
84         * callbacks in the order specified at the correct stage.
85         * 
86         * @param listeners
87         */
88        public void setListeners(List<? extends StepListener> listeners) {
89                for (StepListener listener : listeners) {
90                        registerListener(listener);
91                }
92        }
93 
94        /**
95         * Register a listener for callbacks at the appropriate stages in a process.
96         * 
97         * @param listener a {@link StepListener}
98         */
99        public void registerListener(StepListener listener) {
100                this.listener.register(listener);
101        }
102 
103        /**
104         * @return the listener
105         */
106        protected MulticasterBatchListener<I, O> getListener() {
107                return listener;
108        }
109 
110        /**
111         * @param item the input item
112         * @return the result of the processing
113         * @throws Exception
114         */
115        protected final O doProcess(I item) throws Exception {
116 
117                if (itemProcessor == null) {
118                        @SuppressWarnings("unchecked")
119                        O result = (O) item;
120                        return result;
121                }
122 
123                try {
124                        listener.beforeProcess(item);
125                        O result = itemProcessor.process(item);
126                        listener.afterProcess(item, result);
127                        return result;
128                }
129                catch (Exception e) {
130                        listener.onProcessError(item, e);
131                        throw e;
132                }
133 
134        }
135 
136        /**
137         * Surrounds the actual write call with listener callbacks.
138         * 
139         * @param items
140         * @throws Exception
141         */
142        protected final void doWrite(List<O> items) throws Exception {
143 
144                if (itemWriter == null) {
145                        return;
146                }
147 
148                try {
149                        listener.beforeWrite(items);
150                        writeItems(items);
151                        doAfterWrite(items);
152                }
153                catch (Exception e) {
154                        doOnWriteError(e, items);
155                        throw e;
156                }
157 
158        }
159 
160        /**
161         * Call the listener's after write method.
162         * 
163         * @param items
164         */
165        protected final void doAfterWrite(List<O> items) {
166                listener.afterWrite(items);
167        }
168        protected final void doOnWriteError(Exception e, List<O> items) {
169                listener.onWriteError(e, items);
170        }
171 
172        protected void writeItems(List<O> items) throws Exception {
173                if (itemWriter != null) {
174                        itemWriter.write(items);
175                }
176        }
177 
178        public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
179 
180                // Allow temporary state to be stored in the user data field
181                initializeUserData(inputs);
182 
183                // If there is no input we don't have to do anything more
184                if (isComplete(inputs)) {
185                        return;
186                }
187 
188                // Make the transformation, calling remove() on the inputs iterator if
189                // any items are filtered. Might throw exception and cause rollback.
190                Chunk<O> outputs = transform(contribution, inputs);
191 
192                // Adjust the filter count based on available data
193                contribution.incrementFilterCount(getFilterCount(inputs, outputs));
194 
195                // Adjust the outputs if necessary for housekeeping purposes, and then
196                // write them out...
197                write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
198 
199        }
200 
201        /**
202         * Extension point for subclasses to allow them to memorise the contents of
203         * the inputs, in case they are needed for accounting purposes later. The
204         * default implementation sets up some user data to remember the original
205         * size of the inputs. If this method is overridden then some or all of
206         * {@link #isComplete(Chunk)}, {@link #getFilterCount(Chunk, Chunk)} and
207         * {@link #getAdjustedOutputs(Chunk, Chunk)} might also need to be, to
208         * ensure that the user data is handled consistently.
209         * 
210         * @param inputs the inputs for the process
211         */
212        protected void initializeUserData(Chunk<I> inputs) {
213                inputs.setUserData(inputs.size());
214        }
215 
216        /**
217         * Extension point for subclasses to calculate the filter count. Defaults to
218         * the difference between input size and output size.
219         * 
220         * @param inputs the inputs after transformation
221         * @param outputs the outputs after transformation
222         * 
223         * @return the difference in sizes
224         * 
225         * @see #initializeUserData(Chunk)
226         */
227        protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
228                return (Integer) inputs.getUserData() - outputs.size();
229        }
230 
231        /**
232         * Extension point for subclasses that want to store additional data in the
233         * inputs. Default just checks if inputs are empty.
234         * 
235         * @param inputs the input chunk
236         * @return true if it is empty
237         * 
238         * @see #initializeUserData(Chunk)
239         */
240        protected boolean isComplete(Chunk<I> inputs) {
241                return inputs.isEmpty();
242        }
243 
244        /**
245         * Extension point for subclasses that want to adjust the outputs based on
246         * additional saved data in the inputs. Default implementation just returns
247         * the outputs unchanged.
248         * 
249         * @param inputs the inputs for the transformation
250         * @param outputs the result of the transformation
251         * @return the outputs unchanged
252         * 
253         * @see #initializeUserData(Chunk)
254         */
255        protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
256                return outputs;
257        }
258 
259        /**
260         * Simple implementation delegates to the {@link #doWrite(List)} method and
261         * increments the write count in the contribution. Subclasses can handle
262         * more complicated scenarios, e.g.with fault tolerance. If output items are
263         * skipped they should be removed from the inputs as well.
264         * 
265         * @param contribution the current step contribution
266         * @param inputs the inputs that gave rise to the ouputs
267         * @param outputs the outputs to write
268         * @throws Exception if there is a problem
269         */
270        protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
271                try {
272                        doWrite(outputs.getItems());
273                }
274                catch (Exception e) {
275                        /*
276                         * For a simple chunk processor (no fault tolerance) we are done
277                         * here, so prevent any more processing of these inputs.
278                         */
279                        inputs.clear();
280                        throw e;
281                }
282                contribution.incrementWriteCount(outputs.size());
283        }
284 
285        protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
286                Chunk<O> outputs = new Chunk<O>();
287                for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
288                        final I item = iterator.next();
289                        O output;
290                        try {
291                                output = doProcess(item);
292                        }
293                        catch (Exception e) {
294                                /*
295                                 * For a simple chunk processor (no fault tolerance) we are done
296                                 * here, so prevent any more processing of these inputs.
297                                 */
298                                inputs.clear();
299                                throw e;
300                        }
301                        if (output != null) {
302                                outputs.add(output);
303                        }
304                        else {
305                                iterator.remove();
306                        }
307                }
308                return outputs;
309        }
310 
311}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov