View Javadoc

1   /*
2    * Copyright 2012-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.step.builder;
17  
18  import java.util.LinkedHashSet;
19  import java.util.Set;
20  
21  import org.springframework.batch.core.ChunkListener;
22  import org.springframework.batch.core.Step;
23  import org.springframework.batch.core.step.tasklet.Tasklet;
24  import org.springframework.batch.core.step.tasklet.TaskletStep;
25  import org.springframework.batch.item.ItemStream;
26  import org.springframework.batch.repeat.RepeatOperations;
27  import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
28  import org.springframework.batch.repeat.exception.ExceptionHandler;
29  import org.springframework.batch.repeat.support.RepeatTemplate;
30  import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
31  import org.springframework.core.task.SyncTaskExecutor;
32  import org.springframework.core.task.TaskExecutor;
33  import org.springframework.transaction.interceptor.TransactionAttribute;
34  
35  /**
36   * Base class for step builders that want to build a {@link TaskletStep}. Handles common concerns across all tasklet
37   * step variants, which are mostly to do with the type of tasklet they carry.
38   * 
39   * @author Dave Syer
40   * 
41   * @since 2.2
42   * 
43   * @param <B> the type of builder represented
44   */
45  public abstract class AbstractTaskletStepBuilder<B extends AbstractTaskletStepBuilder<B>> extends
46  		StepBuilderHelper<AbstractTaskletStepBuilder<B>> {
47  
48  	private Set<ChunkListener> listeners = new LinkedHashSet<ChunkListener>();
49  
50  	private RepeatOperations stepOperations;
51  
52  	private TransactionAttribute transactionAttribute;
53  
54  	private Set<ItemStream> streams = new LinkedHashSet<ItemStream>();
55  
56  	private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
57  
58  	private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT;
59  
60  	private TaskExecutor taskExecutor;
61  
62  	public AbstractTaskletStepBuilder(StepBuilderHelper<?> parent) {
63  		super(parent);
64  	}
65  
66  	protected abstract Tasklet createTasklet();
67  
68  	/**
69  	 * Build the step from the components collected by the fluent setters. Delegates first to {@link #enhance(Step)} and
70  	 * then to {@link #createTasklet()} in subclasses to create the actual tasklet.
71  	 * 
72  	 * @return a tasklet step fully configured and read to execute
73  	 */
74  	public TaskletStep build() {
75  
76  		TaskletStep step = new TaskletStep(getName());
77  
78  		super.enhance(step);
79  
80  		step.setChunkListeners(listeners.toArray(new ChunkListener[0]));
81  
82  		if (transactionAttribute != null) {
83  			step.setTransactionAttribute(transactionAttribute);
84  		}
85  
86  		if (stepOperations == null) {
87  
88  			stepOperations = new RepeatTemplate();
89  
90  			if (taskExecutor != null) {
91  				TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
92  				repeatTemplate.setTaskExecutor(taskExecutor);
93  				repeatTemplate.setThrottleLimit(throttleLimit);
94  				stepOperations = repeatTemplate;
95  			}
96  
97  			((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
98  
99  		}
100 		step.setStepOperations(stepOperations);
101 		step.setTasklet(createTasklet());
102 
103 		step.setStreams(streams.toArray(new ItemStream[0]));
104 
105 		try {
106 			step.afterPropertiesSet();
107 		}
108 		catch (Exception e) {
109 			throw new StepBuilderException(e);
110 		}
111 
112 		return step;
113 
114 	}
115 
116 	/**
117 	 * Register a chunk listener.
118 	 * 
119 	 * @param listener the listener to register
120 	 * @return this for fluent chaining
121 	 */
122 	public AbstractTaskletStepBuilder<B> listener(ChunkListener listener) {
123 		listeners.add(listener);
124 		return this;
125 	}
126 
127 	/**
128 	 * Register a stream for callbacks that manage restart data.
129 	 * 
130 	 * @param stream the stream to register
131 	 * @return this for fluent chaining
132 	 */
133 	public AbstractTaskletStepBuilder<B> stream(ItemStream stream) {
134 		streams.add(stream);
135 		return this;
136 	}
137 
138 	/**
139 	 * Provide a task executor to use when executing the tasklet. Default is to use a single-threaded (synchronous)
140 	 * executor.
141 	 * 
142 	 * @param taskExecutor the task executor to register
143 	 * @return this for fluent chaining
144 	 */
145 	public AbstractTaskletStepBuilder<B> taskExecutor(TaskExecutor taskExecutor) {
146 		this.taskExecutor = taskExecutor;
147 		return this;
148 	}
149 
150 	/**
151 	 * In the case of an asynchronous {@link #taskExecutor(TaskExecutor)} the number of concurrent tasklet executions
152 	 * can be throttled (beyond any throttling provided by a thread pool). The throttle limit should be less than the
153 	 * data source pool size used in the job repository for this step.
154 	 * 
155 	 * @param throttleLimit maximium number of concurrent tasklet executions allowed
156 	 * @return this for fluent chaining
157 	 */
158 	public AbstractTaskletStepBuilder<B> throttleLimit(int throttleLimit) {
159 		this.throttleLimit = throttleLimit;
160 		return this;
161 	}
162 
163 	/**
164 	 * Sets the exception handler to use in the case of tasklet failures. Default is to rethrow everything.
165 	 * 
166 	 * @param exceptionHandler the exception handler
167 	 * @return this for fluent chaining
168 	 */
169 	public AbstractTaskletStepBuilder<B> exceptionHandler(ExceptionHandler exceptionHandler) {
170 		this.exceptionHandler = exceptionHandler;
171 		return this;
172 	}
173 
174 	/**
175 	 * Sets the repeat template used for iterating the tasklet execution. By default it will terminate only when the
176 	 * tasklet returns FINISHED (or null).
177 	 * 
178 	 * @param repeatTemplate a repeat template with rules for iterating
179 	 * @return this for fluent chaining
180 	 */
181 	public AbstractTaskletStepBuilder<B> stepOperations(RepeatOperations repeatTemplate) {
182 		this.stepOperations = repeatTemplate;
183 		return this;
184 	}
185 
186 	/**
187 	 * Sets the transaction attributes for the tasklet execution. Defaults to the default values for the transaction
188 	 * manager, but can be manipulated to provide longer timeouts for instance.
189 	 * 
190 	 * @param transactionAttribute a transaction attribute set
191 	 * @return this for fluent chaining
192 	 */
193 	public AbstractTaskletStepBuilder<B> transactionAttribute(TransactionAttribute transactionAttribute) {
194 		this.transactionAttribute = transactionAttribute;
195 		return this;
196 	}
197 
198 	/**
199 	 * Convenience method for subclasses to access the step operations that were injected by user.
200 	 * 
201 	 * @return the repeat operations used to iterate the tasklet executions
202 	 */
203 	protected RepeatOperations getStepOperations() {
204 		return stepOperations;
205 	}
206 
207 	/**
208 	 * Convenience method for subclasses to access the exception handler that was injected by user.
209 	 * 
210 	 * @return the exception handler
211 	 */
212 	protected ExceptionHandler getExceptionHandler() {
213 		return exceptionHandler;
214 	}
215 
216 	/**
217 	 * Convenience method for subclasses to determine if the step is concurrent.
218 	 * 
219 	 * @return true if the tasklet is going to be run in multiple threads
220 	 */
221 	protected boolean concurrent() {
222 		boolean concurrent = taskExecutor != null && !(taskExecutor instanceof SyncTaskExecutor);
223 		return concurrent;
224 	}
225 
226 }