EMMA Coverage Report (generated Tue May 06 07:29:23 PDT 2008)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [ItemSkipPolicyItemHandler.java]

name | class, % | method, % | block, % | line, %
ItemSkipPolicyItemHandler.java | 100% (2/2) | 94%  (16/17) | 93%  (325/348) | 94%  (84.9/90)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class ItemSkipPolicyItemHandler | 100% (1/1) | 93%  (14/15) | 93%  (317/340) | 94%  (82.9/88)
registerSkipListener (SkipListener): void |  | 0%   (0/1) | 0%   (0/5) | 0%   (0/2)
setSkipListeners (SkipListener []): void |  | 100% (1/1) | 50%  (7/14) | 49%  (1.5/3)
getSkippedException (Object): Throwable |  | 100% (1/1) | 72%  (13/18) | 67%  (2/3)
clearSkippedExceptions (): void |  | 100% (1/1) | 87%  (33/38) | 96%  (10.5/11)
<static initializer> |  | 100% (1/1) | 94%  (17/18) | 94%  (0.9/1)
ItemSkipPolicyItemHandler (ItemReader, ItemWriter): void |  | 100% (1/1) | 100% (38/38) | 100% (9/9)
addSkippedException (Object, Throwable): void |  | 100% (1/1) | 100% (43/43) | 100% (6/6)
doWriteWithSkip (Object, StepContribution): void |  | 100% (1/1) | 100% (37/37) | 100% (10/10)
mark (): void |  | 100% (1/1) | 100% (5/5) | 100% (3/3)
read (StepContribution): Object |  | 100% (1/1) | 100% (87/87) | 100% (26/26)
scheduleForRemoval (Object): void |  | 100% (1/1) | 100% (15/15) | 100% (4/4)
setItemKeyGenerator (ItemKeyGenerator): void |  | 100% (1/1) | 100% (9/9) | 100% (4/4)
setItemSkipPolicy (ItemSkipPolicy): void |  | 100% (1/1) | 100% (4/4) | 100% (2/2)
setSkipCacheCapacity (int): void |  | 100% (1/1) | 100% (4/4) | 100% (2/2)
write (Object, StepContribution): void |  | 100% (1/1) | 100% (5/5) | 100% (2/2)

