| 1 | /* |
| 2 | * Copyright 2006-2007 the original author or authors. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.springframework.batch.core.step.item; |
| 17 | |
| 18 | import java.util.HashMap; |
| 19 | import java.util.HashSet; |
| 20 | import java.util.Iterator; |
| 21 | import java.util.Map; |
| 22 | import java.util.Set; |
| 23 | |
| 24 | import org.apache.commons.logging.Log; |
| 25 | import org.apache.commons.logging.LogFactory; |
| 26 | import org.springframework.batch.core.SkipListener; |
| 27 | import org.springframework.batch.core.StepContribution; |
| 28 | import org.springframework.batch.core.UnexpectedJobExecutionException; |
| 29 | import org.springframework.batch.core.listener.CompositeSkipListener; |
| 30 | import org.springframework.batch.core.step.skip.ItemSkipPolicy; |
| 31 | import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; |
| 32 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
| 33 | import org.springframework.batch.item.ItemKeyGenerator; |
| 34 | import org.springframework.batch.item.ItemReader; |
| 35 | import org.springframework.batch.item.ItemWriter; |
| 36 | import org.springframework.batch.item.MarkFailedException; |
| 37 | import org.springframework.transaction.support.TransactionSynchronizationManager; |
| 38 | |
| 39 | /** |
| 40 | * {@link ItemHandler} that implements skip behavior. It delegates to |
| 41 | * {@link #setItemSkipPolicy(ItemSkipPolicy)} to decide whether skip should be called or not. |
| 42 | * |
| 43 | * If exception is thrown while reading the item, skip is called on the |
| 44 | * {@link ItemReader}. If exception is thrown while writing the item, skip is |
| 45 | * called on both {@link ItemReader} and {@link ItemWriter}. |
| 46 | * |
| 47 | * @author Dave Syer |
| 48 | * @author Robert Kasanicky |
| 49 | */ |
| 50 | public class ItemSkipPolicyItemHandler extends SimpleItemHandler { |
| 51 | |
| 52 | /** |
| 53 | * Key for transaction resource that holds skipped keys until they can be |
| 54 | * removed |
| 55 | */ |
| 56 | private static final String TO_BE_REMOVED = ItemSkipPolicyItemHandler.class.getName() + ".TO_BE_REMOVED"; |
| 57 | |
| 58 | protected final Log logger = LogFactory.getLog(getClass()); |
| 59 | |
| 60 | private ItemSkipPolicy itemSkipPolicy = new NeverSkipItemSkipPolicy(); |
| 61 | |
| 62 | private int skipCacheCapacity = 1024; |
| 63 | |
| 64 | private Map skippedExceptions = new HashMap(); |
| 65 | |
| 66 | private ItemKeyGenerator defaultItemKeyGenerator = new ItemKeyGenerator() { |
| 67 | public Object getKey(Object item) { |
| 68 | return item; |
| 69 | } |
| 70 | }; |
| 71 | |
| 72 | private ItemKeyGenerator itemKeyGenerator = defaultItemKeyGenerator; |
| 73 | |
| 74 | private CompositeSkipListener listener = new CompositeSkipListener(); |
| 75 | |
| 76 | /** |
| 77 | * Register some {@link SkipListener}s with the handler. Each will get the |
| 78 | * callbacks in the order specified at the correct stage if a skip occurs. |
| 79 | * |
| 80 | * @param listeners |
| 81 | */ |
| 82 | public void setSkipListeners(SkipListener[] listeners) { |
| 83 | for (int i = 0; i < listeners.length; i++) { |
| 84 | registerSkipListener(listeners[i]); |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | /** |
| 89 | * Register a listener for callbacks at the appropriate stages in a skip |
| 90 | * process. |
| 91 | * |
| 92 | * @param listener a {@link SkipListener} |
| 93 | */ |
| 94 | public void registerSkipListener(SkipListener listener) { |
| 95 | this.listener.register(listener); |
| 96 | } |
| 97 | |
| 98 | /** |
| 99 | * Public setter for the {@link ItemKeyGenerator}. Defaults to just return |
| 100 | * the item, and since it will be used before a write operation. |
| 101 | * Implementations must ensure that items always have the same key when they |
| 102 | * are read from the {@link ItemReader} (so if the item is mutable and the |
| 103 | * reader does any buffering the key generator might need to take care to |
| 104 | * only use data that do not change on write). |
| 105 | * |
| 106 | * @param itemKeyGenerator the {@link ItemKeyGenerator} to set. If null |
| 107 | * resets to default value. |
| 108 | */ |
| 109 | public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) { |
| 110 | if (itemKeyGenerator == null) { |
| 111 | itemKeyGenerator = defaultItemKeyGenerator; |
| 112 | } |
| 113 | this.itemKeyGenerator = itemKeyGenerator; |
| 114 | } |
| 115 | |
| 116 | /** |
| 117 | * @param itemReader |
| 118 | * @param itemWriter |
| 119 | */ |
| 120 | public ItemSkipPolicyItemHandler(ItemReader itemReader, ItemWriter itemWriter) { |
| 121 | super(itemReader, itemWriter); |
| 122 | } |
| 123 | |
| 124 | /** |
| 125 | * @param itemSkipPolicy |
| 126 | */ |
| 127 | public void setItemSkipPolicy(ItemSkipPolicy itemSkipPolicy) { |
| 128 | this.itemSkipPolicy = itemSkipPolicy; |
| 129 | } |
| 130 | |
| 131 | /** |
| 132 | * Public setter for the capacity of the skipped item cache. If a large |
| 133 | * number of items are failing and not being recognized as skipped, it |
| 134 | * usually signals a problem with the key generation (often equals and |
| 135 | * hashCode in the item itself). So it is better to enforce a strict limit |
| 136 | * than have weird looking errors, where a skip limit is reached without |
| 137 | * anything being skipped. |
| 138 | * |
| 139 | * @param skipCacheCapacity the capacity to set |
| 140 | */ |
| 141 | public void setSkipCacheCapacity(int skipCacheCapacity) { |
| 142 | this.skipCacheCapacity = skipCacheCapacity; |
| 143 | } |
| 144 | |
| 145 | /** |
| 146 | * Tries to read the item from the reader, in case of exception skip the |
| 147 | * item if the skip policy allows, otherwise re-throw. |
| 148 | * |
| 149 | * @param contribution current StepContribution holding skipped items count |
| 150 | * @return next item for processing |
| 151 | */ |
| 152 | protected Object read(StepContribution contribution) throws Exception { |
| 153 | |
| 154 | while (true) { |
| 155 | |
| 156 | try { |
| 157 | |
| 158 | Object item = doRead(); |
| 159 | Object key = itemKeyGenerator.getKey(item); |
| 160 | Throwable throwable = getSkippedException(key); |
| 161 | while (item != null && throwable != null) { |
| 162 | logger.debug("Skipping item on input, previously failed on output; key=[" + key + "]"); |
| 163 | scheduleForRemoval(key); |
| 164 | if (listener != null) { |
| 165 | listener.onSkipInWrite(item, throwable); |
| 166 | } |
| 167 | item = doRead(); |
| 168 | key = itemKeyGenerator.getKey(item); |
| 169 | throwable = getSkippedException(key); |
| 170 | } |
| 171 | return item; |
| 172 | |
| 173 | } |
| 174 | catch (Exception e) { |
| 175 | try { |
| 176 | if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
| 177 | // increment skip count and try again |
| 178 | contribution.incrementReadSkipCount(); |
| 179 | if (listener != null) { |
| 180 | listener.onSkipInRead(e); |
| 181 | } |
| 182 | logger.debug("Skipping failed input", e); |
| 183 | } |
| 184 | else { |
| 185 | // re-throw only when the skip policy runs out of |
| 186 | // patience |
| 187 | throw e; |
| 188 | } |
| 189 | } |
| 190 | catch (SkipLimitExceededException ex) { |
| 191 | // we are headed for a abnormal ending so bake in the skip |
| 192 | // count |
| 193 | contribution.combineSkipCounts(); |
| 194 | throw ex; |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | } |
| 199 | |
| 200 | } |
| 201 | |
| 202 | /** |
| 203 | * Tries to write the item using the writer. In case of exception consults |
| 204 | * skip policy before re-throwing the exception. The exception is always |
| 205 | * re-thrown, but if the item is seen again on read it will be skipped. |
| 206 | * |
| 207 | * @param item item to write |
| 208 | * @param contribution current StepContribution holding skipped items count |
| 209 | */ |
| 210 | protected void write(Object item, StepContribution contribution) throws Exception { |
| 211 | doWriteWithSkip(item, contribution); |
| 212 | } |
| 213 | |
| 214 | /** |
| 215 | * @param item |
| 216 | * @param contribution |
| 217 | * @throws Exception |
| 218 | */ |
| 219 | protected final void doWriteWithSkip(Object item, StepContribution contribution) throws Exception { |
| 220 | // Get the key as early as possible, otherwise it might change in |
| 221 | // doWrite() |
| 222 | Object key = itemKeyGenerator.getKey(item); |
| 223 | try { |
| 224 | doWrite(item); |
| 225 | } |
| 226 | catch (Exception e) { |
| 227 | if (itemSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { |
| 228 | contribution.incrementSkipCount(); |
| 229 | // don't call the listener here - the transaction is going to |
| 230 | // roll back |
| 231 | addSkippedException(key, e); |
| 232 | logger.debug("Added item to skip list; key=" + key); |
| 233 | } |
| 234 | // always re-throw exception on write |
| 235 | throw e; |
| 236 | } |
| 237 | } |
| 238 | |
| 239 | public void mark() throws MarkFailedException { |
| 240 | super.mark(); |
| 241 | clearSkippedExceptions(); |
| 242 | } |
| 243 | |
| 244 | /** |
| 245 | * Synchronized setter for a skipped exception. |
| 246 | */ |
| 247 | private void addSkippedException(Object key, Throwable e) { |
| 248 | synchronized (skippedExceptions) { |
| 249 | if (skippedExceptions.size() >= skipCacheCapacity) { |
| 250 | throw new UnexpectedJobExecutionException( |
| 251 | "The cache of failed items to skipped unexpectedly reached its capacity (" |
| 252 | + skipCacheCapacity |
| 253 | + "). " |
| 254 | + "This often indicates a problem with the key generation strategy, and/or a mistake in the implementation of hashCode and equals in the items being processed."); |
| 255 | } |
| 256 | skippedExceptions.put(key, e); |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | /** |
| 261 | * Schedule this key for removal from the skipped exception cache at the end |
| 262 | * of this transaction (only removed if business transaction is successful). |
| 263 | * |
| 264 | * @param key |
| 265 | */ |
| 266 | private void scheduleForRemoval(Object key) { |
| 267 | if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) { |
| 268 | TransactionSynchronizationManager.bindResource(TO_BE_REMOVED, new HashSet()); |
| 269 | } |
| 270 | ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).add(key); |
| 271 | } |
| 272 | |
| 273 | /** |
| 274 | * Clear the map of skipped exception corresponding to key. |
| 275 | * @param key the key to clear |
| 276 | */ |
| 277 | private void clearSkippedExceptions() { |
| 278 | if (!TransactionSynchronizationManager.hasResource(TO_BE_REMOVED)) { |
| 279 | return; |
| 280 | } |
| 281 | synchronized (skippedExceptions) { |
| 282 | for (Iterator iterator = ((Set) TransactionSynchronizationManager.getResource(TO_BE_REMOVED)).iterator(); iterator |
| 283 | .hasNext();) { |
| 284 | Object key = (Object) iterator.next(); |
| 285 | skippedExceptions.remove(key); |
| 286 | } |
| 287 | TransactionSynchronizationManager.unbindResource(TO_BE_REMOVED); |
| 288 | } |
| 289 | } |
| 290 | |
| 291 | /** |
| 292 | * Synchronized getter for a skipped exception. |
| 293 | * @return the skippedExceptions |
| 294 | */ |
| 295 | private Throwable getSkippedException(Object key) { |
| 296 | synchronized (skippedExceptions) { |
| 297 | return (Throwable) skippedExceptions.get(key); |
| 298 | } |
| 299 | } |
| 300 | |
| 301 | } |