| 1 | /* | 
| 2 | * Copyright 2006-2007 the original author or authors. | 
| 3 | * | 
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); | 
| 5 | * you may not use this file except in compliance with the License. | 
| 6 | * You may obtain a copy of the License at | 
| 7 | * | 
| 8 | *      http://www.apache.org/licenses/LICENSE-2.0 | 
| 9 | * | 
| 10 | * Unless required by applicable law or agreed to in writing, software | 
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, | 
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
| 13 | * See the License for the specific language governing permissions and | 
| 14 | * limitations under the License. | 
| 15 | */ | 
| 16 |  | 
| 17 | package org.springframework.batch.core.step.item; | 
| 18 |  | 
| 19 | import java.util.ArrayList; | 
| 20 | import java.util.Collections; | 
| 21 | import java.util.List; | 
| 22 | import java.util.concurrent.atomic.AtomicInteger; | 
| 23 |  | 
| 24 | import org.apache.commons.logging.Log; | 
| 25 | import org.apache.commons.logging.LogFactory; | 
| 26 | import org.springframework.batch.classify.BinaryExceptionClassifier; | 
| 27 | import org.springframework.batch.classify.Classifier; | 
| 28 | import org.springframework.batch.core.StepContribution; | 
| 29 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; | 
| 30 | import org.springframework.batch.core.step.skip.NonSkippableProcessException; | 
| 31 | import org.springframework.batch.core.step.skip.SkipListenerFailedException; | 
| 32 | import org.springframework.batch.core.step.skip.SkipPolicy; | 
| 33 | import org.springframework.batch.item.ItemProcessor; | 
| 34 | import org.springframework.batch.item.ItemWriter; | 
| 35 | import org.springframework.batch.retry.ExhaustedRetryException; | 
| 36 | import org.springframework.batch.retry.RecoveryCallback; | 
| 37 | import org.springframework.batch.retry.RetryCallback; | 
| 38 | import org.springframework.batch.retry.RetryContext; | 
| 39 | import org.springframework.batch.retry.RetryException; | 
| 40 | import org.springframework.batch.retry.support.DefaultRetryState; | 
| 41 |  | 
/**
 * FaultTolerant implementation of the {@link ChunkProcessor} interface, that
 * allows for skipping or retry of items that cause exceptions during
 * processing or writing.
 */
