View Javadoc

1   /*
2    * Copyright 2006-2011 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.step.builder;
17  
18  import java.util.ArrayList;
19  import java.util.LinkedHashSet;
20  import java.util.Set;
21  
22  import org.springframework.batch.core.ChunkListener;
23  import org.springframework.batch.core.ItemProcessListener;
24  import org.springframework.batch.core.ItemReadListener;
25  import org.springframework.batch.core.ItemWriteListener;
26  import org.springframework.batch.core.StepExecutionListener;
27  import org.springframework.batch.core.StepListener;
28  import org.springframework.batch.core.listener.StepListenerFactoryBean;
29  import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
30  import org.springframework.batch.core.step.item.SimpleChunkProcessor;
31  import org.springframework.batch.core.step.item.SimpleChunkProvider;
32  import org.springframework.batch.core.step.tasklet.Tasklet;
33  import org.springframework.batch.core.step.tasklet.TaskletStep;
34  import org.springframework.batch.item.ItemProcessor;
35  import org.springframework.batch.item.ItemReader;
36  import org.springframework.batch.item.ItemStream;
37  import org.springframework.batch.item.ItemWriter;
38  import org.springframework.batch.repeat.CompletionPolicy;
39  import org.springframework.batch.repeat.RepeatOperations;
40  import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
41  import org.springframework.batch.repeat.support.RepeatTemplate;
42  import org.springframework.util.Assert;
43  
44  /**
45   * Step builder for simple item processing (chunk oriented) steps. Items are read and cached in chunks, and then
46   * processed (transformed) and written (optionally either the processor or the writer can be omitted) all in the same
47   * transaction.
48   * 
49   * @see FaultTolerantStepBuilder for a step that handles retry and skip of failed items
50   * 
51   * @author Dave Syer
52   * 
53   * @since 2.2
54   */
55  public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> {
56  
57  	private static final int DEFAULT_COMMIT_INTERVAL = 1;
58  
59  	private ItemReader<? extends I> reader;
60  
61  	private ItemWriter<? super O> writer;
62  
63  	private ItemProcessor<? super I, ? extends O> processor;
64  
65  	private int chunkSize = 0;
66  
67  	private RepeatOperations chunkOperations;
68  
69  	private CompletionPolicy completionPolicy;
70  
71  	private Set<StepListener> itemListeners = new LinkedHashSet<StepListener>();
72  
73  	private boolean readerTransactionalQueue = false;
74  
75  	/**
76  	 * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used.
77  	 * 
78  	 * @param parent a parent helper containing common step properties
79  	 */
80  	public SimpleStepBuilder(StepBuilderHelper<?> parent) {
81  		super(parent);
82  	}
83  	
84  	/**
85  	 * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used.
86  	 * 
87  	 * @param parent a parent helper containing common step properties
88  	 */
89  	protected SimpleStepBuilder(SimpleStepBuilder<I, O> parent) {
90  		super(parent);
91  		this.chunkSize = parent.chunkSize;
92  		this.completionPolicy = parent.completionPolicy;
93  		this.chunkOperations = parent.chunkOperations;
94  		this.reader = parent.reader;
95  		this.writer = parent.writer;
96  		this.processor = parent.processor;
97  		this.itemListeners = parent.itemListeners;
98  		this.readerTransactionalQueue = parent.readerTransactionalQueue;
99  	}
100 	
101 	public FaultTolerantStepBuilder<I, O> faultTolerant() {
102 		FaultTolerantStepBuilder<I, O> builder = new FaultTolerantStepBuilder<I, O>(this);
103 		return builder;
104 	}
105 
106 	/**
107 	 * Build a step with the reader, writer, processor as provided.
108 	 * 
109 	 * @see org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder#build()
110 	 */
111 	@Override
112 	public TaskletStep build() {
113 		registerAsStreamsAndListeners(reader, processor, writer);
114 		return super.build();
115 	}
116 
117 	@Override
118 	protected Tasklet createTasklet() {
119 		Assert.state(reader != null, "ItemReader must be provided");
120 		Assert.state(processor != null || writer != null, "ItemWriter or ItemProcessor must be provided");
121 		RepeatOperations repeatOperations = createChunkOperations();
122 		SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<I>(reader, repeatOperations);
123 		SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<I, O>(processor, writer);
124 		chunkProvider.setListeners(new ArrayList<StepListener>(itemListeners));
125 		chunkProcessor.setListeners(new ArrayList<StepListener>(itemListeners));
126 		ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor);
127 		tasklet.setBuffering(!readerTransactionalQueue);
128 		return tasklet;
129 	}
130 
131 	/**
132 	 * Sets the chunk size or commit interval for this step. This is the maximum number of items that will be read
133 	 * before processing starts in a single transaction. Not compatible with {@link #completionPolicy(CompletionPolicy)}
134 	 * .
135 	 * 
136 	 * @param chunkSize the chunk size (a.k.a commit interval)
137 	 * @return this for fluent chaining
138 	 */
139 	public SimpleStepBuilder<I, O> chunk(int chunkSize) {
140 		Assert.state(completionPolicy == null || chunkSize == 0,
141 				"You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
142 		this.chunkSize = chunkSize;
143 		return this;
144 	}
145 
146 	/**
147 	 * Sets a completion policy for the chunk processing. Items are read until this policy determines that a chunk is
148 	 * complete, giving more control than with just the {@link #chunk(int) chunk size} (or commit interval).
149 	 * 
150 	 * @param completionPolicy a completion policy for the chunk
151 	 * @return this for fluent chaining
152 	 */
153 	public SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
154 		Assert.state(chunkSize == 0 || completionPolicy == null,
155 				"You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
156 		this.completionPolicy = completionPolicy;
157 		return this;
158 	}
159 
160 	/**
161 	 * An item reader that provides a stream of items. Will be automatically registered as a {@link #stream(ItemStream)}
162 	 * or listener if it implements the corresponding interface. By default assumed to be non-transactional.
163 	 * 
164 	 * @see #readerTransactionalQueue
165 	 * @param reader an item reader
166 	 * @return this for fluent chaining
167 	 */
168 	public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
169 		this.reader = reader;
170 		return this;
171 	}
172 
173 	/**
174 	 * An item writer that writes a chunk of items. Will be automatically registered as a {@link #stream(ItemStream)} or
175 	 * listener if it implements the corresponding interface.
176 	 * 
177 	 * @param writer an item writer
178 	 * @return this for fluent chaining
179 	 */
180 	public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) {
181 		this.writer = writer;
182 		return this;
183 	}
184 
185 	/**
186 	 * An item processor that processes or transforms a stream of items. Will be automatically registered as a
187 	 * {@link #stream(ItemStream)} or listener if it implements the corresponding interface.
188 	 * 
189 	 * @param processor an item processor
190 	 * @return this for fluent chaining
191 	 */
192 	public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) {
193 		this.processor = processor;
194 		return this;
195 	}
196 
197 	/**
198 	 * Sets a flag to say that the reader is transactional (usually a queue), which is to say that failed items might be
199 	 * rolled back and re-presented in a subsequent transaction. Default is false, meaning that the items are read
200 	 * outside a transaction and possibly cached.
201 	 * 
202 	 * @return this for fluent chaining
203 	 */
204 	public SimpleStepBuilder<I, O> readerIsTransactionalQueue() {
205 		this.readerTransactionalQueue = true;
206 		return this;
207 	}
208 
209 	/**
210 	 * Register an item reader listener.
211 	 * 
212 	 * @param listener the listener to register
213 	 * @return this for fluent chaining
214 	 */
215 	public SimpleStepBuilder<I, O> listener(ItemReadListener<? super I> listener) {
216 		itemListeners.add(listener);
217 		return this;
218 	}
219 
220 	/**
221 	 * Register an item writer listener.
222 	 * 
223 	 * @param listener the listener to register
224 	 * @return this for fluent chaining
225 	 */
226 	public SimpleStepBuilder<I, O> listener(ItemWriteListener<? super O> listener) {
227 		itemListeners.add(listener);
228 		return this;
229 	}
230 
231 	/**
232 	 * Register an item processor listener.
233 	 * 
234 	 * @param listener the listener to register
235 	 * @return this for fluent chaining
236 	 */
237 	public SimpleStepBuilder<I, O> listener(ItemProcessListener<? super I, ? super O> listener) {
238 		itemListeners.add(listener);
239 		return this;
240 	}
241 
242 	/**
243 	 * Instead of a {@link #chunk(int) chunk size} or {@link #chunk(CompletionPolicy) completion policy} you can provide
244 	 * a complete repeat operations instance that handles the iteration over the item reader.
245 	 * 
246 	 * @param repeatTemplate a cmplete repeat template for the chunk
247 	 * @return this for fluent chaining
248 	 */
249 	public SimpleStepBuilder<I, O> chunkOperations(RepeatOperations repeatTemplate) {
250 		this.chunkOperations = repeatTemplate;
251 		return this;
252 	}
253 
254 	protected RepeatOperations createChunkOperations() {
255 		RepeatOperations repeatOperations = chunkOperations;
256 		if (repeatOperations == null) {
257 			RepeatTemplate repeatTemplate = new RepeatTemplate();
258 			repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy());
259 			repeatOperations = repeatTemplate;
260 		}
261 		return repeatOperations;
262 	}
263 
264 	protected ItemReader<? extends I> getReader() {
265 		return reader;
266 	}
267 
268 	protected ItemWriter<? super O> getWriter() {
269 		return writer;
270 	}
271 
272 	protected ItemProcessor<? super I, ? extends O> getProcessor() {
273 		return processor;
274 	}
275 
276 	protected int getChunkSize() {
277 		return chunkSize;
278 	}
279 
280 	protected boolean isReaderTransactionalQueue() {
281 		return readerTransactionalQueue;
282 	}
283 
284 	protected Set<StepListener> getItemListeners() {
285 		return itemListeners;
286 	}
287 
288 	/**
289 	 * @return a {@link CompletionPolicy} consistent with the chunk size and injected policy (if present).
290 	 */
291 	private CompletionPolicy getChunkCompletionPolicy() {
292 		Assert.state(!(completionPolicy != null && chunkSize > 0),
293 				"You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
294 		Assert.state(chunkSize >= 0, "The commitInterval must be positive or zero (for default value).");
295 
296 		if (completionPolicy != null) {
297 			return completionPolicy;
298 		}
299 		if (chunkSize == 0) {
300 			logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")");
301 			chunkSize = DEFAULT_COMMIT_INTERVAL;
302 		}
303 		return new SimpleCompletionPolicy(chunkSize);
304 	}
305 
306 	private void registerAsStreamsAndListeners(ItemReader<? extends I> itemReader,
307 			ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
308 		for (Object itemHandler : new Object[] { itemReader, itemWriter, itemProcessor }) {
309 			if (itemHandler instanceof ItemStream) {
310 				stream((ItemStream) itemHandler);
311 			}
312 			if (StepListenerFactoryBean.isListener(itemHandler)) {
313 				StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
314 				if (listener instanceof StepExecutionListener) {
315 					listener((StepExecutionListener) listener);
316 				}
317 				if (listener instanceof ChunkListener) {
318 					listener((ChunkListener) listener);
319 				}
320 				if (listener instanceof ItemReadListener<?> || listener instanceof ItemProcessListener<?, ?>
321 						|| listener instanceof ItemWriteListener<?>) {
322 					itemListeners.add(listener);
323 				}
324 			}
325 		}
326 	}
327 
328 }