1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.item; |
17 | |
18 | import java.util.HashMap; |
19 | import java.util.HashSet; |
20 | import java.util.Iterator; |
21 | import java.util.Map; |
22 | import java.util.Set; |
23 | |
24 | import org.springframework.batch.core.SkipListener; |
25 | import org.springframework.batch.core.StepContribution; |
26 | import org.springframework.batch.core.UnexpectedJobExecutionException; |
27 | import org.springframework.batch.core.listener.CompositeSkipListener; |
28 | import org.springframework.batch.core.step.skip.ItemSkipPolicy; |
29 | import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; |
30 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
31 | import org.springframework.batch.item.ItemKeyGenerator; |
32 | import org.springframework.batch.item.ItemReader; |
33 | import org.springframework.batch.item.ItemWriter; |
34 | import org.springframework.batch.item.MarkFailedException; |
35 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
36 | |
37 | /** |
38 | * {@link ItemHandler} that implements skip behavior. It delegates to |
39 | * {@link #setItemSkipPolicy(ItemSkipPolicy)} to decide whether skip should be |
40 | * called or not. |
41 | * |
42 | * When exception is skipped on read it is *not* re-thrown (does not cause tx |
43 | * rollback). Skipped exception on write is re-thrown by default (causes tx |
44 | * rollback) unless the exception class is included in |
45 | * {@link #setDoNotRethrowExceptionClasses(Class[])}. |
46 | * |
47 | * If exception is thrown while reading the item, skip is called on the |
48 | * {@link ItemReader}. If exception is thrown while writing the item, skip is |
49 | * called on both {@link ItemReader} and {@link ItemWriter}. |
50 | * |
51 | * @author Dave Syer |
52 | * @author Robert Kasanicky |
53 | */ |
54 | public class ItemSkipPolicyItemHandler extends SimpleItemHandler { |
55 | |
56 | /** |
57 | * Key for transaction resource that holds skipped keys until they can be |
58 | * removed |
59 | */ |
60 | private static final String TO_BE_REMOVED = ItemSkipPolicyItemHandler.class.getName() + ".TO_BE_REMOVED"; |
61 | |
62 | private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy(); |
63 | |
64 | private int skipCacheCapacity = 1024; |
65 | |
66 | private Map skippedExceptions = new HashMap(); |
67 | |
68 | private Class[] doNotRethrowExceptionClasses = new Class[] {}; |
69 | |
70 | private static final ItemKeyGenerator defaultItemKeyGenerator = new ItemKeyGenerator() { |
71 | public Object getKey(Object item) { |
72 | return item; |
73 | } |
74 | }; |
75 | |
76 | private ItemKeyGenerator itemKeyGenerator = defaultItemKeyGenerator; |
77 | |
78 | private CompositeSkipListener listener = new CompositeSkipListener(); |
79 | |
80 | /** |
81 | * Register some {@link SkipListener}s with the handler. Each will get the |
82 | * callbacks in the order specified at the correct stage if a skip occurs. |
83 | * |
84 | * @param listeners |
85 | */ |
86 | public void setSkipListeners(SkipListener[] listeners) { |
87 | for (int i = 0; i < listeners.length; i++) { |
88 | registerSkipListener(listeners[i]); |
89 | } |
90 | } |
91 | |
92 | /** |
93 | * Register a listener for callbacks at the appropriate stages in a skip |
94 | * process. |
95 | * |
96 | * @param listener a {@link SkipListener} |
97 | */ |
98 | public void registerSkipListener(SkipListener listener) { |
99 | this.listener.register(listener); |
100 | } |
101 | |
102 | /** |
103 | * Public setter for the {@link ItemKeyGenerator}. Defaults to just return |
104 | * the item, and since it will be used before a write operation. |
105 | * Implementations must ensure that items always have the same key when they |
106 | * are read from the {@link ItemReader} (so if the item is mutable and the |
107 | * reader does any buffering the key generator might need to take care to |
108 | * only use data that do not change on write). |
109 | * |
110 | * @param itemKeyGenerator the {@link ItemKeyGenerator} to set. If null |
111 | * resets to default value. |
112 | */ |
113 | public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) { |
114 | this.itemKeyGenerator = (itemKeyGenerator == null) ? defaultItemKeyGenerator : itemKeyGenerator; |
115 | } |
116 | |
117 | /** |
118 | * @param itemReader |
119 | * @param itemWriter |
120 | */ |
121 | public ItemSkipPolicyItemHandler(ItemReader itemReader, ItemWriter itemWriter) { |
122 | super(itemReader, itemWriter); |
123 | } |
124 | |
125 | /** |
126 | * @param itemSkipPolicy |
127 | */ |
128 | public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) { |
129 | this.itemSkipPolicy = itemSkipPolicy; |
130 | } |
131 | |
132 | /** |
133 | * Public setter for the capacity of the skipped item cache. If a large |
134 | * number of items are failing and not being recognized as skipped, it |
135 | * usually signals a problem with the key generation (often equals and |
136 | * hashCode in the item itself). So it is better to enforce a strict limit |
137 | * than have weird looking errors, where a skip limit is reached without |
138 | * anything being skipped. |
139 | * |
140 | * @param skipCacheCapacity the capacity to set |
141 | */ |
142 | public void setSkipCacheCapacity(int skipCacheCapacity) { |
143 | this.skipCacheCapacity = skipCacheCapacity; |
144 | } |
145 | |
146 | /** |
147 | * Tries to read the item from the reader, in case of exception skip the |
148 | * item if the skip policy allows, otherwise re-throw. |
149 | * |
150 | * @param contribution current StepContribution holding skipped items count |
151 | * @return next item for processing |
152 | */ |
153 | protected Object read(StepContribution contribution) throws Exception { |
154 | |
155 | while (true) { |
156 | |
157 | try { |
158 | |
159 | Object item = doRead(); |
160 | Object key = itemKeyGenerator.getKey(item); |
161 | Throwable throwable = getSkippedException(key); |
162 | while (item != null && throwable != null) { |
163 | logger.debug("Skipping item on input, previously failed on output; key=[" + key + "]"); |
164 | scheduleForRemoval(key); |
165 | |
166 | item = doRead(); |
167 | key = itemKeyGenerator.getKey(item); |
168 | throwable = getSkippedException(key); |
169 | } |
170 | return item; |
171 | |
172 | } |
173 | catch (Exception e) { |
174 | try { |
175 | if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
176 | // increment skip count and try again |
177 | contribution.incrementTemporaryReadSkipCount(); |
178 | |
179 | listener.onSkipInRead(e); |
180 | |
181 | logger.debug("Skipping failed input", e); |
182 | } |
183 | else { |
184 | // re-throw only when the skip policy runs out of |
185 | // patience |
186 | throw e; |
187 | } |
188 | } |
189 | catch (SkipLimitExceededException ex) { |
190 | // we are headed for a abnormal ending so bake in the skip |
191 | // count |
192 | contribution.combineSkipCounts(); |
193 | throw ex; |
194 | } |
195 | } |
196 | |
197 | } |
198 | |
199 | } |
200 | |
201 | /** |
202 | * Tries to write the item using the writer. In case of exception consults |
203 | * skip policy before re-throwing the exception. The exception is always |
204 | * re-thrown, but if the item is seen again on read it will be skipped. |
205 | * |
206 | * @param item item to write |
207 | * @param contribution current StepContribution holding skipped items count |
208 | */ |
209 | protected void write(Object item, StepContribution contribution) throws Exception { |
210 | doWriteWithSkip(item, contribution); |
211 | } |
212 | |
213 | /** |
214 | * @param item |
215 | * @param contribution |
216 | * @throws Exception |
217 | */ |
218 | protected final void doWriteWithSkip(Object item, StepContribution contribution) throws Exception { |
219 | // Get the key as early as possible, otherwise it might change in |
220 | // doWrite() |
221 | Object key = itemKeyGenerator.getKey(item); |
222 | try { |
223 | doWrite(item); |
224 | } |
225 | catch (Exception e) { |
226 | if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
227 | contribution.incrementWriteSkipCount(); |
228 | |
229 | addSkippedException(key, e); |
230 | logger.debug("Added item to skip list; key=" + key); |
231 | |
232 | listener.onSkipInWrite(item, e); |
233 | |
234 | // return without re-throwing if exception shouldn't cause |
235 | // rollback |
236 | if (!shouldRethrow(e)) { |
237 | return; |
238 | } |
239 | } |
240 | // re-throw exception on write by default |
241 | throw e; |
242 | } |
243 | } |
244 | |
245 | private boolean shouldRethrow(Exception e) { |
246 | for (int i = 0; i < doNotRethrowExceptionClasses.length; i++) { |
247 | if (doNotRethrowExceptionClasses[i].isAssignableFrom(e.getClass())) { |
248 | return false; |
249 | } |
250 | } |
251 | return true; |
252 | } |
253 | |
254 | public void mark() throws MarkFailedException { |
255 | super.mark(); |
256 | clearSkippedExceptions(); |
257 | } |
258 | |
259 | /** |
260 | * Synchronized setter for a skipped exception. |
261 | */ |
262 | private void addSkippedException(Object key, Throwable e) { |
263 | synchronized (skippedExceptions) { |
264 | if (skippedExceptions.size() >= skipCacheCapacity) { |
265 | throw new UnexpectedJobExecutionException( |
266 | "The cache of failed items to skipped unexpectedly reached its capacity (" |
267 | + skipCacheCapacity |
268 | + "). " |
269 | + "This often indicates a problem with the key generation strategy, and/or a mistake in the implementation of hashCode and equals in the items being processed."); |
270 | } |
271 | skippedExceptions.put(key, e); |
272 | } |
273 | } |
274 | |
275 | /** |
276 | * Schedule this key for removal from the skipped exception cache at the end |
277 | * of this transaction (only removed if business transaction is successful). |
278 | * |
279 | * @param key |
280 | */ |
281 | private void scheduleForRemoval(Object key) { |
282 | if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) { |
283 | TransactionSynchronizationManager.bindResource(TO_BE_REMOVED, new HashSet()); |
284 | } |
285 | ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).add(key); |
286 | } |
287 | |
288 | /** |
289 | * Clear the map of skipped exception corresponding to key. |
290 | * @param key the key to clear |
291 | */ |
292 | private void clearSkippedExceptions() { |
293 | if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) { |
294 | return; |
295 | } |
296 | synchronized (skippedExceptions) { |
297 | for (Iterator iterator = ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).iterator(); iterator |
298 | .hasNext();) { |
299 | Object key = iterator.next(); |
300 | skippedExceptions.remove(key); |
301 | } |
302 | TransactionSynchronizationManager.unbindResource(TO_BE_REMOVED); |
303 | } |
304 | } |
305 | |
306 | /** |
307 | * Synchronized getter for a skipped exception. |
308 | * @return the skippedExceptions |
309 | */ |
310 | private Throwable getSkippedException(Object key) { |
311 | synchronized (skippedExceptions) { |
312 | return (Throwable) skippedExceptions.get(key); |
313 | } |
314 | } |
315 | |
316 | /** |
317 | * doNotRethrowExceptionClasses will not be re-thrown when skipped. |
318 | * @param doNotRethrowExceptionClasses empty by default |
319 | */ |
320 | public void setDoNotRethrowExceptionClasses(Class[] doNotRethrowExceptionClasses) { |
321 | this.doNotRethrowExceptionClasses = doNotRethrowExceptionClasses; |
322 | } |
323 | |
324 | } |