View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.listener;
17  
18  import java.util.List;
19  
20  import org.springframework.batch.core.ChunkListener;
21  import org.springframework.batch.core.ExitStatus;
22  import org.springframework.batch.core.ItemProcessListener;
23  import org.springframework.batch.core.ItemReadListener;
24  import org.springframework.batch.core.ItemWriteListener;
25  import org.springframework.batch.core.SkipListener;
26  import org.springframework.batch.core.StepExecution;
27  import org.springframework.batch.core.StepExecutionListener;
28  import org.springframework.batch.core.StepListener;
29  import org.springframework.batch.core.scope.context.ChunkContext;
30  import org.springframework.batch.item.ItemStream;
31  
32  /**
33   * @author Dave Syer
34   * @author Michael Minella
35   */
36  public class MulticasterBatchListener<T, S> implements StepExecutionListener, ChunkListener, ItemReadListener<T>,
37  ItemProcessListener<T, S>, ItemWriteListener<S>, SkipListener<T, S> {
38  
39  	private CompositeStepExecutionListener stepListener = new CompositeStepExecutionListener();
40  
41  	private CompositeChunkListener chunkListener = new CompositeChunkListener();
42  
43  	private CompositeItemReadListener<T> itemReadListener = new CompositeItemReadListener<T>();
44  
45  	private CompositeItemProcessListener<T, S> itemProcessListener = new CompositeItemProcessListener<T, S>();
46  
47  	private CompositeItemWriteListener<S> itemWriteListener = new CompositeItemWriteListener<S>();
48  
49  	private CompositeSkipListener<T, S> skipListener = new CompositeSkipListener<T, S>();
50  
51  	/**
52  	 * Initialise the listener instance.
53  	 */
54  	public MulticasterBatchListener() {
55  		super();
56  	}
57  
58  	/**
59  	 * Register each of the objects as listeners. Once registered, calls to the
60  	 * {@link MulticasterBatchListener} broadcast to the individual listeners.
61  	 *
62  	 * @param listeners listener objects of types known to the multicaster.
63  	 */
64  	public void setListeners(List<? extends StepListener> listeners) {
65  		for (StepListener stepListener : listeners) {
66  			register(stepListener);
67  		}
68  	}
69  
70  	/**
71  	 * Register the listener for callbacks on the appropriate interfaces
72  	 * implemented. Any {@link StepListener} can be provided, or an
73  	 * {@link ItemStream}. Other types will be ignored.
74  	 */
75  	public void register(StepListener listener) {
76  		if (listener instanceof StepExecutionListener) {
77  			this.stepListener.register((StepExecutionListener) listener);
78  		}
79  		if (listener instanceof ChunkListener) {
80  			this.chunkListener.register((ChunkListener) listener);
81  		}
82  		if (listener instanceof ItemReadListener<?>) {
83  			@SuppressWarnings("unchecked")
84  			ItemReadListener<T> itemReadListener = (ItemReadListener<T>) listener;
85  			this.itemReadListener.register(itemReadListener);
86  		}
87  		if (listener instanceof ItemProcessListener<?, ?>) {
88  			@SuppressWarnings("unchecked")
89  			ItemProcessListener<T, S> itemProcessListener = (ItemProcessListener<T, S>) listener;
90  			this.itemProcessListener.register(itemProcessListener);
91  		}
92  		if (listener instanceof ItemWriteListener<?>) {
93  			@SuppressWarnings("unchecked")
94  			ItemWriteListener<S> itemWriteListener = (ItemWriteListener<S>) listener;
95  			this.itemWriteListener.register(itemWriteListener);
96  		}
97  		if (listener instanceof SkipListener<?, ?>) {
98  			@SuppressWarnings("unchecked")
99  			SkipListener<T, S> skipListener = (SkipListener<T, S>) listener;
100 			this.skipListener.register(skipListener);
101 		}
102 	}
103 
104 	/**
105 	 * @param item
106 	 * @param result
107 	 * @see org.springframework.batch.core.listener.CompositeItemProcessListener#afterProcess(java.lang.Object,
108 	 * java.lang.Object)
109 	 */
110 	@Override
111 	public void afterProcess(T item, S result) {
112 		try {
113 			itemProcessListener.afterProcess(item, result);
114 		}
115 		catch (RuntimeException e) {
116 			throw new StepListenerFailedException("Error in afterProcess.", e);
117 		}
118 	}
119 
120 	/**
121 	 * @param item
122 	 * @see org.springframework.batch.core.listener.CompositeItemProcessListener#beforeProcess(java.lang.Object)
123 	 */
124 	@Override
125 	public void beforeProcess(T item) {
126 		try {
127 			itemProcessListener.beforeProcess(item);
128 		}
129 		catch (RuntimeException e) {
130 			throw new StepListenerFailedException("Error in beforeProcess.", e);
131 		}
132 	}
133 
134 	/**
135 	 * @param item
136 	 * @param ex
137 	 * @see org.springframework.batch.core.listener.CompositeItemProcessListener#onProcessError(java.lang.Object,
138 	 * java.lang.Exception)
139 	 */
140 	@Override
141 	public void onProcessError(T item, Exception ex) {
142 		try {
143 			itemProcessListener.onProcessError(item, ex);
144 		}
145 		catch (RuntimeException e) {
146 			throw new StepListenerFailedException("Error in onProcessError.", e);
147 		}
148 	}
149 
150 	/**
151 	 * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#afterStep(StepExecution)
152 	 */
153 	@Override
154 	public ExitStatus afterStep(StepExecution stepExecution) {
155 		try {
156 			return stepListener.afterStep(stepExecution);
157 		}
158 		catch (RuntimeException e) {
159 			throw new StepListenerFailedException("Error in afterStep.", e);
160 		}
161 	}
162 
163 	/**
164 	 * @param stepExecution
165 	 * @see org.springframework.batch.core.listener.CompositeStepExecutionListener#beforeStep(org.springframework.batch.core.StepExecution)
166 	 */
167 	@Override
168 	public void beforeStep(StepExecution stepExecution) {
169 		try {
170 			stepListener.beforeStep(stepExecution);
171 		}
172 		catch (RuntimeException e) {
173 			throw new StepListenerFailedException("Error in beforeStep.", e);
174 		}
175 	}
176 
177 	/**
178 	 *
179 	 * @see org.springframework.batch.core.listener.CompositeChunkListener#afterChunk(ChunkContext context)
180 	 */
181 	@Override
182 	public void afterChunk(ChunkContext context) {
183 		try {
184 			chunkListener.afterChunk(context);
185 		}
186 		catch (RuntimeException e) {
187 			throw new StepListenerFailedException("Error in afterChunk.", e);
188 		}
189 	}
190 
191 	/**
192 	 *
193 	 * @see org.springframework.batch.core.listener.CompositeChunkListener#beforeChunk(ChunkContext context)
194 	 */
195 	@Override
196 	public void beforeChunk(ChunkContext context) {
197 		try {
198 			chunkListener.beforeChunk(context);
199 		}
200 		catch (RuntimeException e) {
201 			throw new StepListenerFailedException("Error in beforeChunk.", e);
202 		}
203 	}
204 
205 	/**
206 	 * @param item
207 	 * @see org.springframework.batch.core.listener.CompositeItemReadListener#afterRead(java.lang.Object)
208 	 */
209 	@Override
210 	public void afterRead(T item) {
211 		try {
212 			itemReadListener.afterRead(item);
213 		}
214 		catch (RuntimeException e) {
215 			throw new StepListenerFailedException("Error in afterRead.", e);
216 		}
217 	}
218 
219 	/**
220 	 *
221 	 * @see org.springframework.batch.core.listener.CompositeItemReadListener#beforeRead()
222 	 */
223 	@Override
224 	public void beforeRead() {
225 		try {
226 			itemReadListener.beforeRead();
227 		}
228 		catch (RuntimeException e) {
229 			throw new StepListenerFailedException("Error in beforeRead.", e);
230 		}
231 	}
232 
233 	/**
234 	 * @param ex
235 	 * @see org.springframework.batch.core.listener.CompositeItemReadListener#onReadError(java.lang.Exception)
236 	 */
237 	@Override
238 	public void onReadError(Exception ex) {
239 		try {
240 			itemReadListener.onReadError(ex);
241 		}
242 		catch (RuntimeException e) {
243 			throw new StepListenerFailedException("Error in onReadError.", e);
244 		}
245 	}
246 
247 	/**
248 	 *
249 	 * @see ItemWriteListener#afterWrite(List)
250 	 */
251 	@Override
252 	public void afterWrite(List<? extends S> items) {
253 		try {
254 			itemWriteListener.afterWrite(items);
255 		}
256 		catch (RuntimeException e) {
257 			throw new StepListenerFailedException("Error in afterWrite.", e);
258 		}
259 	}
260 
261 	/**
262 	 * @param items
263 	 * @see ItemWriteListener#beforeWrite(List)
264 	 */
265 	@Override
266 	public void beforeWrite(List<? extends S> items) {
267 		try {
268 			itemWriteListener.beforeWrite(items);
269 		}
270 		catch (RuntimeException e) {
271 			throw new StepListenerFailedException("Error in beforeWrite.", e);
272 		}
273 	}
274 
275 	/**
276 	 * @param ex
277 	 * @param items
278 	 * @see ItemWriteListener#onWriteError(Exception, List)
279 	 */
280 	@Override
281 	public void onWriteError(Exception ex, List<? extends S> items) {
282 		try {
283 			itemWriteListener.onWriteError(ex, items);
284 		}
285 		catch (RuntimeException e) {
286 			throw new StepListenerFailedException("Error in onWriteError.", e);
287 		}
288 	}
289 
290 	/**
291 	 * @param t
292 	 * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInRead(java.lang.Throwable)
293 	 */
294 	@Override
295 	public void onSkipInRead(Throwable t) {
296 		skipListener.onSkipInRead(t);
297 	}
298 
299 	/**
300 	 * @param item
301 	 * @param t
302 	 * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInWrite(java.lang.Object,
303 	 * java.lang.Throwable)
304 	 */
305 	@Override
306 	public void onSkipInWrite(S item, Throwable t) {
307 		skipListener.onSkipInWrite(item, t);
308 	}
309 
310 	/**
311 	 * @param item
312 	 * @param t
313 	 * @see org.springframework.batch.core.listener.CompositeSkipListener#onSkipInProcess(Object,
314 	 * Throwable)
315 	 */
316 	@Override
317 	public void onSkipInProcess(T item, Throwable t) {
318 		skipListener.onSkipInProcess(item, t);
319 	}
320 
321 	@Override
322 	public void afterChunkError(ChunkContext context) {
323 		try {
324 			chunkListener.afterChunkError(context);
325 		}
326 		catch (RuntimeException e) {
327 			throw new StepListenerFailedException("Error in afterFailedChunk.", e);
328 		}
329 	}
330 }