EMMA Coverage Report (generated Thu May 22 12:08:10 CDT 2014)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [SimpleChunkProcessor.java]

nameclass, %method, %block, %line, %
SimpleChunkProcessor.java100% (1/1)95%  (19/20)97%  (222/228)96%  (77/80)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class SimpleChunkProcessor100% (1/1)95%  (19/20)97%  (222/228)96%  (77/80)
SimpleChunkProcessor (): void 0%   (0/1)0%   (0/5)0%   (0/2)
doWrite (List): void 100% (1/1)96%  (22/23)90%  (9/10)
SimpleChunkProcessor (ItemProcessor, ItemWriter): void 100% (1/1)100% (14/14)100% (5/5)
afterPropertiesSet (): void 100% (1/1)100% (9/9)100% (3/3)
doAfterWrite (List): void 100% (1/1)100% (5/5)100% (2/2)
doOnWriteError (Exception, List): void 100% (1/1)100% (6/6)100% (2/2)
doProcess (Object): Object 100% (1/1)100% (31/31)100% (10/10)
getAdjustedOutputs (Chunk, Chunk): Chunk 100% (1/1)100% (2/2)100% (1/1)
getFilterCount (Chunk, Chunk): int 100% (1/1)100% (8/8)100% (1/1)
getListener (): MulticasterBatchListener 100% (1/1)100% (3/3)100% (1/1)
initializeUserData (Chunk): void 100% (1/1)100% (6/6)100% (2/2)
isComplete (Chunk): boolean 100% (1/1)100% (3/3)100% (1/1)
process (StepContribution, Chunk): void 100% (1/1)100% (28/28)100% (7/7)
registerListener (StepListener): void 100% (1/1)100% (5/5)100% (2/2)
setItemProcessor (ItemProcessor): void 100% (1/1)100% (4/4)100% (2/2)
setItemWriter (ItemWriter): void 100% (1/1)100% (4/4)100% (2/2)
setListeners (List): void 100% (1/1)100% (15/15)100% (4/4)
transform (StepContribution, Chunk): Chunk 100% (1/1)100% (34/34)100% (13/13)
write (StepContribution, Chunk, Chunk): void 100% (1/1)100% (15/15)100% (7/7)
writeItems (List): void 100% (1/1)100% (8/8)100% (3/3)

