1 | package org.springframework.batch.core.step.item; |
2 | |
3 | import java.util.ArrayList; |
4 | import java.util.Arrays; |
5 | import java.util.List; |
6 | |
7 | import org.springframework.batch.core.step.skip.ItemSkipPolicy; |
8 | import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; |
9 | import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy; |
10 | import org.springframework.batch.core.step.skip.SkipLimitExceededException; |
11 | import org.springframework.batch.item.ItemKeyGenerator; |
12 | import org.springframework.batch.repeat.exception.SimpleLimitExceptionHandler; |
13 | |
14 | /** |
15 | * Factory bean for step that provides options for configuring skip behavior. |
16 | * User can set {@link #setSkipLimit(int)} to set how many exceptions of |
17 | * {@link #setSkippableExceptionClasses(Class[])} types are tolerated. |
18 | * {@link #setFatalExceptionClasses(Class[])} will cause immediate termination of job - they |
19 | * are treated as higher priority than {@link #setSkippableExceptionClasses(Class[])}, so |
20 | * the two lists don't need to be exclusive. |
21 | * |
22 | * @see SimpleStepFactoryBean |
23 | * @see StatefulRetryStepFactoryBean |
24 | * |
25 | * @author Dave Syer |
26 | * @author Robert Kasanicky |
27 | * |
28 | */ |
29 | public class SkipLimitStepFactoryBean extends SimpleStepFactoryBean { |
30 | |
31 | private int skipLimit = 0; |
32 | |
33 | private Class[] skippableExceptionClasses = new Class[] { Exception.class }; |
34 | |
35 | private Class[] fatalExceptionClasses = new Class[] { Error.class }; |
36 | |
37 | private ItemKeyGenerator itemKeyGenerator; |
38 | |
39 | private int skipCacheCapacity = 1024; |
40 | |
41 | private ItemSkipPolicy itemSkipPolicy; |
42 | |
43 | /** |
44 | * Public setter for a limit that determines skip policy. If this value is |
45 | * positive then an exception in chunk processing will cause the item to be |
46 | * skipped and no exception propagated until the limit is reached. If it is |
47 | * zero then all exceptions will be propagated from the chunk and cause the |
48 | * step to abort. |
49 | * |
50 | * @param skipLimit the value to set. Default is 0 (never skip). |
51 | */ |
52 | public void setSkipLimit(int skipLimit) { |
53 | this.skipLimit = skipLimit; |
54 | } |
55 | |
56 | /** |
57 | * Public setter for exception classes that when raised won't crash the job |
58 | * but will result in transaction rollback and the item which handling |
59 | * caused the exception will be skipped. |
60 | * |
61 | * @param exceptionClasses defaults to <code>Exception</code> |
62 | */ |
63 | public void setSkippableExceptionClasses(Class[] exceptionClasses) { |
64 | this.skippableExceptionClasses = exceptionClasses; |
65 | } |
66 | |
67 | /** |
68 | * Public setter for exception classes that should cause immediate failure. |
69 | * |
70 | * @param fatalExceptionClasses {@link Error} by default |
71 | */ |
72 | public void setFatalExceptionClasses(Class[] fatalExceptionClasses) { |
73 | this.fatalExceptionClasses = fatalExceptionClasses; |
74 | } |
75 | |
76 | /** |
77 | * Protected getter for the fatal exceptions. |
78 | * @return the fatalExceptionClasses |
79 | */ |
80 | protected Class[] getFatalExceptionClasses() { |
81 | return fatalExceptionClasses; |
82 | } |
83 | |
84 | /** |
85 | * Public setter for the {@link ItemKeyGenerator}. This is used to identify |
86 | * failed items so they can be skipped if encountered again, generally in |
87 | * another transaction. |
88 | * |
89 | * @param itemKeyGenerator the {@link ItemKeyGenerator} to set. |
90 | */ |
91 | public void setItemKeyGenerator(ItemKeyGenerator itemKeyGenerator) { |
92 | this.itemKeyGenerator = itemKeyGenerator; |
93 | } |
94 | |
95 | /** |
96 | * Protected getter for the {@link ItemKeyGenerator}. |
97 | * @return the {@link ItemKeyGenerator} |
98 | */ |
99 | protected ItemKeyGenerator getItemKeyGenerator() { |
100 | return itemKeyGenerator; |
101 | } |
102 | |
103 | /** |
104 | * Protected getter for the {@link ItemSkipPolicy}. |
105 | * @return the itemSkipPolicy |
106 | */ |
107 | protected ItemSkipPolicy getItemSkipPolicy() { |
108 | return itemSkipPolicy; |
109 | } |
110 | |
111 | /** |
112 | * Public setter for the capacity of the skipped item cache. If a large |
113 | * number of items are failing and not being recognized as skipped, it |
114 | * usually signals a problem with the key generation (often equals and |
115 | * hashCode in the item itself). So it is better to enforce a strict limit |
116 | * than have weird looking errors, where a skip limit is reached without |
117 | * anything being skipped.<br/> |
118 | * |
119 | * The default value is 1024 which should be high enough and more for most |
120 | * purposes. To breach the limit in a single-threaded step typically you |
121 | * have to have this many failures in a single transaction. |
122 | * |
123 | * @param skipCacheCapacity the capacity to set |
124 | */ |
125 | public void setSkipCacheCapacity(int skipCacheCapacity) { |
126 | this.skipCacheCapacity = skipCacheCapacity; |
127 | } |
128 | |
129 | /** |
130 | * Uses the {@link #setSkipLimit(int)} value to configure item handler and and |
131 | * exception handler. |
132 | */ |
133 | protected void applyConfiguration(ItemOrientedStep step) { |
134 | super.applyConfiguration(step); |
135 | |
136 | ItemSkipPolicyItemHandler itemHandler = new ItemSkipPolicyItemHandler(getItemReader(), getItemWriter()); |
137 | |
138 | if (skipLimit > 0) { |
139 | |
140 | /* |
141 | * If there is a skip limit (not the default) then we are prepared |
142 | * to absorb exceptions at the step level because the failed items |
143 | * will never re-appear after a rollback. |
144 | */ |
145 | addFatalExceptionIfMissing(SkipLimitExceededException.class); |
146 | List fatalExceptionList = Arrays.asList(fatalExceptionClasses); |
147 | |
148 | LimitCheckingItemSkipPolicy limitCheckingSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, Arrays |
149 | .asList(skippableExceptionClasses), fatalExceptionList); |
150 | itemHandler.setItemSkipPolicy(limitCheckingSkipPolicy); |
151 | this.itemSkipPolicy = limitCheckingSkipPolicy; |
152 | SimpleLimitExceptionHandler exceptionHandler = new SimpleLimitExceptionHandler(skipLimit); |
153 | exceptionHandler.setExceptionClasses(skippableExceptionClasses); |
154 | exceptionHandler.setFatalExceptionClasses(fatalExceptionClasses); |
155 | |
156 | getStepOperations().setExceptionHandler(exceptionHandler); |
157 | |
158 | // for subclass to pick up limit and exception classes |
159 | setExceptionHandler(exceptionHandler); |
160 | |
161 | itemHandler.setItemKeyGenerator(itemKeyGenerator); |
162 | itemHandler.setSkipCacheCapacity(skipCacheCapacity); |
163 | |
164 | BatchListenerFactoryHelper helper = new BatchListenerFactoryHelper(); |
165 | itemHandler.setSkipListeners(helper.getSkipListeners(getListeners())); |
166 | |
167 | } |
168 | else { |
169 | // This is the default in ItemOrientedStep anyway... |
170 | itemHandler.setItemSkipPolicy(new NeverSkipItemSkipPolicy()); |
171 | } |
172 | |
173 | step.setItemHandler(itemHandler); |
174 | } |
175 | |
176 | public void addFatalExceptionIfMissing(Class cls) { |
177 | List fatalExceptionList = new ArrayList(Arrays.asList(fatalExceptionClasses)); |
178 | if (!fatalExceptionList.contains(cls)) { |
179 | fatalExceptionList.add(cls); |
180 | } |
181 | fatalExceptionClasses = (Class[]) fatalExceptionList.toArray(new Class[0]); |
182 | } |
183 | |
184 | } |