| 47 | public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> { | 
| 48 |  | 
| 49 | private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy(0); | 
| 50 |  | 
| 51 | private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy(0); | 
| 52 |  | 
| 53 | private final BatchRetryTemplate batchRetryTemplate; | 
| 54 |  | 
| 55 | private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true); | 
| 56 |  | 
| 57 | private Log logger = LogFactory.getLog(getClass()); | 
| 58 |  | 
| 59 | private boolean buffering = true; | 
| 60 |  | 
| 61 | private KeyGenerator keyGenerator; | 
| 62 |  | 
| 63 | private ChunkMonitor chunkMonitor = new ChunkMonitor(); | 
| 64 |  | 
| 65 | /** | 
| 66 | * The {@link KeyGenerator} to use to identify failed items across rollback. | 
| 67 | * Not used in the case of the {@link #setBuffering(boolean) buffering flag} | 
| 68 | * being true (the default). | 
| 69 | * | 
| 70 | * @param keyGenerator the {@link KeyGenerator} to set | 
| 71 | */ | 
| 72 | public void setKeyGenerator(KeyGenerator keyGenerator) { | 
| 73 | this.keyGenerator = keyGenerator; | 
| 74 | } | 
| 75 |  | 
| 76 | /** | 
| 77 | * @param SkipPolicy the {@link SkipPolicy} for item processing | 
| 78 | */ | 
| 79 | public void setProcessSkipPolicy(SkipPolicy SkipPolicy) { | 
| 80 | this.itemProcessSkipPolicy = SkipPolicy; | 
| 81 | } | 
| 82 |  | 
| 83 | /** | 
| 84 | * @param SkipPolicy the {@link SkipPolicy} for item writing | 
| 85 | */ | 
| 86 | public void setWriteSkipPolicy(SkipPolicy SkipPolicy) { | 
| 87 | this.itemWriteSkipPolicy = SkipPolicy; | 
| 88 | } | 
| 89 |  | 
| 90 | /** | 
| 91 | * A classifier that can distinguish between exceptions that cause rollback | 
| 92 | * (return true) or not (return false). | 
| 93 | * | 
| 94 | * @param rollbackClassifier | 
| 95 | */ | 
| 96 | public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) { | 
| 97 | this.rollbackClassifier = rollbackClassifier; | 
| 98 | } | 
| 99 |  | 
| 100 | /** | 
| 101 | * @param chunkMonitor | 
| 102 | */ | 
| 103 | public void setChunkMonitor(ChunkMonitor chunkMonitor) { | 
| 104 | this.chunkMonitor = chunkMonitor; | 
| 105 | } | 
| 106 |  | 
| 107 | /** | 
| 108 | * A flag to indicate that items have been buffered and therefore will | 
| 109 | * always come back as a chunk after a rollback. Otherwise things are more | 
| 110 | * complicated because after a rollback the new chunk might or moght not | 
| 111 | * contain items from the previous failed chunk. | 
| 112 | * | 
| 113 | * @param buffering | 
| 114 | */ | 
| 115 | public void setBuffering(boolean buffering) { | 
| 116 | this.buffering = buffering; | 
| 117 | } | 
| 118 |  | 
| 119 | public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, | 
| 120 | ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) { | 
| 121 | super(itemProcessor, itemWriter); | 
| 122 | this.batchRetryTemplate = batchRetryTemplate; | 
| 123 | } | 
| 124 |  | 
| 125 | @Override | 
| 126 | protected void initializeUserData(Chunk<I> inputs) { | 
| 127 | @SuppressWarnings("unchecked") | 
| 128 | UserData<O> data = (UserData<O>) inputs.getUserData(); | 
| 129 | if (data == null) { | 
| 130 | data = new UserData<O>(inputs.size()); | 
| 131 | inputs.setUserData(data); | 
| 132 | data.setOutputs(new Chunk<O>()); | 
| 133 | } | 
| 134 | } | 
| 135 |  | 
| 136 | @Override | 
| 137 | protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) { | 
| 138 | @SuppressWarnings("unchecked") | 
| 139 | UserData<O> data = (UserData<O>) inputs.getUserData(); | 
| 140 | return data.size() - outputs.size() - inputs.getSkips().size(); | 
| 141 | } | 
| 142 |  | 
| 143 | @Override | 
| 144 | protected boolean isComplete(Chunk<I> inputs) { | 
| 145 |  | 
| 146 | /* | 
| 147 | * Need to remember the write skips across transactions, otherwise they | 
| 148 | * keep coming back. Since we register skips with the inputs they will | 
| 149 | * not be processed again but the output skips need to be saved for | 
| 150 | * registration later with the listeners. The inputs are going to be the | 
| 151 | * same for all transactions processing the same chunk, but the outputs | 
| 152 | * are not, so we stash them in user data on the inputs. | 
| 153 | */ | 
| 154 |  | 
| 155 | @SuppressWarnings("unchecked") | 
| 156 | UserData<O> data = (UserData<O>) inputs.getUserData(); | 
| 157 | Chunk<O> previous = data.getOutputs(); | 
| 158 |  | 
| 159 | return inputs.isEmpty() && previous.getSkips().isEmpty(); | 
| 160 |  | 
| 161 | } | 
| 162 |  | 
| 163 | @Override | 
| 164 | protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) { | 
| 165 |  | 
| 166 | @SuppressWarnings("unchecked") | 
| 167 | UserData<O> data = (UserData<O>) inputs.getUserData(); | 
| 168 | Chunk<O> previous = data.getOutputs(); | 
| 169 |  | 
| 170 | Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips()); | 
| 171 | next.setBusy(previous.isBusy()); | 
| 172 |  | 
| 173 | // Remember for next time if there are skips accumulating | 
| 174 | data.setOutputs(next); | 
| 175 |  | 
| 176 | return next; | 
| 177 |  | 
| 178 | } | 
| 179 |  | 
| 180 | @Override | 
| 181 | protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception { | 
| 182 |  | 
| 183 | Chunk<O> outputs = new Chunk<O>(); | 
| 184 | @SuppressWarnings("unchecked") | 
| 185 | UserData<O> data = (UserData<O>) inputs.getUserData(); | 
| 186 | Chunk<O> cache = data.getOutputs(); | 
| 187 | final Chunk<O>.ChunkIterator cacheIterator = cache.isEmpty() ? null : cache.iterator(); | 
| 188 | final AtomicInteger count = new AtomicInteger(0); | 
| 189 |  | 
| 190 | for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { | 
| 191 |  | 
| 192 | final I item = iterator.next(); | 
| 193 |  | 
| 194 | RetryCallback<O> retryCallback = new RetryCallback<O>() { | 
| 195 |  | 
| 196 | public O doWithRetry(RetryContext context) throws Exception { | 
| 197 | O output = null; | 
| 198 | try { | 
| 199 | count.incrementAndGet(); | 
| 200 | O cached = (cacheIterator != null) ? cacheIterator.next() : null; | 
| 201 | if (cached != null && count.get() > 1) { | 
| 202 | /* | 
| 203 | * If there is a cached chunk then we must be | 
| 204 | * scanning for errors in the writer, in which case | 
| 205 | * only the first one will be written, and for the | 
| 206 | * rest we need to fill in the output from the | 
| 207 | * cache. | 
| 208 | */ | 
| 209 | output = cached; | 
| 210 | } | 
| 211 | else { | 
| 212 | output = doProcess(item); | 
| 213 | } | 
| 214 | } | 
| 215 | catch (Exception e) { | 
| 216 | if (rollbackClassifier.classify(e)) { | 
| 217 | // Default is to rollback unless the classifier | 
| 218 | // allows us to continue | 
| 219 | throw e; | 
| 220 | } | 
| 221 | else if (itemProcessSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { | 
| 222 | // If we are not re-throwing then we should check if | 
| 223 | // this is skippable | 
| 224 | contribution.incrementProcessSkipCount(); | 
| 225 | logger.debug("Skipping after failed process with no rollback", e); | 
| 226 | } | 
| 227 | else { | 
| 228 | // If it's not skippable that's an error in | 
| 229 | // configuration - it doesn't make sense to not roll | 
| 230 | // back if we are also not allowed to skip | 
| 231 | throw new NonSkippableProcessException( | 
| 232 | "Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.", | 
| 233 | e); | 
| 234 | } | 
| 235 | } | 
| 236 | if (output == null) { | 
| 237 | // No need to re-process filtered items | 
| 238 | iterator.remove(); | 
| 239 | } | 
| 240 | return output; | 
| 241 | } | 
| 242 |  | 
| 243 | }; | 
| 244 |  | 
| 245 | RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() { | 
| 246 |  | 
| 247 | public O recover(RetryContext context) throws Exception { | 
| 248 | Exception e = (Exception) context.getLastThrowable(); | 
| 249 | if (itemProcessSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { | 
| 250 | contribution.incrementProcessSkipCount(); | 
| 251 | iterator.remove(e); | 
| 252 | logger.debug("Skipping after failed process", e); | 
| 253 | return null; | 
| 254 | } | 
| 255 | else { | 
| 256 | throw new RetryException("Non-skippable exception in recoverer while processing", e); | 
| 257 | } | 
| 258 | } | 
| 259 |  | 
| 260 | }; | 
| 261 |  | 
| 262 | O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState( | 
| 263 | getInputKey(item), rollbackClassifier)); | 
| 264 | if (output != null) { | 
| 265 | outputs.add(output); | 
| 266 | } | 
| 267 |  | 
| 268 | } | 
| 269 |  | 
| 270 | return outputs; | 
| 271 |  | 
| 272 | } | 
| 273 |  | 
| 274 | @Override | 
| 275 | protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs) | 
| 276 | throws Exception { | 
| 277 |  | 
| 278 | RetryCallback<Object> retryCallback = new RetryCallback<Object>() { | 
| 279 | public Object doWithRetry(RetryContext context) throws Exception { | 
| 280 |  | 
| 281 | if (!inputs.isBusy()) { | 
| 282 | chunkMonitor.setChunkSize(inputs.size()); | 
| 283 | try { | 
| 284 | doWrite(outputs.getItems()); | 
| 285 | } | 
| 286 | catch (Exception e) { | 
| 287 | if (rollbackClassifier.classify(e)) { | 
| 288 | throw e; | 
| 289 | } | 
| 290 | /* | 
| 291 | * If the exception is marked as no-rollback, we need to | 
| 292 | * override that, otherwise there's no way to write the | 
| 293 | * rest of the chunk or to honour the skip listener | 
| 294 | * contract. | 
| 295 | */ | 
| 296 | throw new ForceRollbackForWriteSkipException( | 
| 297 | "Force rollback on skippable exception so that skipped item can be located.", e); | 
| 298 | } | 
| 299 | contribution.incrementWriteCount(outputs.size()); | 
| 300 | } | 
| 301 | else { | 
| 302 | scan(contribution, inputs, outputs, chunkMonitor); | 
| 303 | } | 
| 304 | return null; | 
| 305 |  | 
| 306 | } | 
| 307 | }; | 
| 308 |  | 
| 309 | if (!buffering) { | 
| 310 |  | 
| 311 | RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() { | 
| 312 |  | 
| 313 | public Object recover(RetryContext context) throws Exception { | 
| 314 |  | 
| 315 | Exception e = (Exception) context.getLastThrowable(); | 
| 316 | if (outputs.size() > 1 && !rollbackClassifier.classify(e)) { | 
| 317 | throw new RetryException("Invalid retry state during write caused by " | 
| 318 | + "exception that does not classify for rollback: ", e); | 
| 319 | } | 
| 320 |  | 
| 321 | Chunk<I>.ChunkIterator inputIterator = inputs.iterator(); | 
| 322 | for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) { | 
| 323 |  | 
| 324 | inputIterator.next(); | 
| 325 | outputIterator.next(); | 
| 326 |  | 
| 327 | checkSkipPolicy(inputIterator, outputIterator, e, contribution); | 
| 328 | if (!rollbackClassifier.classify(e)) { | 
| 329 | throw new RetryException( | 
| 330 | "Invalid retry state during recovery caused by exception that does not classify for rollback: ", | 
| 331 | e); | 
| 332 | } | 
| 333 |  | 
| 334 | } | 
| 335 |  | 
| 336 | return null; | 
| 337 |  | 
| 338 | } | 
| 339 |  | 
| 340 | }; | 
| 341 |  | 
| 342 | batchRetryTemplate.execute(retryCallback, batchRecoveryCallback, BatchRetryTemplate.createState( | 
| 343 | getInputKeys(inputs), rollbackClassifier)); | 
| 344 |  | 
| 345 | } | 
| 346 | else { | 
| 347 |  | 
| 348 | RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() { | 
| 349 |  | 
| 350 | public Object recover(RetryContext context) throws Exception { | 
| 351 |  | 
| 352 | /* | 
| 353 | * If the last exception was not skippable we don't need to | 
| 354 | * do any scanning. We can just bomb out with a retry | 
| 355 | * exhausted. | 
| 356 | */ | 
| 357 | if (!itemWriteSkipPolicy.shouldSkip(context.getLastThrowable(), -1)) { | 
| 358 | throw new ExhaustedRetryException( | 
| 359 | "Retry exhausted after last attempt in recovery path, but exception is not skippable.", | 
| 360 | context.getLastThrowable()); | 
| 361 | } | 
| 362 |  | 
| 363 | inputs.setBusy(true); | 
| 364 | scan(contribution, inputs, outputs, chunkMonitor); | 
| 365 | return null; | 
| 366 | } | 
| 367 |  | 
| 368 | }; | 
| 369 |  | 
| 370 | logger.debug("Attempting to write: " + inputs); | 
| 371 | batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs, | 
| 372 | rollbackClassifier)); | 
| 373 |  | 
| 374 | } | 
| 375 |  | 
| 376 | callSkipListeners(inputs, outputs); | 
| 377 |  | 
| 378 | } | 
| 379 |  | 
| 380 | private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) { | 
| 381 |  | 
| 382 | for (SkipWrapper<I> wrapper : inputs.getSkips()) { | 
| 383 | I item = wrapper.getItem(); | 
| 384 | if (item == null) { | 
| 385 | continue; | 
| 386 | } | 
| 387 | Exception e = wrapper.getException(); | 
| 388 | try { | 
| 389 | getListener().onSkipInProcess(item, e); | 
| 390 | } | 
| 391 | catch (RuntimeException ex) { | 
| 392 | throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); | 
| 393 | } | 
| 394 | } | 
| 395 |  | 
| 396 | for (SkipWrapper<O> wrapper : outputs.getSkips()) { | 
| 397 | Exception e = wrapper.getException(); | 
| 398 | try { | 
| 399 | getListener().onSkipInWrite(wrapper.getItem(), e); | 
| 400 | } | 
| 401 | catch (RuntimeException ex) { | 
| 402 | throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); | 
| 403 | } | 
| 404 | } | 
| 405 |  | 
| 406 | // Clear skips if we are possibly going to process this chunk again | 
| 407 | outputs.clearSkips(); | 
| 408 | inputs.clearSkips(); | 
| 409 |  | 
| 410 | } | 
| 411 |  | 
| 412 | private Object getInputKey(I item) { | 
| 413 | if (keyGenerator == null) { | 
| 414 | return item; | 
| 415 | } | 
| 416 | return keyGenerator.getKey(item); | 
| 417 | } | 
| 418 |  | 
| 419 | private List<?> getInputKeys(final Chunk<I> inputs) { | 
| 420 | if (keyGenerator == null) { | 
| 421 | return inputs.getItems(); | 
| 422 | } | 
| 423 | List<Object> keys = new ArrayList<Object>(); | 
| 424 | for (I item : inputs.getItems()) { | 
| 425 | keys.add(keyGenerator.getKey(item)); | 
| 426 | } | 
| 427 | return keys; | 
| 428 | } | 
| 429 |  | 
| 430 | private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator, | 
| 431 | Exception e, StepContribution contribution) { | 
| 432 | logger.debug("Checking skip policy after failed write"); | 
| 433 | if (itemWriteSkipPolicy.shouldSkip(e, contribution.getStepSkipCount())) { | 
| 434 | contribution.incrementWriteSkipCount(); | 
| 435 | inputIterator.remove(); | 
| 436 | outputIterator.remove(e); | 
| 437 | logger.debug("Skipping after failed write", e); | 
| 438 | } | 
| 439 | else { | 
| 440 | throw new RetryException("Non-skippable exception in recoverer", e); | 
| 441 | } | 
| 442 | } | 
| 443 |  | 
| 444 | private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs, | 
| 445 | ChunkMonitor chunkMonitor) throws Exception { | 
| 446 |  | 
| 447 | logger.debug("Scanning for failed item on write: " + inputs); | 
| 448 | if (outputs.isEmpty()) { | 
| 449 | inputs.setBusy(false); | 
| 450 | return; | 
| 451 | } | 
| 452 |  | 
| 453 | Chunk<I>.ChunkIterator inputIterator = inputs.iterator(); | 
| 454 | Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); | 
| 455 |  | 
| 456 | List<O> items = Collections.singletonList(outputIterator.next()); | 
| 457 | inputIterator.next(); | 
| 458 | try { | 
| 459 | writeItems(items); | 
| 460 | // If successful we are going to return and allow | 
| 461 | // the driver to commit... | 
| 462 | doAfterWrite(items); | 
| 463 | contribution.incrementWriteCount(1); | 
| 464 | inputIterator.remove(); | 
| 465 | outputIterator.remove(); | 
| 466 | } | 
| 467 | catch (Exception e) { | 
| 468 | if (!itemWriteSkipPolicy.shouldSkip(e, -1) && !rollbackClassifier.classify(e)) { | 
| 469 | inputIterator.remove(); | 
| 470 | outputIterator.remove(); | 
| 471 | } | 
| 472 | else { | 
| 473 | checkSkipPolicy(inputIterator, outputIterator, e, contribution); | 
| 474 | } | 
| 475 | if (rollbackClassifier.classify(e)) { | 
| 476 | throw e; | 
| 477 | } | 
| 478 | } | 
| 479 | chunkMonitor.incrementOffset(); | 
| 480 | if (outputs.isEmpty()) { | 
| 481 | inputs.setBusy(false); | 
| 482 | chunkMonitor.resetOffset(); | 
| 483 | } | 
| 484 | } | 
| 485 |  | 
| 486 | private static class UserData<O> { | 
| 487 |  | 
| 488 | private final int size; | 
| 489 |  | 
| 490 | private Chunk<O> outputs; | 
| 491 |  | 
| 492 | public UserData(int size) { | 
| 493 | this.size = size; | 
| 494 | } | 
| 495 |  | 
| 496 | public int size() { | 
| 497 | return size; | 
| 498 | } | 
| 499 |  | 
| 500 | public Chunk<O> getOutputs() { | 
| 501 | return outputs; | 
| 502 | } | 
| 503 |  | 
| 504 | public void setOutputs(Chunk<O> outputs) { | 
| 505 | this.outputs = outputs; | 
| 506 | } | 
| 507 |  | 
| 508 | } | 
| 509 |  | 
| 510 | } |