1/*
2 * Copyright 2006-2013 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 
17package org.springframework.batch.core.step.item;
18 
19import java.util.List;
20 
21import org.springframework.batch.core.StepContribution;
22import org.springframework.batch.core.StepListener;
23import org.springframework.batch.core.listener.MulticasterBatchListener;
24import org.springframework.batch.item.ItemProcessor;
25import org.springframework.batch.item.ItemWriter;
26import org.springframework.beans.factory.InitializingBean;
27import org.springframework.util.Assert;
28 
29/**
30 * Simple implementation of the {@link ChunkProcessor} interface that handles
31 * basic item writing and processing. Any exceptions encountered will be
32 * rethrown.
33 *
34 * @see ChunkOrientedTasklet
35 */
36public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
37 
38        private ItemProcessor<? super I, ? extends O> itemProcessor;
39 
40        private ItemWriter<? super O> itemWriter;
41 
42        private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>();
43 
44        /**
45         * Default constructor for ease of configuration (both itemWriter and
46         * itemProcessor are mandatory).
47         */
48        @SuppressWarnings("unused")
49        private SimpleChunkProcessor() {
50                this(null, null);
51        }
52 
53        public SimpleChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
54                this.itemProcessor = itemProcessor;
55                this.itemWriter = itemWriter;
56        }
57 
58        /**
59         * @param itemProcessor the {@link ItemProcessor} to set
60         */
61        public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
62                this.itemProcessor = itemProcessor;
63        }
64 
65        /**
66         * @param itemWriter the {@link ItemWriter} to set
67         */
68        public void setItemWriter(ItemWriter<? super O> itemWriter) {
69                this.itemWriter = itemWriter;
70        }
71 
72        /**
73         * Check mandatory properties.
74         *
75         * @see InitializingBean#afterPropertiesSet()
76         */
77        @Override
78        public void afterPropertiesSet() throws Exception {
79                Assert.notNull(itemWriter, "ItemWriter must be set");
80                Assert.notNull(itemProcessor, "ItemProcessor must be set");
81        }
82 
83        /**
84         * Register some {@link StepListener}s with the handler. Each will get the
85         * callbacks in the order specified at the correct stage.
86         *
87         * @param listeners
88         */
89        public void setListeners(List<? extends StepListener> listeners) {
90                for (StepListener listener : listeners) {
91                        registerListener(listener);
92                }
93        }
94 
95        /**
96         * Register a listener for callbacks at the appropriate stages in a process.
97         *
98         * @param listener a {@link StepListener}
99         */
100        public void registerListener(StepListener listener) {
101                this.listener.register(listener);
102        }
103 
104        /**
105         * @return the listener
106         */
107        protected MulticasterBatchListener<I, O> getListener() {
108                return listener;
109        }
110 
111        /**
112         * @param item the input item
113         * @return the result of the processing
114         * @throws Exception
115         */
116        protected final O doProcess(I item) throws Exception {
117 
118                if (itemProcessor == null) {
119                        @SuppressWarnings("unchecked")
120                        O result = (O) item;
121                        return result;
122                }
123 
124                try {
125                        listener.beforeProcess(item);
126                        O result = itemProcessor.process(item);
127                        listener.afterProcess(item, result);
128                        return result;
129                }
130                catch (Exception e) {
131                        listener.onProcessError(item, e);
132                        throw e;
133                }
134 
135        }
136 
137        /**
138         * Surrounds the actual write call with listener callbacks.
139         *
140         * @param items
141         * @throws Exception
142         */
143        protected final void doWrite(List<O> items) throws Exception {
144 
145                if (itemWriter == null) {
146                        return;
147                }
148 
149                try {
150                        listener.beforeWrite(items);
151                        writeItems(items);
152                        doAfterWrite(items);
153                }
154                catch (Exception e) {
155                        doOnWriteError(e, items);
156                        throw e;
157                }
158 
159        }
160 
161        /**
162         * Call the listener's after write method.
163         *
164         * @param items
165         */
166        protected final void doAfterWrite(List<O> items) {
167                listener.afterWrite(items);
168        }
169        protected final void doOnWriteError(Exception e, List<O> items) {
170                listener.onWriteError(e, items);
171        }
172 
173        protected void writeItems(List<O> items) throws Exception {
174                if (itemWriter != null) {
175                        itemWriter.write(items);
176                }
177        }
178 
179        @Override
180        public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
181 
182                // Allow temporary state to be stored in the user data field
183                initializeUserData(inputs);
184 
185                // If there is no input we don't have to do anything more
186                if (isComplete(inputs)) {
187                        return;
188                }
189 
190                // Make the transformation, calling remove() on the inputs iterator if
191                // any items are filtered. Might throw exception and cause rollback.
192                Chunk<O> outputs = transform(contribution, inputs);
193 
194                // Adjust the filter count based on available data
195                contribution.incrementFilterCount(getFilterCount(inputs, outputs));
196 
197                // Adjust the outputs if necessary for housekeeping purposes, and then
198                // write them out...
199                write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
200 
201        }
202 
203        /**
204         * Extension point for subclasses to allow them to memorise the contents of
205         * the inputs, in case they are needed for accounting purposes later. The
206         * default implementation sets up some user data to remember the original
207         * size of the inputs. If this method is overridden then some or all of
208         * {@link #isComplete(Chunk)}, {@link #getFilterCount(Chunk, Chunk)} and
209         * {@link #getAdjustedOutputs(Chunk, Chunk)} might also need to be, to
210         * ensure that the user data is handled consistently.
211         *
212         * @param inputs the inputs for the process
213         */
214        protected void initializeUserData(Chunk<I> inputs) {
215                inputs.setUserData(inputs.size());
216        }
217 
218        /**
219         * Extension point for subclasses to calculate the filter count. Defaults to
220         * the difference between input size and output size.
221         *
222         * @param inputs the inputs after transformation
223         * @param outputs the outputs after transformation
224         *
225         * @return the difference in sizes
226         *
227         * @see #initializeUserData(Chunk)
228         */
229        protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
230                return (Integer) inputs.getUserData() - outputs.size();
231        }
232 
233        /**
234         * Extension point for subclasses that want to store additional data in the
235         * inputs. Default just checks if inputs are empty.
236         *
237         * @param inputs the input chunk
238         * @return true if it is empty
239         *
240         * @see #initializeUserData(Chunk)
241         */
242        protected boolean isComplete(Chunk<I> inputs) {
243                return inputs.isEmpty();
244        }
245 
246        /**
247         * Extension point for subclasses that want to adjust the outputs based on
248         * additional saved data in the inputs. Default implementation just returns
249         * the outputs unchanged.
250         *
251         * @param inputs the inputs for the transformation
252         * @param outputs the result of the transformation
253         * @return the outputs unchanged
254         *
255         * @see #initializeUserData(Chunk)
256         */
257        protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
258                return outputs;
259        }
260 
261        /**
262         * Simple implementation delegates to the {@link #doWrite(List)} method and
263         * increments the write count in the contribution. Subclasses can handle
264         * more complicated scenarios, e.g.with fault tolerance. If output items are
265         * skipped they should be removed from the inputs as well.
266         *
267         * @param contribution the current step contribution
268         * @param inputs the inputs that gave rise to the ouputs
269         * @param outputs the outputs to write
270         * @throws Exception if there is a problem
271         */
272        protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
273                try {
274                        doWrite(outputs.getItems());
275                }
276                catch (Exception e) {
277                        /*
278                         * For a simple chunk processor (no fault tolerance) we are done
279                         * here, so prevent any more processing of these inputs.
280                         */
281                        inputs.clear();
282                        throw e;
283                }
284                contribution.incrementWriteCount(outputs.size());
285        }
286 
287        protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
288                Chunk<O> outputs = new Chunk<O>();
289                for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
290                        final I item = iterator.next();
291                        O output;
292                        try {
293                                output = doProcess(item);
294                        }
295                        catch (Exception e) {
296                                /*
297                                 * For a simple chunk processor (no fault tolerance) we are done
298                                 * here, so prevent any more processing of these inputs.
299                                 */
300                                inputs.clear();
301                                throw e;
302                        }
303                        if (output != null) {
304                                outputs.add(output);
305                        }
306                        else {
307                                iterator.remove();
308                        }
309                }
310                return outputs;
311        }
312 
313}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov