1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.listener; |
17 | |
18 | import java.util.List; |
19 | |
20 | import org.springframework.batch.core.ChunkListener; |
21 | import org.springframework.batch.core.ExitStatus; |
22 | import org.springframework.batch.core.ItemProcessListener; |
23 | import org.springframework.batch.core.ItemReadListener; |
24 | import org.springframework.batch.core.ItemWriteListener; |
25 | import org.springframework.batch.core.SkipListener; |
26 | import org.springframework.batch.core.StepExecution; |
27 | import org.springframework.batch.core.StepExecutionListener; |
28 | import org.springframework.batch.core.StepListener; |
29 | import org.springframework.batch.item.ItemStream; |
30 | |
31 | /** |
32 | * @author Dave Syer |
33 | * |
34 | */ |
35 | public class MulticasterBatchListener<T, S> implements StepExecutionListener, ChunkListener, ItemReadListener<T>, |
36 | ItemProcessListener<T, S>, ItemWriteListener<S>, SkipListener<T, S> { |
37 | |
38 | private CompositeStepExecutionListener stepListener = new CompositeStepExecutionListener(); |
39 | |
40 | private CompositeChunkListener chunkListener = new CompositeChunkListener(); |
41 | |
42 | private CompositeItemReadListener<T> itemReadListener = new CompositeItemReadListener<T>(); |
43 | |
44 | private CompositeItemProcessListener<T, S> itemProcessListener = new CompositeItemProcessListener<T, S>(); |
45 | |
46 | private CompositeItemWriteListener<S> itemWriteListener = new CompositeItemWriteListener<S>(); |
47 | |
48 | private CompositeSkipListener<T, S> skipListener = new CompositeSkipListener<T, S>(); |
49 | |
50 | /** |
51 | * Initialise the listener instance. |
52 | */ |
53 | public MulticasterBatchListener() { |
54 | super(); |
55 | } |
56 | |
57 | /** |
58 | * Register each of the objects as listeners. Once registered, calls to the |
59 | * {@link MulticasterBatchListener} broadcast to the individual listeners. |
60 | * |
61 | * @param listeners listener objects of types known to the multicaster. |
62 | */ |
63 | public void setListeners(List<? extends StepListener> listeners) { |
64 | for (StepListener stepListener : listeners) { |
65 | register(stepListener); |
66 | } |
67 | } |
68 | |
69 | /** |
70 | * Register the listener for callbacks on the appropriate interfaces |
71 | * implemented. Any {@link StepListener} can be provided, or an |
72 | * {@link ItemStream}. Other types will be ignored. |
73 | */ |
74 | public void register(StepListener listener) { |
75 | if (listener instanceof StepExecutionListener) { |
76 | this.stepListener.register((StepExecutionListener) listener); |
77 | } |
78 | if (listener instanceof ChunkListener) { |
79 | this.chunkListener.register((ChunkListener) listener); |
80 | } |
81 | if (listener instanceof ItemReadListener<?>) { |
82 | @SuppressWarnings("unchecked") |
83 | ItemReadListener<T> itemReadListener = (ItemReadListener<T>) listener; |
84 | this.itemReadListener.register(itemReadListener); |
85 | } |
86 | if (listener instanceof ItemProcessListener<?, ?>) { |
87 | @SuppressWarnings("unchecked") |
88 | ItemProcessListener<T, S> itemProcessListener = (ItemProcessListener<T, S>) listener; |
89 | this.itemProcessListener.register(itemProcessListener); |
90 | } |
91 | if (listener instanceof ItemWriteListener<?>) { |
92 | @SuppressWarnings("unchecked") |
93 | ItemWriteListener<S> itemWriteListener = (ItemWriteListener<S>) listener; |
94 | this.itemWriteListener.register(itemWriteListener); |
95 | } |
96 | if (listener instanceof SkipListener<?, ?>) { |
97 | @SuppressWarnings("unchecked") |
98 | SkipListener<T, S> skipListener = (SkipListener<T, S>) listener; |
99 | this.skipListener.register(skipListener); |
100 | } |
101 | } |
102 | |
103 | /** |
104 | * @param item |
105 | * @param result |
106 | * @see org.springframework.batch.core.listener.CompositeItemProcessListener#afterProcess(java.lang.Object, |
107 | * java.lang.Object) |
108 | */ |
109 | public void afterProcess(T item, S result) { |
110 | try { |
111 | itemProcessListener.afterProcess(item, result); |
112 | } |
113 | catch (RuntimeException e) { |
114 | throw new StepListenerFailedException("Error in afterProcess.", e); |
115 | } |
116 | } |
117 | |
118 | /** |
119 | * @param item |
120 | * @see org.springframework.batch.core.listener.CompositeItemProcessListener#beforeProcess(java.lang.Object) |
121 | */ |
122 | public void beforeProcess(T item) { |
123 | try { |
124 | itemProcessListener.beforeProcess(item); |
125 | } |
126 | catch (RuntimeException e) { |
127 | throw new StepListenerFailedException("Error in beforeProcess.", e); |
128 | } |
129 | } |
130 | |
131 | /** |
132 | * @param item |
133 | * @param ex |
134 | * @see org.springframework.batch.core.listener.CompositeItemProcessListener#onProcessError(java.lang.Object, |
135 | * java.lang.Exception) |
136 | */ |
137 | public void onProcessError(T item, Exception ex) { |
138 | try { |
139 | itemProcessListener.onProcessError(item, ex); |
140 | } |
141 | catch (RuntimeException e) { |
142 | throw new StepListenerFailedException("Error in onProcessError.", e); |
143 | } |
144 | } |
145 | |
146 | /** |
147 | * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#afterStep(StepExecution) |
148 | */ |
149 | public ExitStatus afterStep(StepExecution stepExecution) { |
150 | try { |
151 | return stepListener.afterStep(stepExecution); |
152 | } |
153 | catch (RuntimeException e) { |
154 | throw new StepListenerFailedException("Error in afterStep.", e); |
155 | } |
156 | } |
157 | |
158 | /** |
159 | * @param stepExecution |
160 | * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#beforeStep(org.springframework.batch.core.StepExecution) |
161 | */ |
162 | public void beforeStep(StepExecution stepExecution) { |
163 | try { |
164 | stepListener.beforeStep(stepExecution); |
165 | } |
166 | catch (RuntimeException e) { |
167 | throw new StepListenerFailedException("Error in beforeStep.", e); |
168 | } |
169 | } |
170 | |
171 | /** |
172 | * |
173 | * @see org.springframework.batch.core.listener.CompositeChunkListener#afterChunk() |
174 | */ |
175 | public void afterChunk() { |
176 | try { |
177 | chunkListener.afterChunk(); |
178 | } |
179 | catch (RuntimeException e) { |
180 | throw new StepListenerFailedException("Error in afterChunk.", e); |
181 | } |
182 | } |
183 | |
184 | /** |
185 | * |
186 | * @see org.springframework.batch.core.listener.CompositeChunkListener#beforeChunk() |
187 | */ |
188 | public void beforeChunk() { |
189 | try { |
190 | chunkListener.beforeChunk(); |
191 | } |
192 | catch (RuntimeException e) { |
193 | throw new StepListenerFailedException("Error in beforeChunk.", e); |
194 | } |
195 | } |
196 | |
197 | /** |
198 | * @param item |
199 | * @see org.springframework.batch.core.listener.CompositeItemReadListener#afterRead(java.lang.Object) |
200 | */ |
201 | public void afterRead(T item) { |
202 | try { |
203 | itemReadListener.afterRead(item); |
204 | } |
205 | catch (RuntimeException e) { |
206 | throw new StepListenerFailedException("Error in afterRead.", e); |
207 | } |
208 | } |
209 | |
210 | /** |
211 | * |
212 | * @see org.springframework.batch.core.listener.CompositeItemReadListener#beforeRead() |
213 | */ |
214 | public void beforeRead() { |
215 | try { |
216 | itemReadListener.beforeRead(); |
217 | } |
218 | catch (RuntimeException e) { |
219 | throw new StepListenerFailedException("Error in beforeRead.", e); |
220 | } |
221 | } |
222 | |
223 | /** |
224 | * @param ex |
225 | * @see org.springframework.batch.core.listener.CompositeItemReadListener#onReadError(java.lang.Exception) |
226 | */ |
227 | public void onReadError(Exception ex) { |
228 | try { |
229 | itemReadListener.onReadError(ex); |
230 | } |
231 | catch (RuntimeException e) { |
232 | throw new StepListenerFailedException("Error in onReadError.", ex, e); |
233 | } |
234 | } |
235 | |
236 | /** |
237 | * |
238 | * @see ItemWriteListener#afterWrite(List) |
239 | */ |
240 | public void afterWrite(List<? extends S> items) { |
241 | try { |
242 | itemWriteListener.afterWrite(items); |
243 | } |
244 | catch (RuntimeException e) { |
245 | throw new StepListenerFailedException("Error in afterWrite.", e); |
246 | } |
247 | } |
248 | |
249 | /** |
250 | * @param items |
251 | * @see ItemWriteListener#beforeWrite(List) |
252 | */ |
253 | public void beforeWrite(List<? extends S> items) { |
254 | try { |
255 | itemWriteListener.beforeWrite(items); |
256 | } |
257 | catch (RuntimeException e) { |
258 | throw new StepListenerFailedException("Error in beforeWrite.", e); |
259 | } |
260 | } |
261 | |
262 | /** |
263 | * @param ex |
264 | * @param items |
265 | * @see ItemWriteListener#onWriteError(Exception, List) |
266 | */ |
267 | public void onWriteError(Exception ex, List<? extends S> items) { |
268 | try { |
269 | itemWriteListener.onWriteError(ex, items); |
270 | } |
271 | catch (RuntimeException e) { |
272 | throw new StepListenerFailedException("Error in onWriteError.", ex, e); |
273 | } |
274 | } |
275 | |
276 | /** |
277 | * @param t |
278 | * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInRead(java.lang.Throwable) |
279 | */ |
280 | public void onSkipInRead(Throwable t) { |
281 | skipListener.onSkipInRead(t); |
282 | } |
283 | |
284 | /** |
285 | * @param item |
286 | * @param t |
287 | * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInWrite(java.lang.Object, |
288 | * java.lang.Throwable) |
289 | */ |
290 | public void onSkipInWrite(S item, Throwable t) { |
291 | skipListener.onSkipInWrite(item, t); |
292 | } |
293 | |
294 | /** |
295 | * @param item |
296 | * @param t |
297 | * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInProcess(Object, |
298 | * Throwable) |
299 | */ |
300 | public void onSkipInProcess(T item, Throwable t) { |
301 | skipListener.onSkipInProcess(item, t); |
302 | } |
303 | |
304 | } |