class ItemSkipPolicyItemHandler$1 | 100% (1/1) | 100% (2/2) | 100% (8/8) | 100% (2/2)
ItemSkipPolicyItemHandler$1 (ItemSkipPolicyItemHandler): void |  | 100% (1/1) | 100% (6/6) | 100% (1/1)
getKey (Object): Object |  | 100% (1/1) | 100% (2/2) | 100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.item;
17 
18import java.util.HashMap;
19import java.util.HashSet;
20import java.util.Iterator;
21import java.util.Map;
22import java.util.Set;
23 
24import org.apache.commons.logging.Log;
25import org.apache.commons.logging.LogFactory;
26import org.springframework.batch.core.SkipListener;
27import org.springframework.batch.core.StepContribution;
28import org.springframework.batch.core.UnexpectedJobExecutionException;
29import org.springframework.batch.core.listener.CompositeSkipListener;
30import org.springframework.batch.core.step.skip.ItemSkipPolicy;
31import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
32import org.springframework.batch.core.step.skip.SkipLimitExceededException;
33import org.springframework.batch.item.ItemKeyGenerator;
34import org.springframework.batch.item.ItemReader;
35import org.springframework.batch.item.ItemWriter;
36import org.springframework.batch.item.MarkFailedException;
37import org.springframework.transaction.support.TransactionSynchronizationManager;
38 
39/**
40 * {@link ItemHandler} that implements skip behavior. It delegates to
41 * {@link #setItemSkipPolicy(ItemSkipPolicy)} to decide whether skip should be called or not.
42 * 
43 * If exception is thrown while reading the item, skip is called on the
44 * {@link ItemReader}. If exception is thrown while writing the item, skip is
45 * called on both {@link ItemReader} and {@link ItemWriter}.
46 * 
47 * @author Dave Syer
48 * @author Robert Kasanicky
49 */
50public class ItemSkipPolicyItemHandler extends SimpleItemHandler {
51 
52        /**
53         * Key for transaction resource that holds skipped keys until they can be
54         * removed
55         */
56        private static final String TO_BE_REMOVED = ItemSkipPolicyItemHandler.class.getName() + ".TO_BE_REMOVED";
57 
58        protected final Log logger = LogFactory.getLog(getClass());
59 
60        private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy();
61 
62        private int skipCacheCapacity = 1024;
63 
64        private Map skippedExceptions = new HashMap();
65 
66        private ItemKeyGenerator defaultItemKeyGenerator = new ItemKeyGenerator() {
67                public Object getKey(Object item) {
68                        return item;
69                }
70        };
71 
72        private ItemKeyGenerator itemKeyGenerator = defaultItemKeyGenerator;
73 
74        private CompositeSkipListener listener = new CompositeSkipListener();
75 
76        /**
77         * Register some {@link SkipListener}s with the handler. Each will get the
78         * callbacks in the order specified at the correct stage if a skip occurs.
79         * 
80         * @param listeners
81         */
82        public void setSkipListeners(SkipListener[] listeners) {
83                for (int i = 0; i < listeners.length; i++) {
84                        registerSkipListener(listeners[i]);
85                }
86        }
87 
88        /**
89         * Register a listener for callbacks at the appropriate stages in a skip
90         * process.
91         * 
92         * @param listener a {@link SkipListener}
93         */
94        public void registerSkipListener(SkipListener listener) {
95                this.listener.register(listener);
96        }
97 
98        /**
99         * Public setter for the {@link ItemKeyGenerator}. Defaults to just return
100         * the item, and since it will be used before a write operation.
101         * Implementations must ensure that items always have the same key when they
102         * are read from the {@link ItemReader} (so if the item is mutable and the
103         * reader does any buffering the key generator might need to take care to
104         * only use data that do not change on write).
105         * 
106         * @param itemKeyGenerator the {@link ItemKeyGenerator} to set. If null
107         * resets to default value.
108         */
109        public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) {
110                if (itemKeyGenerator == null) {
111                        itemKeyGenerator = defaultItemKeyGenerator;
112                }
113                this.itemKeyGenerator = itemKeyGenerator;
114        }
115 
116        /**
117         * @param itemReader
118         * @param itemWriter
119         */
120        public ItemSkipPolicyItemHandler(ItemReader itemReader, ItemWriter itemWriter) {
121                super(itemReader, itemWriter);
122        }
123 
124        /**
125         * @param itemSkipPolicy
126         */
127        public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) {
128                this.itemSkipPolicy = itemSkipPolicy;
129        }
130 
131        /**
132         * Public setter for the capacity of the skipped item cache. If a large
133         * number of items are failing and not being recognized as skipped, it
134         * usually signals a problem with the key generation (often equals and
135         * hashCode in the item itself). So it is better to enforce a strict limit
136         * than have weird looking errors, where a skip limit is reached without
137         * anything being skipped.
138         * 
139         * @param skipCacheCapacity the capacity to set
140         */
141        public void setSkipCacheCapacity(int skipCacheCapacity) {
142                this.skipCacheCapacity = skipCacheCapacity;
143        }
144 
145        /**
146         * Tries to read the item from the reader, in case of exception skip the
147         * item if the skip policy allows, otherwise re-throw.
148         * 
149         * @param contribution current StepContribution holding skipped items count
150         * @return next item for processing
151         */
152        protected Object read(StepContribution contribution) throws Exception {
153 
154                while (true) {
155 
156                        try {
157 
158                                Object item = doRead();
159                                Object key = itemKeyGenerator.getKey(item);
160                                Throwable throwable = getSkippedException(key);
161                                while (item != null && throwable != null) {
162                                        logger.debug("Skipping item on input, previously failed on output; key=[" + key + "]");
163                                        scheduleForRemoval(key);
164                                        if (listener != null) {
165                                                listener.onSkipInWrite(item, throwable);
166                                        }
167                                        item = doRead();
168                                        key = itemKeyGenerator.getKey(item);
169                                        throwable = getSkippedException(key);
170                                }
171                                return item;
172 
173                        }
174                        catch (Exception e) {
175                                try {
176                                        if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) {
177                                                // increment skip count and try again
178                                                contribution.incrementReadSkipCount();
179                                                if (listener != null) {
180                                                        listener.onSkipInRead(e);
181                                                }
182                                                logger.debug("Skipping failed input", e);
183                                        }
184                                        else {
185                                                // re-throw only when the skip policy runs out of
186                                                // patience
187                                                throw e;
188                                        }
189                                }
190                                catch (SkipLimitExceededException ex) {
191                                        // we are headed for a abnormal ending so bake in the skip
192                                        // count
193                                        contribution.combineSkipCounts();
194                                        throw ex;
195                                }
196                        }
197 
198                }
199 
200        }
201 
202        /**
203         * Tries to write the item using the writer. In case of exception consults
204         * skip policy before re-throwing the exception. The exception is always
205         * re-thrown, but if the item is seen again on read it will be skipped.
206         * 
207         * @param item item to write
208         * @param contribution current StepContribution holding skipped items count
209         */
210        protected void write(Object item, StepContribution contribution) throws Exception {
211                doWriteWithSkip(item, contribution);
212        }
213 
214        /**
215         * @param item
216         * @param contribution
217         * @throws Exception
218         */
219        protected final void doWriteWithSkip(Object item, StepContribution contribution) throws Exception {
220                // Get the key as early as possible, otherwise it might change in
221                // doWrite()
222                Object key = itemKeyGenerator.getKey(item);
223                try {
224                        doWrite(item);
225                }
226                catch (Exception e) {
227                        if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) {
228                                contribution.incrementSkipCount();
229                                // don't call the listener here - the transaction is going to
230                                // roll back
231                                addSkippedException(key, e);
232                                logger.debug("Added item to skip list; key=" + key);
233                        }
234                        // always re-throw exception on write
235                        throw e;
236                }
237        }
238 
239        public void mark() throws MarkFailedException {
240                super.mark();
241                clearSkippedExceptions();
242        }
243 
244        /**
245         * Synchronized setter for a skipped exception.
246         */
247        private void addSkippedException(Object key, Throwable e) {
248                synchronized (skippedExceptions) {
249                        if (skippedExceptions.size() >= skipCacheCapacity) {
250                                throw new UnexpectedJobExecutionException(
251                                                "The cache of failed items to skipped unexpectedly reached its capacity ("
252                                                                + skipCacheCapacity
253                                                                + "). "
254                                                                + "This often indicates a problem with the key generation strategy, and/or a mistake in the implementation of hashCode and equals in the items being processed.");
255                        }
256                        skippedExceptions.put(key, e);
257                }
258        }
259 
260        /**
261         * Schedule this key for removal from the skipped exception cache at the end
262         * of this transaction (only removed if business transaction is successful).
263         * 
264         * @param key
265         */
266        private void scheduleForRemoval(Object key) {
267                if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) {
268                        TransactionSynchronizationManager.bindResource(TO_BE_REMOVED, new HashSet());
269                }
270                ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).add(key);
271        }
272 
273        /**
274         * Clear the map of skipped exception corresponding to key.
275         * @param key the key to clear
276         */
277        private void clearSkippedExceptions() {
278                if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) {
279                        return;
280                }
281                synchronized (skippedExceptions) {
282                        for (Iterator iterator = ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).iterator(); iterator
283                                        .hasNext();) {
284                                Object key = (Object) iterator.next();
285                                skippedExceptions.remove(key);
286                        }
287                        TransactionSynchronizationManager.unbindResource(TO_BE_REMOVED);
288                }
289        }
290 
291        /**
292         * Synchronized getter for a skipped exception.
293         * @return the skippedExceptions
294         */
295        private Throwable getSkippedException(Object key) {
296                synchronized (skippedExceptions) {
297                        return (Throwable) skippedExceptions.get(key);
298                }
299        }
300 
301}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov