1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.item; |
17 | |
18 | import java.util.HashMap; |
19 | import java.util.HashSet; |
20 | import java.util.Iterator; |
21 | import java.util.Map; |
22 | import java.util.Set; |
23 | |
24 | import org.apache.commons.logging.Log; |
25 | import org.apache.commons.logging.LogFactory; |
26 | import org.springframework.batch.core.SkipListener; |
27 | import org.springframework.batch.core.StepContribution; |
28 | import org.springframework.batch.core.UnexpectedJobExecutionException; |
29 | import org.springframework.batch.core.listener.CompositeSkipListener; |
30 | import org.springframework.batch.core.step.skip.ItemSkipPolicy; |
31 | import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; |
32 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
33 | import org.springframework.batch.item.ItemKeyGenerator; |
34 | import org.springframework.batch.item.ItemReader; |
35 | import org.springframework.batch.item.ItemWriter; |
36 | import org.springframework.batch.item.MarkFailedException; |
37 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
38 | |
39 | /** |
40 | * {@link ItemHandler} that implements skip behavior. It delegates to |
41 | * {@link #setItemSkipPolicy(ItemSkipPolicy)} to decide whether skip should be called or not. |
42 | * |
43 | * If exception is thrown while reading the item, skip is called on the |
44 | * {@link ItemReader}. If exception is thrown while writing the item, skip is |
45 | * called on both {@link ItemReader} and {@link ItemWriter}. |
46 | * |
47 | * @author Dave Syer |
48 | * @author Robert Kasanicky |
49 | */ |
50 | public class ItemSkipPolicyItemHandler extends SimpleItemHandler { |
51 | |
52 | /** |
53 | * Key for transaction resource that holds skipped keys until they can be |
54 | * removed |
55 | */ |
56 | private static final String TO_BE_REMOVED = ItemSkipPolicyItemHandler.class.getName() + ".TO_BE_REMOVED"; |
57 | |
58 | protected final Log logger = LogFactory.getLog(getClass()); |
59 | |
60 | private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy(); |
61 | |
62 | private int skipCacheCapacity = 1024; |
63 | |
64 | private Map skippedExceptions = new HashMap(); |
65 | |
66 | private ItemKeyGenerator defaultItemKeyGenerator = new ItemKeyGenerator() { |
67 | public Object getKey(Object item) { |
68 | return item; |
69 | } |
70 | }; |
71 | |
72 | private ItemKeyGenerator itemKeyGenerator = defaultItemKeyGenerator; |
73 | |
74 | private CompositeSkipListener listener = new CompositeSkipListener(); |
75 | |
76 | /** |
77 | * Register some {@link SkipListener}s with the handler. Each will get the |
78 | * callbacks in the order specified at the correct stage if a skip occurs. |
79 | * |
80 | * @param listeners |
81 | */ |
82 | public void setSkipListeners(SkipListener[] listeners) { |
83 | for (int i = 0; i < listeners.length; i++) { |
84 | registerSkipListener(listeners[i]); |
85 | } |
86 | } |
87 | |
88 | /** |
89 | * Register a listener for callbacks at the appropriate stages in a skip |
90 | * process. |
91 | * |
92 | * @param listener a {@link SkipListener} |
93 | */ |
94 | public void registerSkipListener(SkipListener listener) { |
95 | this.listener.register(listener); |
96 | } |
97 | |
98 | /** |
99 | * Public setter for the {@link ItemKeyGenerator}. Defaults to just return |
100 | * the item, and since it will be used before a write operation. |
101 | * Implementations must ensure that items always have the same key when they |
102 | * are read from the {@link ItemReader} (so if the item is mutable and the |
103 | * reader does any buffering the key generator might need to take care to |
104 | * only use data that do not change on write). |
105 | * |
106 | * @param itemKeyGenerator the {@link ItemKeyGenerator} to set. If null |
107 | * resets to default value. |
108 | */ |
109 | public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) { |
110 | if (itemKeyGenerator == null) { |
111 | itemKeyGenerator = defaultItemKeyGenerator; |
112 | } |
113 | this.itemKeyGenerator = itemKeyGenerator; |
114 | } |
115 | |
116 | /** |
117 | * @param itemReader |
118 | * @param itemWriter |
119 | */ |
120 | public ItemSkipPolicyItemHandler(ItemReader itemReader, ItemWriter itemWriter) { |
121 | super(itemReader, itemWriter); |
122 | } |
123 | |
124 | /** |
125 | * @param itemSkipPolicy |
126 | */ |
127 | public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) { |
128 | this.itemSkipPolicy = itemSkipPolicy; |
129 | } |
130 | |
131 | /** |
132 | * Public setter for the capacity of the skipped item cache. If a large |
133 | * number of items are failing and not being recognized as skipped, it |
134 | * usually signals a problem with the key generation (often equals and |
135 | * hashCode in the item itself). So it is better to enforce a strict limit |
136 | * than have weird looking errors, where a skip limit is reached without |
137 | * anything being skipped. |
138 | * |
139 | * @param skipCacheCapacity the capacity to set |
140 | */ |
141 | public void setSkipCacheCapacity(int skipCacheCapacity) { |
142 | this.skipCacheCapacity = skipCacheCapacity; |
143 | } |
144 | |
145 | /** |
146 | * Tries to read the item from the reader, in case of exception skip the |
147 | * item if the skip policy allows, otherwise re-throw. |
148 | * |
149 | * @param contribution current StepContribution holding skipped items count |
150 | * @return next item for processing |
151 | */ |
152 | protected Object read(StepContribution contribution) throws Exception { |
153 | |
154 | while (true) { |
155 | |
156 | try { |
157 | |
158 | Object item = doRead(); |
159 | Object key = itemKeyGenerator.getKey(item); |
160 | Throwable throwable = getSkippedException(key); |
161 | while (item != null && throwable != null) { |
162 | logger.debug("Skipping item on input, previously failed on output; key=[" + key + "]"); |
163 | scheduleForRemoval(key); |
164 | if (listener != null) { |
165 | listener.onSkipInWrite(item, throwable); |
166 | } |
167 | item = doRead(); |
168 | key = itemKeyGenerator.getKey(item); |
169 | throwable = getSkippedException(key); |
170 | } |
171 | return item; |
172 | |
173 | } |
174 | catch (Exception e) { |
175 | try { |
176 | if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
177 | // increment skip count and try again |
178 | contribution.incrementReadSkipCount(); |
179 | if (listener != null) { |
180 | listener.onSkipInRead(e); |
181 | } |
182 | logger.debug("Skipping failed input", e); |
183 | } |
184 | else { |
185 | // re-throw only when the skip policy runs out of |
186 | // patience |
187 | throw e; |
188 | } |
189 | } |
190 | catch (SkipLimitExceededException ex) { |
191 | // we are headed for a abnormal ending so bake in the skip |
192 | // count |
193 | contribution.combineSkipCounts(); |
194 | throw ex; |
195 | } |
196 | } |
197 | |
198 | } |
199 | |
200 | } |
201 | |
202 | /** |
203 | * Tries to write the item using the writer. In case of exception consults |
204 | * skip policy before re-throwing the exception. The exception is always |
205 | * re-thrown, but if the item is seen again on read it will be skipped. |
206 | * |
207 | * @param item item to write |
208 | * @param contribution current StepContribution holding skipped items count |
209 | */ |
210 | protected void write(Object item, StepContribution contribution) throws Exception { |
211 | doWriteWithSkip(item, contribution); |
212 | } |
213 | |
214 | /** |
215 | * @param item |
216 | * @param contribution |
217 | * @throws Exception |
218 | */ |
219 | protected final void doWriteWithSkip(Object item, StepContribution contribution) throws Exception { |
220 | // Get the key as early as possible, otherwise it might change in |
221 | // doWrite() |
222 | Object key = itemKeyGenerator.getKey(item); |
223 | try { |
224 | doWrite(item); |
225 | } |
226 | catch (Exception e) { |
227 | if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
228 | contribution.incrementSkipCount(); |
229 | // don't call the listener here - the transaction is going to |
230 | // roll back |
231 | addSkippedException(key, e); |
232 | logger.debug("Added item to skip list; key=" + key); |
233 | } |
234 | // always re-throw exception on write |
235 | throw e; |
236 | } |
237 | } |
238 | |
239 | public void mark() throws MarkFailedException { |
240 | super.mark(); |
241 | clearSkippedExceptions(); |
242 | } |
243 | |
244 | /** |
245 | * Synchronized setter for a skipped exception. |
246 | */ |
247 | private void addSkippedException(Object key, Throwable e) { |
248 | synchronized (skippedExceptions) { |
249 | if (skippedExceptions.size() >= skipCacheCapacity) { |
250 | throw new UnexpectedJobExecutionException( |
251 | "The cache of failed items to skipped unexpectedly reached its capacity (" |
252 | + skipCacheCapacity |
253 | + "). " |
254 | + "This often indicates a problem with the key generation strategy, and/or a mistake in the implementation of hashCode and equals in the items being processed."); |
255 | } |
256 | skippedExceptions.put(key, e); |
257 | } |
258 | } |
259 | |
260 | /** |
261 | * Schedule this key for removal from the skipped exception cache at the end |
262 | * of this transaction (only removed if business transaction is successful). |
263 | * |
264 | * @param key |
265 | */ |
266 | private void scheduleForRemoval(Object key) { |
267 | if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) { |
268 | TransactionSynchronizationManager.bindResource(TO_BE_REMOVED, new HashSet()); |
269 | } |
270 | ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).add(key); |
271 | } |
272 | |
273 | /** |
274 | * Clear the map of skipped exception corresponding to key. |
275 | * @param key the key to clear |
276 | */ |
277 | private void clearSkippedExceptions() { |
278 | if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) { |
279 | return; |
280 | } |
281 | synchronized (skippedExceptions) { |
282 | for (Iterator iterator = ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).iterator(); iterator |
283 | .hasNext();) { |
284 | Object key = (Object) iterator.next(); |
285 | skippedExceptions.remove(key); |
286 | } |
287 | TransactionSynchronizationManager.unbindResource(TO_BE_REMOVED); |
288 | } |
289 | } |
290 | |
291 | /** |
292 | * Synchronized getter for a skipped exception. |
293 | * @return the skippedExceptions |
294 | */ |
295 | private Throwable getSkippedException(Object key) { |
296 | synchronized (skippedExceptions) { |
297 | return (Throwable) skippedExceptions.get(key); |
298 | } |
299 | } |
300 | |
301 | } |