EMMA Coverage Report (generated Fri Jan 30 13:20:29 EST 2009)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [ItemSkipPolicyItemHandler.java]

nameclass, %method, %block, %line, %
ItemSkipPolicyItemHandler.java100% (2/2)100% (19/19)95%  (344/364)95%  (87/92)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ItemSkipPolicyItemHandler100% (1/1)100% (17/17)94%  (339/359)94%  (85/90)
getSkippedException (Object): Throwable 100% (1/1)72%  (13/18)67%  (2/3)
setItemKeyGenerator (ItemKeyGenerator): void 100% (1/1)75%  (6/8)88%  (1.8/2)
clearSkippedExceptions (): void 100% (1/1)87%  (33/38)96%  (10.5/11)
shouldRethrow (Exception): boolean 100% (1/1)90%  (19/21)94%  (3.8/4)
read (StepContribution): Object 100% (1/1)93%  (71/76)86%  (18/21)
<static initializer> 100% (1/1)95%  (21/22)98%  (2/2)
ItemSkipPolicyItemHandler (ItemReader, ItemWriter): void 100% (1/1)100% (30/30)100% (8/8)
addSkippedException (Object, Throwable): void 100% (1/1)100% (43/43)100% (6/6)
doWriteWithSkip (Object, StepContribution): void 100% (1/1)100% (47/47)100% (13/13)
mark (): void 100% (1/1)100% (5/5)100% (3/3)
registerSkipListener (SkipListener): void 100% (1/1)100% (5/5)100% (2/2)
scheduleForRemoval (Object): void 100% (1/1)100% (15/15)100% (4/4)
setDoNotRethrowExceptionClasses (Class []): void 100% (1/1)100% (4/4)100% (2/2)
setItemSkipPolicy (ItemSkipPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setSkipCacheCapacity (int): void 100% (1/1)100% (4/4)100% (2/2)
setSkipListeners (SkipListener []): void 100% (1/1)100% (14/14)100% (3/3)
write (Object, StepContribution): void 100% (1/1)100% (5/5)100% (2/2)
     
class ItemSkipPolicyItemHandler$1100% (1/1)100% (2/2)100% (5/5)100% (2/2)
ItemSkipPolicyItemHandler$1 (): void 100% (1/1)100% (3/3)100% (1/1)
getKey (Object): Object 100% (1/1)100% (2/2)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.item;
17 
18import java.util.HashMap;
19import java.util.HashSet;
20import java.util.Iterator;
21import java.util.Map;
22import java.util.Set;
23 
24import org.springframework.batch.core.SkipListener;
25import org.springframework.batch.core.StepContribution;
26import org.springframework.batch.core.UnexpectedJobExecutionException;
27import org.springframework.batch.core.listener.CompositeSkipListener;
28import org.springframework.batch.core.step.skip.ItemSkipPolicy;
29import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
30import org.springframework.batch.core.step.skip.SkipLimitExceededException;
31import org.springframework.batch.item.ItemKeyGenerator;
32import org.springframework.batch.item.ItemReader;
33import org.springframework.batch.item.ItemWriter;
34import org.springframework.batch.item.MarkFailedException;
35import org.springframework.transaction.support.TransactionSynchronizationManager;
36 
37/**
38 * {@link ItemHandler} that implements skip behavior. It delegates to
39 * {@link #setItemSkipPolicy(ItemSkipPolicy)} to decide whether skip should be
40 * called or not.
41 * 
42 * When an exception is skipped on read it is *not* re-thrown (so it does not
43 * cause a tx rollback). A skipped exception on write is re-thrown by default
44 * (causing a tx rollback) unless its class is, or is a subclass of, one of the
45 * classes given to {@link #setDoNotRethrowExceptionClasses(Class[])}.
46 * 
47 * If exception is thrown while reading the item, skip is called on the
48 * {@link ItemReader}. If exception is thrown while writing the item, skip is
49 * called on both {@link ItemReader} and {@link ItemWriter}.
50 * 
51 * @author Dave Syer
52 * @author Robert Kasanicky
53 */
54public class ItemSkipPolicyItemHandler extends SimpleItemHandler {
55 
56        /**
57         * Key for transaction resource that holds skipped keys until they can be
58         * removed
59         */
60        private static final String TO_BE_REMOVED = ItemSkipPolicyItemHandler.class.getName() + ".TO_BE_REMOVED";
61        
62        private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy();
63 
64        private int skipCacheCapacity = 1024;
65 
66        private Map skippedExceptions = new HashMap();
67 
68        private Class[] doNotRethrowExceptionClasses = new Class[] {};
69 
70        private static final ItemKeyGenerator defaultItemKeyGenerator = new ItemKeyGenerator() {
71                public Object getKey(Object item) {
72                        return item;
73                }
74        };
75 
76        private ItemKeyGenerator itemKeyGenerator = defaultItemKeyGenerator;
77 
78        private CompositeSkipListener listener = new CompositeSkipListener();
79 
80        /**
81         * Register some {@link SkipListener}s with the handler. Each will get the
82         * callbacks in the order specified at the correct stage if a skip occurs.
83         * 
84         * @param listeners
85         */
86        public void setSkipListeners(SkipListener[] listeners) {
87                for (int i = 0; i < listeners.length; i++) {
88                        registerSkipListener(listeners[i]);
89                }
90        }
91 
92        /**
93         * Register a listener for callbacks at the appropriate stages in a skip
94         * process.
95         * 
96         * @param listener a {@link SkipListener}
97         */
98        public void registerSkipListener(SkipListener listener) {
99                this.listener.register(listener);
100        }
101 
102        /**
103         * Public setter for the {@link ItemKeyGenerator}. Defaults to just return
104         * the item, and since it will be used before a write operation.
105         * Implementations must ensure that items always have the same key when they
106         * are read from the {@link ItemReader} (so if the item is mutable and the
107         * reader does any buffering the key generator might need to take care to
108         * only use data that do not change on write).
109         * 
110         * @param itemKeyGenerator the {@link ItemKeyGenerator} to set. If null
111         * resets to default value.
112         */
113        public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) {
114                this.itemKeyGenerator = (itemKeyGenerator == null) ? defaultItemKeyGenerator : itemKeyGenerator;
115        }
116 
117        /**
118         * @param itemReader
119         * @param itemWriter
120         */
121        public ItemSkipPolicyItemHandler(ItemReader itemReader, ItemWriter itemWriter) {
122                super(itemReader, itemWriter);
123        }
124 
125        /**
126         * @param itemSkipPolicy
127         */
128        public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) {
129                this.itemSkipPolicy = itemSkipPolicy;
130        }
131 
132        /**
133         * Public setter for the capacity of the skipped item cache. If a large
134         * number of items are failing and not being recognized as skipped, it
135         * usually signals a problem with the key generation (often equals and
136         * hashCode in the item itself). So it is better to enforce a strict limit
137         * than have weird looking errors, where a skip limit is reached without
138         * anything being skipped.
139         * 
140         * @param skipCacheCapacity the capacity to set
141         */
142        public void setSkipCacheCapacity(int skipCacheCapacity) {
143                this.skipCacheCapacity = skipCacheCapacity;
144        }
145 
146        /**
147         * Tries to read the item from the reader, in case of exception skip the
148         * item if the skip policy allows, otherwise re-throw.
149         * 
150         * @param contribution current StepContribution holding skipped items count
151         * @return next item for processing
152         */
153        protected Object read(StepContribution contribution) throws Exception {
154 
155                while (true) {
156 
157                        try {
158 
159                                Object item = doRead();
160                                Object key = itemKeyGenerator.getKey(item);
161                                Throwable throwable = getSkippedException(key);
162                                while (item != null && throwable != null) {
163                                        logger.debug("Skipping item on input, previously failed on output; key=[" + key + "]");
164                                        scheduleForRemoval(key);
165                                        
166                                        item = doRead();
167                                        key = itemKeyGenerator.getKey(item);
168                                        throwable = getSkippedException(key);
169                                }
170                                return item;
171 
172                        }
173                        catch (Exception e) {
174                                try {
175                                        if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) {
176                                                // increment skip count and try again
177                                                contribution.incrementTemporaryReadSkipCount();
178                                                
179                                                listener.onSkipInRead(e);
180                                                
181                                                logger.debug("Skipping failed input", e);
182                                        }
183                                        else {
184                                                // re-throw only when the skip policy runs out of
185                                                // patience
186                                                throw e;
187                                        }
188                                }
189                                catch (SkipLimitExceededException ex) {
190                                        // we are headed for a abnormal ending so bake in the skip
191                                        // count
192                                        contribution.combineSkipCounts();
193                                        throw ex;
194                                }
195                        }
196 
197                }
198 
199        }
200 
201        /**
202         * Tries to write the item using the writer. In case of exception consults
203         * skip policy before re-throwing the exception. The exception is always
204         * re-thrown, but if the item is seen again on read it will be skipped.
205         * 
206         * @param item item to write
207         * @param contribution current StepContribution holding skipped items count
208         */
209        protected void write(Object item, StepContribution contribution) throws Exception {
210                doWriteWithSkip(item, contribution);
211        }
212 
213        /**
214         * @param item
215         * @param contribution
216         * @throws Exception
217         */
218        protected final void doWriteWithSkip(Object item, StepContribution contribution) throws Exception {
219                // Get the key as early as possible, otherwise it might change in
220                // doWrite()
221                Object key = itemKeyGenerator.getKey(item);
222                try {
223                        doWrite(item);
224                }
225                catch (Exception e) {
226                        if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) {
227                                contribution.incrementWriteSkipCount();
228                                
229                                addSkippedException(key, e);
230                                logger.debug("Added item to skip list; key=" + key);
231 
232                                listener.onSkipInWrite(item, e);
233                                
234                                // return without re-throwing if exception shouldn't cause
235                                // rollback
236                                if (!shouldRethrow(e)) {
237                                        return;
238                                }
239                        }
240                        // re-throw exception on write by default
241                        throw e;
242                }
243        }
244 
245        private boolean shouldRethrow(Exception e) {
246                for (int i = 0; i < doNotRethrowExceptionClasses.length; i++) {
247                        if (doNotRethrowExceptionClasses[i].isAssignableFrom(e.getClass())) {
248                                return false;
249                        }
250                }
251                return true;
252        }
253 
254        public void mark() throws MarkFailedException {
255                super.mark();
256                clearSkippedExceptions();
257        }
258 
259        /**
260         * Synchronized setter for a skipped exception.
261         */
262        private void addSkippedException(Object key, Throwable e) {
263                synchronized (skippedExceptions) {
264                        if (skippedExceptions.size() >= skipCacheCapacity) {
265                                throw new UnexpectedJobExecutionException(
266                                                "The cache of failed items to skipped unexpectedly reached its capacity ("
267                                                                + skipCacheCapacity
268                                                                + "). "
269                                                                + "This often indicates a problem with the key generation strategy, and/or a mistake in the implementation of hashCode and equals in the items being processed.");
270                        }
271                        skippedExceptions.put(key, e);
272                }
273        }
274 
275        /**
276         * Schedule this key for removal from the skipped exception cache at the end
277         * of this transaction (only removed if business transaction is successful).
278         * 
279         * @param key
280         */
281        private void scheduleForRemoval(Object key) {
282                if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) {
283                        TransactionSynchronizationManager.bindResource(TO_BE_REMOVED, new HashSet());
284                }
285                ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).add(key);
286        }
287 
288        /**
289         * Clear the map of skipped exception corresponding to key.
290         * @param key the key to clear
291         */
292        private void clearSkippedExceptions() {
293                if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) {
294                        return;
295                }
296                synchronized (skippedExceptions) {
297                        for (Iterator iterator = ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).iterator(); iterator
298                                        .hasNext();) {
299                                Object key = iterator.next();
300                                skippedExceptions.remove(key);
301                        }
302                        TransactionSynchronizationManager.unbindResource(TO_BE_REMOVED);
303                }
304        }
305 
306        /**
307         * Synchronized getter for a skipped exception.
308         * @return the skippedExceptions
309         */
310        private Throwable getSkippedException(Object key) {
311                synchronized (skippedExceptions) {
312                        return (Throwable) skippedExceptions.get(key);
313                }
314        }
315 
316        /**
317         * doNotRethrowExceptionClasses will not be re-thrown when skipped.
318         * @param doNotRethrowExceptionClasses empty by default
319         */
320        public void setDoNotRethrowExceptionClasses(Class[] doNotRethrowExceptionClasses) {
321                this.doNotRethrowExceptionClasses = doNotRethrowExceptionClasses;
322        }
323 
324}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov