View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.step.item;
18  
19  import java.util.List;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.springframework.batch.core.StepContribution;
24  import org.springframework.batch.core.StepListener;
25  import org.springframework.batch.core.listener.MulticasterBatchListener;
26  import org.springframework.batch.item.ItemReader;
27  import org.springframework.batch.repeat.RepeatCallback;
28  import org.springframework.batch.repeat.RepeatContext;
29  import org.springframework.batch.repeat.RepeatOperations;
30  import org.springframework.batch.repeat.RepeatStatus;
31  
32  /**
33   * Simple implementation of the ChunkProvider interface that does basic chunk
34   * providing from an {@link ItemReader}.
35   *
36   * @author Dave Syer
37   * @author Michael Minella
38   * @see ChunkOrientedTasklet
39   */
40  public class SimpleChunkProvider<I> implements ChunkProvider<I> {
41  
42  	protected final Log logger = LogFactory.getLog(getClass());
43  
44  	protected final ItemReader<? extends I> itemReader;
45  
46  	private final MulticasterBatchListener<I, ?> listener = new MulticasterBatchListener<I, Object>();
47  
48  	private final RepeatOperations repeatOperations;
49  
50  	public SimpleChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) {
51  		this.itemReader = itemReader;
52  		this.repeatOperations = repeatOperations;
53  	}
54  
55  	/**
56  	 * Register some {@link StepListener}s with the handler. Each will get the
57  	 * callbacks in the order specified at the correct stage.
58  	 *
59  	 * @param listeners
60  	 */
61  	public void setListeners(List<? extends StepListener> listeners) {
62  		for (StepListener listener : listeners) {
63  			registerListener(listener);
64  		}
65  	}
66  
67  	/**
68  	 * Register a listener for callbacks at the appropriate stages in a process.
69  	 *
70  	 * @param listener a {@link StepListener}
71  	 */
72  	public void registerListener(StepListener listener) {
73  		this.listener.register(listener);
74  	}
75  
76  	/**
77  	 * @return the listener
78  	 */
79  	protected MulticasterBatchListener<I, ?> getListener() {
80  		return listener;
81  	}
82  
83  	/**
84  	 * Surrounds the read call with listener callbacks.
85  	 * @return item
86  	 * @throws Exception
87  	 */
88  	protected final I doRead() throws Exception {
89  		try {
90  			listener.beforeRead();
91  			I item = itemReader.read();
92  			if(item != null) {
93  				listener.afterRead(item);
94  			}
95  			return item;
96  		}
97  		catch (Exception e) {
98  			logger.debug(e.getMessage() + " : " + e.getClass().getName());
99  			listener.onReadError(e);
100 			throw e;
101 		}
102 	}
103 
104 	@Override
105 	public Chunk<I> provide(final StepContribution contribution) throws Exception {
106 
107 		final Chunk<I> inputs = new Chunk<I>();
108 		repeatOperations.iterate(new RepeatCallback() {
109 
110 			@Override
111 			public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
112 				I item = null;
113 				try {
114 					item = read(contribution, inputs);
115 				}
116 				catch (SkipOverflowException e) {
117 					// read() tells us about an excess of skips by throwing an
118 					// exception
119 					return RepeatStatus.FINISHED;
120 				}
121 				if (item == null) {
122 					inputs.setEnd();
123 					return RepeatStatus.FINISHED;
124 				}
125 				inputs.add(item);
126 				contribution.incrementReadCount();
127 				return RepeatStatus.CONTINUABLE;
128 			}
129 
130 		});
131 
132 		return inputs;
133 
134 	}
135 
136 	@Override
137 	public void postProcess(StepContribution contribution, Chunk<I> chunk) {
138 		// do nothing
139 	}
140 
141 	/**
142 	 * Delegates to {@link #doRead()}. Subclasses can add additional behaviour
143 	 * (e.g. exception handling).
144 	 *
145 	 * @param contribution the current step execution contribution
146 	 * @param chunk the current chunk
147 	 * @return a new item for processing
148 	 *
149 	 * @throws SkipOverflowException if specifically the chunk is accumulating
150 	 * too much data (e.g. skips) and it wants to force a commit.
151 	 *
152 	 * @throws Exception if there is a generic issue
153 	 */
154 	protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception {
155 		return doRead();
156 	}
157 
158 }