1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.listener; |
17 | |
18 | import java.util.List; |
19 | |
20 | import org.springframework.batch.core.ChunkListener; |
21 | import org.springframework.batch.core.ExitStatus; |
22 | import org.springframework.batch.core.ItemProcessListener; |
23 | import org.springframework.batch.core.ItemReadListener; |
24 | import org.springframework.batch.core.ItemWriteListener; |
25 | import org.springframework.batch.core.SkipListener; |
26 | import org.springframework.batch.core.StepExecution; |
27 | import org.springframework.batch.core.StepExecutionListener; |
28 | import org.springframework.batch.core.StepListener; |
29 | import org.springframework.batch.core.scope.context.ChunkContext; |
30 | import org.springframework.batch.item.ItemStream; |
31 | |
32 | /** |
33 | * @author Dave Syer |
34 | * @author Michael Minella |
35 | */ |
36 | public class MulticasterBatchListener<T, S> implements StepExecutionListener, ChunkListener, ItemReadListener<T>, |
37 | ItemProcessListener<T, S>, ItemWriteListener<S>, SkipListener<T, S> { |
38 | |
39 | private CompositeStepExecutionListener stepListener = new CompositeStepExecutionListener(); |
40 | |
41 | private CompositeChunkListener chunkListener = new CompositeChunkListener(); |
42 | |
43 | private CompositeItemReadListener<T> itemReadListener = new CompositeItemReadListener<T>(); |
44 | |
45 | private CompositeItemProcessListener<T, S> itemProcessListener = new CompositeItemProcessListener<T, S>(); |
46 | |
47 | private CompositeItemWriteListener<S> itemWriteListener = new CompositeItemWriteListener<S>(); |
48 | |
49 | private CompositeSkipListener<T, S> skipListener = new CompositeSkipListener<T, S>(); |
50 | |
51 | /** |
52 | * Initialise the listener instance. |
53 | */ |
54 | public MulticasterBatchListener() { |
55 | super(); |
56 | } |
57 | |
58 | /** |
59 | * Register each of the objects as listeners. Once registered, calls to the |
60 | * {@link MulticasterBatchListener} broadcast to the individual listeners. |
61 | * |
62 | * @param listeners listener objects of types known to the multicaster. |
63 | */ |
64 | public void setListeners(List<? extends StepListener> listeners) { |
65 | for (StepListener stepListener : listeners) { |
66 | register(stepListener); |
67 | } |
68 | } |
69 | |
70 | /** |
71 | * Register the listener for callbacks on the appropriate interfaces |
72 | * implemented. Any {@link StepListener} can be provided, or an |
73 | * {@link ItemStream}. Other types will be ignored. |
74 | */ |
75 | public void register(StepListener listener) { |
76 | if (listener instanceof StepExecutionListener) { |
77 | this.stepListener.register((StepExecutionListener) listener); |
78 | } |
79 | if (listener instanceof ChunkListener) { |
80 | this.chunkListener.register((ChunkListener) listener); |
81 | } |
82 | if (listener instanceof ItemReadListener<?>) { |
83 | @SuppressWarnings("unchecked") |
84 | ItemReadListener<T> itemReadListener = (ItemReadListener<T>) listener; |
85 | this.itemReadListener.register(itemReadListener); |
86 | } |
87 | if (listener instanceof ItemProcessListener<?, ?>) { |
88 | @SuppressWarnings("unchecked") |
89 | ItemProcessListener<T, S> itemProcessListener = (ItemProcessListener<T, S>) listener; |
90 | this.itemProcessListener.register(itemProcessListener); |
91 | } |
92 | if (listener instanceof ItemWriteListener<?>) { |
93 | @SuppressWarnings("unchecked") |
94 | ItemWriteListener<S> itemWriteListener = (ItemWriteListener<S>) listener; |
95 | this.itemWriteListener.register(itemWriteListener); |
96 | } |
97 | if (listener instanceof SkipListener<?, ?>) { |
98 | @SuppressWarnings("unchecked") |
99 | SkipListener<T, S> skipListener = (SkipListener<T, S>) listener; |
100 | this.skipListener.register(skipListener); |
101 | } |
102 | } |
103 | |
104 | /** |
105 | * @param item |
106 | * @param result |
107 | * @see org.springframework.batch.core.listener.CompositeItemProcessListener#afterProcess(java.lang.Object, |
108 | * java.lang.Object) |
109 | */ |
110 | @Override |
111 | public void afterProcess(T item, S result) { |
112 | try { |
113 | itemProcessListener.afterProcess(item, result); |
114 | } |
115 | catch (RuntimeException e) { |
116 | throw new StepListenerFailedException("Error in afterProcess.", e); |
117 | } |
118 | } |
119 | |
120 | /** |
121 | * @param item |
122 | * @see org.springframework.batch.core.listener.CompositeItemProcessListener#beforeProcess(java.lang.Object) |
123 | */ |
124 | @Override |
125 | public void beforeProcess(T item) { |
126 | try { |
127 | itemProcessListener.beforeProcess(item); |
128 | } |
129 | catch (RuntimeException e) { |
130 | throw new StepListenerFailedException("Error in beforeProcess.", e); |
131 | } |
132 | } |
133 | |
134 | /** |
135 | * @param item |
136 | * @param ex |
137 | * @see org.springframework.batch.core.listener.CompositeItemProcessListener#onProcessError(java.lang.Object, |
138 | * java.lang.Exception) |
139 | */ |
140 | @Override |
141 | public void onProcessError(T item, Exception ex) { |
142 | try { |
143 | itemProcessListener.onProcessError(item, ex); |
144 | } |
145 | catch (RuntimeException e) { |
146 | throw new StepListenerFailedException("Error in onProcessError.", e); |
147 | } |
148 | } |
149 | |
150 | /** |
151 | * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#afterStep(StepExecution) |
152 | */ |
153 | @Override |
154 | public ExitStatus afterStep(StepExecution stepExecution) { |
155 | try { |
156 | return stepListener.afterStep(stepExecution); |
157 | } |
158 | catch (RuntimeException e) { |
159 | throw new StepListenerFailedException("Error in afterStep.", e); |
160 | } |
161 | } |
162 | |
163 | /** |
164 | * @param stepExecution |
165 | * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#beforeStep(org.springframework.batch.core.StepExecution) |
166 | */ |
167 | @Override |
168 | public void beforeStep(StepExecution stepExecution) { |
169 | try { |
170 | stepListener.beforeStep(stepExecution); |
171 | } |
172 | catch (RuntimeException e) { |
173 | throw new StepListenerFailedException("Error in beforeStep.", e); |
174 | } |
175 | } |
176 | |
177 | /** |
178 | * |
179 | * @see org.springframework.batch.core.listener.CompositeChunkListener#afterChunk(ChunkContext context) |
180 | */ |
181 | @Override |
182 | public void afterChunk(ChunkContext context) { |
183 | try { |
184 | chunkListener.afterChunk(context); |
185 | } |
186 | catch (RuntimeException e) { |
187 | throw new StepListenerFailedException("Error in afterChunk.", e); |
188 | } |
189 | } |
190 | |
191 | /** |
192 | * |
193 | * @see org.springframework.batch.core.listener.CompositeChunkListener#beforeChunk(ChunkContext context) |
194 | */ |
195 | @Override |
196 | public void beforeChunk(ChunkContext context) { |
197 | try { |
198 | chunkListener.beforeChunk(context); |
199 | } |
200 | catch (RuntimeException e) { |
201 | throw new StepListenerFailedException("Error in beforeChunk.", e); |
202 | } |
203 | } |
204 | |
205 | /** |
206 | * @param item |
207 | * @see org.springframework.batch.core.listener.CompositeItemReadListener#afterRead(java.lang.Object) |
208 | */ |
209 | @Override |
210 | public void afterRead(T item) { |
211 | try { |
212 | itemReadListener.afterRead(item); |
213 | } |
214 | catch (RuntimeException e) { |
215 | throw new StepListenerFailedException("Error in afterRead.", e); |
216 | } |
217 | } |
218 | |
219 | /** |
220 | * |
221 | * @see org.springframework.batch.core.listener.CompositeItemReadListener#beforeRead() |
222 | */ |
223 | @Override |
224 | public void beforeRead() { |
225 | try { |
226 | itemReadListener.beforeRead(); |
227 | } |
228 | catch (RuntimeException e) { |
229 | throw new StepListenerFailedException("Error in beforeRead.", e); |
230 | } |
231 | } |
232 | |
233 | /** |
234 | * @param ex |
235 | * @see org.springframework.batch.core.listener.CompositeItemReadListener#onReadError(java.lang.Exception) |
236 | */ |
237 | @Override |
238 | public void onReadError(Exception ex) { |
239 | try { |
240 | itemReadListener.onReadError(ex); |
241 | } |
242 | catch (RuntimeException e) { |
243 | throw new StepListenerFailedException("Error in onReadError.", e); |
244 | } |
245 | } |
246 | |
247 | /** |
248 | * |
249 | * @see ItemWriteListener#afterWrite(List) |
250 | */ |
251 | @Override |
252 | public void afterWrite(List<? extends S> items) { |
253 | try { |
254 | itemWriteListener.afterWrite(items); |
255 | } |
256 | catch (RuntimeException e) { |
257 | throw new StepListenerFailedException("Error in afterWrite.", e); |
258 | } |
259 | } |
260 | |
261 | /** |
262 | * @param items |
263 | * @see ItemWriteListener#beforeWrite(List) |
264 | */ |
265 | @Override |
266 | public void beforeWrite(List<? extends S> items) { |
267 | try { |
268 | itemWriteListener.beforeWrite(items); |
269 | } |
270 | catch (RuntimeException e) { |
271 | throw new StepListenerFailedException("Error in beforeWrite.", e); |
272 | } |
273 | } |
274 | |
275 | /** |
276 | * @param ex |
277 | * @param items |
278 | * @see ItemWriteListener#onWriteError(Exception, List) |
279 | */ |
280 | @Override |
281 | public void onWriteError(Exception ex, List<? extends S> items) { |
282 | try { |
283 | itemWriteListener.onWriteError(ex, items); |
284 | } |
285 | catch (RuntimeException e) { |
286 | throw new StepListenerFailedException("Error in onWriteError.", e); |
287 | } |
288 | } |
289 | |
290 | /** |
291 | * @param t |
292 | * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInRead(java.lang.Throwable) |
293 | */ |
294 | @Override |
295 | public void onSkipInRead(Throwable t) { |
296 | skipListener.onSkipInRead(t); |
297 | } |
298 | |
299 | /** |
300 | * @param item |
301 | * @param t |
302 | * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInWrite(java.lang.Object, |
303 | * java.lang.Throwable) |
304 | */ |
305 | @Override |
306 | public void onSkipInWrite(S item, Throwable t) { |
307 | skipListener.onSkipInWrite(item, t); |
308 | } |
309 | |
310 | /** |
311 | * @param item |
312 | * @param t |
313 | * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInProcess(Object, |
314 | * Throwable) |
315 | */ |
316 | @Override |
317 | public void onSkipInProcess(T item, Throwable t) { |
318 | skipListener.onSkipInProcess(item, t); |
319 | } |
320 | |
321 | @Override |
322 | public void afterChunkError(ChunkContext context) { |
323 | try { |
324 | chunkListener.afterChunkError(context); |
325 | } |
326 | catch (RuntimeException e) { |
327 | throw new StepListenerFailedException("Error in afterFailedChunk.", e); |
328 | } |
329 | } |
330 | } |