View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.step.factory;
17  
18  import org.apache.commons.logging.Log;
19  import org.apache.commons.logging.LogFactory;
20  import org.springframework.batch.core.ChunkListener;
21  import org.springframework.batch.core.ItemProcessListener;
22  import org.springframework.batch.core.ItemReadListener;
23  import org.springframework.batch.core.ItemWriteListener;
24  import org.springframework.batch.core.Step;
25  import org.springframework.batch.core.StepExecutionListener;
26  import org.springframework.batch.core.StepListener;
27  import org.springframework.batch.core.repository.JobRepository;
28  import org.springframework.batch.core.step.builder.SimpleStepBuilder;
29  import org.springframework.batch.core.step.builder.StepBuilder;
30  import org.springframework.batch.core.step.tasklet.TaskletStep;
31  import org.springframework.batch.item.ItemProcessor;
32  import org.springframework.batch.item.ItemReader;
33  import org.springframework.batch.item.ItemStream;
34  import org.springframework.batch.item.ItemWriter;
35  import org.springframework.batch.repeat.CompletionPolicy;
36  import org.springframework.batch.repeat.RepeatOperations;
37  import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
38  import org.springframework.batch.repeat.exception.ExceptionHandler;
39  import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
40  import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
41  import org.springframework.beans.factory.BeanNameAware;
42  import org.springframework.beans.factory.FactoryBean;
43  import org.springframework.core.task.TaskExecutor;
44  import org.springframework.transaction.PlatformTransactionManager;
45  import org.springframework.transaction.annotation.Isolation;
46  import org.springframework.transaction.annotation.Propagation;
47  import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
48  import org.springframework.transaction.interceptor.TransactionAttribute;
49  
50  /**
51   * Most common configuration options for simple steps should be found here. Use this factory bean instead of creating a
52   * {@link Step} implementation manually.
53   *
54   * This factory does not support configuration of fault-tolerant behavior, use appropriate subclass of this factory bean
55   * to configure skip or retry.
56   *
57   * @see FaultTolerantStepFactoryBean
58   *
59   * @author Dave Syer
60   * @author Robert Kasanicky
61   *
62   */
63  @SuppressWarnings("rawtypes")
64  public class SimpleStepFactoryBean<T, S> implements FactoryBean, BeanNameAware {
65  
66  	private String name;
67  
68  	private int startLimit = Integer.MAX_VALUE;
69  
70  	private boolean allowStartIfComplete;
71  
72  	private ItemReader<? extends T> itemReader;
73  
74  	private ItemProcessor<? super T, ? extends S> itemProcessor;
75  
76  	private ItemWriter<? super S> itemWriter;
77  
78  	private PlatformTransactionManager transactionManager;
79  
80  	private Propagation propagation = Propagation.REQUIRED;
81  
82  	private Isolation isolation = Isolation.DEFAULT;
83  
84  	private int transactionTimeout = DefaultTransactionAttribute.TIMEOUT_DEFAULT;
85  
86  	private JobRepository jobRepository;
87  
88  	private boolean singleton = true;
89  
90  	private ItemStream[] streams = new ItemStream[0];
91  
92  	private StepListener[] listeners = new StepListener[0];
93  
94  	protected final Log logger = LogFactory.getLog(getClass());
95  
96  	private int commitInterval = 0;
97  
98  	private TaskExecutor taskExecutor;
99  
100 	private RepeatOperations stepOperations;
101 
102 	private RepeatOperations chunkOperations;
103 
104 	private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
105 
106 	private CompletionPolicy chunkCompletionPolicy;
107 
108 	private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT;
109 
110 	private boolean isReaderTransactionalQueue = false;
111 
112 	/**
113 	 * Default constructor for {@link SimpleStepFactoryBean}.
114 	 */
115 	public SimpleStepFactoryBean() {
116 		super();
117 	}
118 
119 	/**
120 	 * Flag to signal that the reader is transactional (usually a JMS consumer) so that items are re-presented after a
121 	 * rollback. The default is false and readers are assumed to be forward-only.
122 	 *
123 	 * @param isReaderTransactionalQueue the value of the flag
124 	 */
125 	public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) {
126 		this.isReaderTransactionalQueue = isReaderTransactionalQueue;
127 	}
128 
129 	/**
130 	 * Convenience method for subclasses.
131 	 * @return true if the flag is set (default false)
132 	 */
133 	protected boolean isReaderTransactionalQueue() {
134 		return isReaderTransactionalQueue;
135 	}
136 
137 	/**
138 	 * Set the bean name property, which will become the name of the {@link Step} when it is created.
139 	 *
140 	 * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
141 	 */
142 	@Override
143 	public void setBeanName(String name) {
144 		this.name = name;
145 	}
146 
147 	/**
148 	 * Public getter for the name of the step.
149 	 * @return the name
150 	 */
151 	public String getName() {
152 		return name;
153 	}
154 
155 	/**
156 	 * The timeout for an individual transaction in the step.
157 	 *
158 	 * @param transactionTimeout the transaction timeout to set, defaults to infinite
159 	 */
160 	public void setTransactionTimeout(int transactionTimeout) {
161 		this.transactionTimeout = transactionTimeout;
162 	}
163 
164 	/**
165 	 * @param propagation the propagation to set for business transactions
166 	 */
167 	public void setPropagation(Propagation propagation) {
168 		this.propagation = propagation;
169 	}
170 
171 	/**
172 	 * @param isolation the isolation to set for business transactions
173 	 */
174 	public void setIsolation(Isolation isolation) {
175 		this.isolation = isolation;
176 	}
177 
178 	/**
179 	 * Public setter for the start limit for the step.
180 	 *
181 	 * @param startLimit the startLimit to set
182 	 */
183 	public void setStartLimit(int startLimit) {
184 		this.startLimit = startLimit;
185 	}
186 
187 	/**
188 	 * Public setter for the flag to indicate that the step should be replayed on a restart, even if successful the
189 	 * first time.
190 	 *
191 	 * @param allowStartIfComplete the shouldAllowStartIfComplete to set
192 	 */
193 	public void setAllowStartIfComplete(boolean allowStartIfComplete) {
194 		this.allowStartIfComplete = allowStartIfComplete;
195 	}
196 
197 	/**
198 	 * @param itemReader the {@link ItemReader} to set
199 	 */
200 	public void setItemReader(ItemReader<? extends T> itemReader) {
201 		this.itemReader = itemReader;
202 	}
203 
204 	/**
205 	 * @param itemWriter the {@link ItemWriter} to set
206 	 */
207 	public void setItemWriter(ItemWriter<? super S> itemWriter) {
208 		this.itemWriter = itemWriter;
209 	}
210 
211 	/**
212 	 * @param itemProcessor the {@link ItemProcessor} to set
213 	 */
214 	public void setItemProcessor(ItemProcessor<? super T, ? extends S> itemProcessor) {
215 		this.itemProcessor = itemProcessor;
216 	}
217 
218 	/**
219 	 * The streams to inject into the {@link Step}. Any instance of {@link ItemStream} can be used, and will then
220 	 * receive callbacks at the appropriate stage in the step.
221 	 *
222 	 * @param streams an array of listeners
223 	 */
224 	public void setStreams(ItemStream[] streams) {
225 		this.streams = streams;
226 	}
227 
228 	/**
229 	 * The listeners to inject into the {@link Step}. Any instance of {@link StepListener} can be used, and will then
230 	 * receive callbacks at the appropriate stage in the step.
231 	 *
232 	 * @param listeners an array of listeners
233 	 */
234 	public void setListeners(StepListener[] listeners) {
235 		this.listeners = listeners;
236 	}
237 
238 	/**
239 	 * Protected getter for the {@link StepListener}s.
240 	 * @return the listeners
241 	 */
242 	protected StepListener[] getListeners() {
243 		return listeners;
244 	}
245 
246 	/**
247 	 * Protected getter for the {@link ItemReader} for subclasses to use.
248 	 * @return the itemReader
249 	 */
250 	protected ItemReader<? extends T> getItemReader() {
251 		return itemReader;
252 	}
253 
254 	/**
255 	 * Protected getter for the {@link ItemWriter} for subclasses to use
256 	 * @return the itemWriter
257 	 */
258 	protected ItemWriter<? super S> getItemWriter() {
259 		return itemWriter;
260 	}
261 
262 	/**
263 	 * Protected getter for the {@link ItemProcessor} for subclasses to use
264 	 * @return the itemProcessor
265 	 */
266 	protected ItemProcessor<? super T, ? extends S> getItemProcessor() {
267 		return itemProcessor;
268 	}
269 
270 	/**
271 	 * Public setter for {@link JobRepository}.
272 	 *
273 	 * @param jobRepository is a mandatory dependence (no default).
274 	 */
275 	public void setJobRepository(JobRepository jobRepository) {
276 		this.jobRepository = jobRepository;
277 	}
278 
279 	/**
280 	 * Public setter for the {@link PlatformTransactionManager}.
281 	 *
282 	 * @param transactionManager the transaction manager to set
283 	 */
284 	public void setTransactionManager(PlatformTransactionManager transactionManager) {
285 		this.transactionManager = transactionManager;
286 	}
287 
288 	/**
289 	 * Getter for the {@link TransactionAttribute} for subclasses only.
290 	 * @return the transactionAttribute
291 	 */
292 	@SuppressWarnings("serial")
293 	protected TransactionAttribute getTransactionAttribute() {
294 
295 		DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
296 		attribute.setPropagationBehavior(propagation.value());
297 		attribute.setIsolationLevel(isolation.value());
298 		attribute.setTimeout(transactionTimeout);
299 		return new DefaultTransactionAttribute(attribute) {
300 
301 			/**
302 			 * Ignore the default behaviour and rollback on all exceptions that bubble up to the tasklet level. The
303 			 * tasklet has to deal with the rollback rules internally.
304 			 */
305 			@Override
306 			public boolean rollbackOn(Throwable ex) {
307 				return true;
308 			}
309 
310 		};
311 
312 	}
313 
314 	/**
315 	 * Create a {@link Step} from the configuration provided.
316 	 *
317 	 * @see FactoryBean#getObject()
318 	 */
319 	@Override
320 	public final Object getObject() throws Exception {
321 		SimpleStepBuilder<T, S> builder = createBuilder(getName());
322 		applyConfiguration(builder);
323 		TaskletStep step = builder.build();
324 		return step;
325 	}
326 
327 	protected SimpleStepBuilder<T, S> createBuilder(String name) {
328 		return new SimpleStepBuilder<T, S>(new StepBuilder(name));
329 	}
330 
331 	@Override
332 	public Class<TaskletStep> getObjectType() {
333 		return TaskletStep.class;
334 	}
335 
336 	/**
337 	 * Returns true by default, but in most cases a {@link Step} should not be treated as thread safe. Clients are
338 	 * recommended to create a new step for each job execution.
339 	 *
340 	 * @see org.springframework.beans.factory.FactoryBean#isSingleton()
341 	 */
342 	@Override
343 	public boolean isSingleton() {
344 		return this.singleton;
345 	}
346 
347 	/**
348 	 * Public setter for the singleton flag.
349 	 * @param singleton the value to set. Defaults to true.
350 	 */
351 	public void setSingleton(boolean singleton) {
352 		this.singleton = singleton;
353 	}
354 
355 	/**
356 	 * Set the commit interval. Either set this or the chunkCompletionPolicy but not both.
357 	 *
358 	 * @param commitInterval 1 by default
359 	 */
360 	public void setCommitInterval(int commitInterval) {
361 		this.commitInterval = commitInterval;
362 	}
363 
364 	/**
365 	 * Public setter for the {@link CompletionPolicy} applying to the chunk level. A transaction will be committed when
366 	 * this policy decides to complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size equal to the
367 	 * commitInterval property.
368 	 *
369 	 * @param chunkCompletionPolicy the chunkCompletionPolicy to set
370 	 */
371 	public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) {
372 		this.chunkCompletionPolicy = chunkCompletionPolicy;
373 	}
374 
375 	/**
376 	 * Protected getter for the step operations to make them available in subclasses.
377 	 * @return the step operations
378 	 */
379 	protected RepeatOperations getStepOperations() {
380 		return stepOperations;
381 	}
382 
383 	/**
384 	 * Public setter for the stepOperations.
385 	 * @param stepOperations the stepOperations to set
386 	 */
387 	public void setStepOperations(RepeatOperations stepOperations) {
388 		this.stepOperations = stepOperations;
389 	}
390 
391 	/**
392 	 * Public setter for the chunkOperations.
393 	 * @param chunkOperations the chunkOperations to set
394 	 */
395 	public void setChunkOperations(RepeatOperations chunkOperations) {
396 		this.chunkOperations = chunkOperations;
397 	}
398 
399 	/**
400 	 * Protected getter for the chunk operations to make them available in subclasses.
401 	 * @return the step operations
402 	 */
403 	protected RepeatOperations getChunkOperations() {
404 		return chunkOperations;
405 	}
406 
407 	/**
408 	 * Public setter for the {@link ExceptionHandler}.
409 	 * @param exceptionHandler the exceptionHandler to set
410 	 */
411 	public void setExceptionHandler(ExceptionHandler exceptionHandler) {
412 		this.exceptionHandler = exceptionHandler;
413 	}
414 
415 	/**
416 	 * Protected getter for the {@link ExceptionHandler}.
417 	 * @return the {@link ExceptionHandler}
418 	 */
419 	protected ExceptionHandler getExceptionHandler() {
420 		return exceptionHandler;
421 	}
422 
423 	/**
424 	 * Public setter for the {@link TaskExecutor}. If this is set, then it will be used to execute the chunk processing
425 	 * inside the {@link Step}.
426 	 *
427 	 * @param taskExecutor the taskExecutor to set
428 	 */
429 	public void setTaskExecutor(TaskExecutor taskExecutor) {
430 		this.taskExecutor = taskExecutor;
431 	}
432 
433 	/**
434 	 * Mkae the {@link TaskExecutor} available to subclasses
435 	 * @return the taskExecutor to be used to execute chunks
436 	 */
437 	protected TaskExecutor getTaskExecutor() {
438 		return taskExecutor;
439 	}
440 
441 	/**
442 	 * Public setter for the throttle limit. This limits the number of tasks queued for concurrent processing to prevent
443 	 * thread pools from being overwhelmed. Defaults to {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}.
444 	 * @param throttleLimit the throttle limit to set.
445 	 */
446 	public void setThrottleLimit(int throttleLimit) {
447 		this.throttleLimit = throttleLimit;
448 	}
449 
450 	protected void applyConfiguration(SimpleStepBuilder<T, S> builder) {
451 
452 		builder.reader(itemReader);
453 		builder.processor(itemProcessor);
454 		builder.writer(itemWriter);
455 		for (StepExecutionListener listener : BatchListenerFactoryHelper.<StepExecutionListener> getListeners(
456 				listeners, StepExecutionListener.class)) {
457 			builder.listener(listener);
458 		}
459 		for (ChunkListener listener : BatchListenerFactoryHelper.<ChunkListener> getListeners(listeners,
460 				ChunkListener.class)) {
461 			builder.listener(listener);
462 		}
463 		for (ItemReadListener<T> listener : BatchListenerFactoryHelper.<ItemReadListener<T>> getListeners(listeners,
464 				ItemReadListener.class)) {
465 			builder.listener(listener);
466 		}
467 		for (ItemWriteListener<S> listener : BatchListenerFactoryHelper.<ItemWriteListener<S>> getListeners(listeners,
468 				ItemWriteListener.class)) {
469 			builder.listener(listener);
470 		}
471 		for (ItemProcessListener<T, S> listener : BatchListenerFactoryHelper.<ItemProcessListener<T, S>> getListeners(
472 				listeners, ItemProcessListener.class)) {
473 			builder.listener(listener);
474 		}
475 		builder.transactionManager(transactionManager);
476 		builder.transactionAttribute(getTransactionAttribute());
477 		builder.repository(jobRepository);
478 		builder.startLimit(startLimit);
479 		builder.allowStartIfComplete(allowStartIfComplete);
480 		builder.chunk(commitInterval);
481 		builder.chunk(chunkCompletionPolicy);
482 		builder.chunkOperations(chunkOperations);
483 		builder.stepOperations(stepOperations);
484 		builder.taskExecutor(taskExecutor);
485 		builder.throttleLimit(throttleLimit);
486 		builder.exceptionHandler(exceptionHandler);
487 		if (isReaderTransactionalQueue) {
488 			builder.readerIsTransactionalQueue();
489 		}
490 		for (ItemStream stream : streams) {
491 			builder.stream(stream);
492 		}
493 
494 	}
495 }