| 1 | /* |
| 2 | * Copyright 2006-2007 the original author or authors. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.springframework.batch.core.step.item; |
| 17 | |
| 18 | import java.util.HashMap; |
| 19 | import java.util.HashSet; |
| 20 | import java.util.Iterator; |
| 21 | import java.util.Map; |
| 22 | import java.util.Set; |
| 23 | |
| 24 | import org.springframework.batch.core.SkipListener; |
| 25 | import org.springframework.batch.core.StepContribution; |
| 26 | import org.springframework.batch.core.UnexpectedJobExecutionException; |
| 27 | import org.springframework.batch.core.listener.CompositeSkipListener; |
| 28 | import org.springframework.batch.core.step.skip.ItemSkipPolicy; |
| 29 | import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; |
| 30 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
| 31 | import org.springframework.batch.item.ItemKeyGenerator; |
| 32 | import org.springframework.batch.item.ItemReader; |
| 33 | import org.springframework.batch.item.ItemWriter; |
| 34 | import org.springframework.batch.item.MarkFailedException; |
| 35 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
| 36 | |
| 37 | /** |
| 38 | * {@link ItemHandler} that implements skip behavior. It delegates to |
| 39 | * {@link #setItemSkipPolicy(ItemSkipPolicy)} to decide whether skip should be |
| 40 | * called or not. |
| 41 | * |
| 42 | * When exception is skipped on read it is *not* re-thrown (does not cause tx |
| 43 | * rollback). Skipped exception on write is re-thrown by default (causes tx |
| 44 | * rollback) unless the exception class is included in |
| 45 | * {@link #setDoNotRethrowExceptionClasses(Class[])}. |
| 46 | * |
| 47 | * If exception is thrown while reading the item, skip is called on the |
| 48 | * {@link ItemReader}. If exception is thrown while writing the item, skip is |
| 49 | * called on both {@link ItemReader} and {@link ItemWriter}. |
| 50 | * |
| 51 | * @author Dave Syer |
| 52 | * @author Robert Kasanicky |
| 53 | */ |
| 54 | public class ItemSkipPolicyItemHandler extends SimpleItemHandler { |
| 55 | |
| 56 | /** |
| 57 | * Key for transaction resource that holds skipped keys until they can be |
| 58 | * removed |
| 59 | */ |
| 60 | private static final String TO_BE_REMOVED = ItemSkipPolicyItemHandler.class.getName() + ".TO_BE_REMOVED"; |
| 61 | |
| 62 | private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy(); |
| 63 | |
| 64 | private int skipCacheCapacity = 1024; |
| 65 | |
| 66 | private Map skippedExceptions = new HashMap(); |
| 67 | |
| 68 | private Class[] doNotRethrowExceptionClasses = new Class[] {}; |
| 69 | |
| 70 | private static final ItemKeyGenerator defaultItemKeyGenerator = new ItemKeyGenerator() { |
| 71 | public Object getKey(Object item) { |
| 72 | return item; |
| 73 | } |
| 74 | }; |
| 75 | |
| 76 | private ItemKeyGenerator itemKeyGenerator = defaultItemKeyGenerator; |
| 77 | |
| 78 | private CompositeSkipListener listener = new CompositeSkipListener(); |
| 79 | |
| 80 | /** |
| 81 | * Register some {@link SkipListener}s with the handler. Each will get the |
| 82 | * callbacks in the order specified at the correct stage if a skip occurs. |
| 83 | * |
| 84 | * @param listeners |
| 85 | */ |
| 86 | public void setSkipListeners(SkipListener[] listeners) { |
| 87 | for (int i = 0; i < listeners.length; i++) { |
| 88 | registerSkipListener(listeners[i]); |
| 89 | } |
| 90 | } |
| 91 | |
| 92 | /** |
| 93 | * Register a listener for callbacks at the appropriate stages in a skip |
| 94 | * process. |
| 95 | * |
| 96 | * @param listener a {@link SkipListener} |
| 97 | */ |
| 98 | public void registerSkipListener(SkipListener listener) { |
| 99 | this.listener.register(listener); |
| 100 | } |
| 101 | |
| 102 | /** |
| 103 | * Public setter for the {@link ItemKeyGenerator}. Defaults to just return |
| 104 | * the item, and since it will be used before a write operation. |
| 105 | * Implementations must ensure that items always have the same key when they |
| 106 | * are read from the {@link ItemReader} (so if the item is mutable and the |
| 107 | * reader does any buffering the key generator might need to take care to |
| 108 | * only use data that do not change on write). |
| 109 | * |
| 110 | * @param itemKeyGenerator the {@link ItemKeyGenerator} to set. If null |
| 111 | * resets to default value. |
| 112 | */ |
| 113 | public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) { |
| 114 | this.itemKeyGenerator = (itemKeyGenerator == null) ? defaultItemKeyGenerator : itemKeyGenerator; |
| 115 | } |
| 116 | |
| 117 | /** |
| 118 | * @param itemReader |
| 119 | * @param itemWriter |
| 120 | */ |
| 121 | public ItemSkipPolicyItemHandler(ItemReader itemReader, ItemWriter itemWriter) { |
| 122 | super(itemReader, itemWriter); |
| 123 | } |
| 124 | |
| 125 | /** |
| 126 | * @param itemSkipPolicy |
| 127 | */ |
| 128 | public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) { |
| 129 | this.itemSkipPolicy = itemSkipPolicy; |
| 130 | } |
| 131 | |
| 132 | /** |
| 133 | * Public setter for the capacity of the skipped item cache. If a large |
| 134 | * number of items are failing and not being recognized as skipped, it |
| 135 | * usually signals a problem with the key generation (often equals and |
| 136 | * hashCode in the item itself). So it is better to enforce a strict limit |
| 137 | * than have weird looking errors, where a skip limit is reached without |
| 138 | * anything being skipped. |
| 139 | * |
| 140 | * @param skipCacheCapacity the capacity to set |
| 141 | */ |
| 142 | public void setSkipCacheCapacity(int skipCacheCapacity) { |
| 143 | this.skipCacheCapacity = skipCacheCapacity; |
| 144 | } |
| 145 | |
| 146 | /** |
| 147 | * Tries to read the item from the reader, in case of exception skip the |
| 148 | * item if the skip policy allows, otherwise re-throw. |
| 149 | * |
| 150 | * @param contribution current StepContribution holding skipped items count |
| 151 | * @return next item for processing |
| 152 | */ |
| 153 | protected Object read(StepContribution contribution) throws Exception { |
| 154 | |
| 155 | while (true) { |
| 156 | |
| 157 | try { |
| 158 | |
| 159 | Object item = doRead(); |
| 160 | Object key = itemKeyGenerator.getKey(item); |
| 161 | Throwable throwable = getSkippedException(key); |
| 162 | while (item != null && throwable != null) { |
| 163 | logger.debug("Skipping item on input, previously failed on output; key=[" + key + "]"); |
| 164 | scheduleForRemoval(key); |
| 165 | |
| 166 | item = doRead(); |
| 167 | key = itemKeyGenerator.getKey(item); |
| 168 | throwable = getSkippedException(key); |
| 169 | } |
| 170 | return item; |
| 171 | |
| 172 | } |
| 173 | catch (Exception e) { |
| 174 | try { |
| 175 | if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
| 176 | // increment skip count and try again |
| 177 | contribution.incrementTemporaryReadSkipCount(); |
| 178 | |
| 179 | listener.onSkipInRead(e); |
| 180 | |
| 181 | logger.debug("Skipping failed input", e); |
| 182 | } |
| 183 | else { |
| 184 | // re-throw only when the skip policy runs out of |
| 185 | // patience |
| 186 | throw e; |
| 187 | } |
| 188 | } |
| 189 | catch (SkipLimitExceededException ex) { |
| 190 | // we are headed for a abnormal ending so bake in the skip |
| 191 | // count |
| 192 | contribution.combineSkipCounts(); |
| 193 | throw ex; |
| 194 | } |
| 195 | } |
| 196 | |
| 197 | } |
| 198 | |
| 199 | } |
| 200 | |
| 201 | /** |
| 202 | * Tries to write the item using the writer. In case of exception consults |
| 203 | * skip policy before re-throwing the exception. The exception is always |
| 204 | * re-thrown, but if the item is seen again on read it will be skipped. |
| 205 | * |
| 206 | * @param item item to write |
| 207 | * @param contribution current StepContribution holding skipped items count |
| 208 | */ |
| 209 | protected void write(Object item, StepContribution contribution) throws Exception { |
| 210 | doWriteWithSkip(item, contribution); |
| 211 | } |
| 212 | |
| 213 | /** |
| 214 | * @param item |
| 215 | * @param contribution |
| 216 | * @throws Exception |
| 217 | */ |
| 218 | protected final void doWriteWithSkip(Object item, StepContribution contribution) throws Exception { |
| 219 | // Get the key as early as possible, otherwise it might change in |
| 220 | // doWrite() |
| 221 | Object key = itemKeyGenerator.getKey(item); |
| 222 | try { |
| 223 | doWrite(item); |
| 224 | } |
| 225 | catch (Exception e) { |
| 226 | if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
| 227 | contribution.incrementWriteSkipCount(); |
| 228 | |
| 229 | addSkippedException(key, e); |
| 230 | logger.debug("Added item to skip list; key=" + key); |
| 231 | |
| 232 | listener.onSkipInWrite(item, e); |
| 233 | |
| 234 | // return without re-throwing if exception shouldn't cause |
| 235 | // rollback |
| 236 | if (!shouldRethrow(e)) { |
| 237 | return; |
| 238 | } |
| 239 | } |
| 240 | // re-throw exception on write by default |
| 241 | throw e; |
| 242 | } |
| 243 | } |
| 244 | |
| 245 | private boolean shouldRethrow(Exception e) { |
| 246 | for (int i = 0; i < doNotRethrowExceptionClasses.length; i++) { |
| 247 | if (doNotRethrowExceptionClasses[i].isAssignableFrom(e.getClass())) { |
| 248 | return false; |
| 249 | } |
| 250 | } |
| 251 | return true; |
| 252 | } |
| 253 | |
| 254 | public void mark() throws MarkFailedException { |
| 255 | super.mark(); |
| 256 | clearSkippedExceptions(); |
| 257 | } |
| 258 | |
| 259 | /** |
| 260 | * Synchronized setter for a skipped exception. |
| 261 | */ |
| 262 | private void addSkippedException(Object key, Throwable e) { |
| 263 | synchronized (skippedExceptions) { |
| 264 | if (skippedExceptions.size() >= skipCacheCapacity) { |
| 265 | throw new UnexpectedJobExecutionException( |
| 266 | "The cache of failed items to skipped unexpectedly reached its capacity (" |
| 267 | + skipCacheCapacity |
| 268 | + "). " |
| 269 | + "This often indicates a problem with the key generation strategy, and/or a mistake in the implementation of hashCode and equals in the items being processed."); |
| 270 | } |
| 271 | skippedExceptions.put(key, e); |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | /** |
| 276 | * Schedule this key for removal from the skipped exception cache at the end |
| 277 | * of this transaction (only removed if business transaction is successful). |
| 278 | * |
| 279 | * @param key |
| 280 | */ |
| 281 | private void scheduleForRemoval(Object key) { |
| 282 | if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) { |
| 283 | TransactionSynchronizationManager.bindResource(TO_BE_REMOVED, new HashSet()); |
| 284 | } |
| 285 | ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).add(key); |
| 286 | } |
| 287 | |
| 288 | /** |
| 289 | * Clear the map of skipped exception corresponding to key. |
| 290 | * @param key the key to clear |
| 291 | */ |
| 292 | private void clearSkippedExceptions() { |
| 293 | if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) { |
| 294 | return; |
| 295 | } |
| 296 | synchronized (skippedExceptions) { |
| 297 | for (Iterator iterator = ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).iterator(); iterator |
| 298 | .hasNext();) { |
| 299 | Object key = iterator.next(); |
| 300 | skippedExceptions.remove(key); |
| 301 | } |
| 302 | TransactionSynchronizationManager.unbindResource(TO_BE_REMOVED); |
| 303 | } |
| 304 | } |
| 305 | |
| 306 | /** |
| 307 | * Synchronized getter for a skipped exception. |
| 308 | * @return the skippedExceptions |
| 309 | */ |
| 310 | private Throwable getSkippedException(Object key) { |
| 311 | synchronized (skippedExceptions) { |
| 312 | return (Throwable) skippedExceptions.get(key); |
| 313 | } |
| 314 | } |
| 315 | |
| 316 | /** |
| 317 | * doNotRethrowExceptionClasses will not be re-thrown when skipped. |
| 318 | * @param doNotRethrowExceptionClasses empty by default |
| 319 | */ |
| 320 | public void setDoNotRethrowExceptionClasses(Class[] doNotRethrowExceptionClasses) { |
| 321 | this.doNotRethrowExceptionClasses = doNotRethrowExceptionClasses; |
| 322 | } |
| 323 | |
| 324 | } |