1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.item; |
17 | |
18 | import org.apache.commons.logging.Log; |
19 | import org.apache.commons.logging.LogFactory; |
20 | import org.springframework.batch.core.Step; |
21 | import org.springframework.batch.repeat.CompletionPolicy; |
22 | import org.springframework.batch.repeat.exception.DefaultExceptionHandler; |
23 | import org.springframework.batch.repeat.exception.ExceptionHandler; |
24 | import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; |
25 | import org.springframework.batch.repeat.support.RepeatTemplate; |
26 | import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; |
27 | import org.springframework.core.task.TaskExecutor; |
28 | import org.springframework.util.Assert; |
29 | |
30 | /** |
31 | * Most common configuration options for simple steps should be found here. Use |
32 | * this factory bean instead of creating a {@link Step} implementation manually. |
33 | * |
34 | * This factory does not support configuration of fault-tolerant behavior, use |
35 | * appropriate subclass of this factory bean to configure skip or retry. |
36 | * |
37 | * @see SkipLimitStepFactoryBean |
38 | * |
39 | * @author Dave Syer |
40 | * |
41 | */ |
42 | public class SimpleStepFactoryBean extends AbstractStepFactoryBean { |
43 | |
44 | protected final Log logger = LogFactory.getLog(getClass()); |
45 | |
46 | private static final int DEFAULT_COMMIT_INTERVAL = 1; |
47 | |
48 | private int commitInterval = 0; |
49 | |
50 | private TaskExecutor taskExecutor; |
51 | |
52 | private ItemHandler itemHandler; |
53 | |
54 | private RepeatTemplate stepOperations; |
55 | |
56 | private RepeatTemplate chunkOperations; |
57 | |
58 | private ExceptionHandler exceptionHandler = new DefaultExceptionHandler(); |
59 | |
60 | private CompletionPolicy chunkCompletionPolicy; |
61 | |
62 | private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT; |
63 | |
64 | /** |
65 | * Set the commit interval. Either set this or the chunkCompletionPolicy but |
66 | * not both. |
67 | * |
68 | * @param commitInterval 1 by default |
69 | */ |
70 | public void setCommitInterval(int commitInterval) { |
71 | this.commitInterval = commitInterval; |
72 | } |
73 | |
74 | /** |
75 | * Public setter for the {@link CompletionPolicy} applying to the chunk |
76 | * level. A transaction will be committed when this policy decides to |
77 | * complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size |
78 | * equal to the commitInterval property. |
79 | * |
80 | * @param chunkCompletionPolicy the chunkCompletionPolicy to set |
81 | */ |
82 | public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) { |
83 | this.chunkCompletionPolicy = chunkCompletionPolicy; |
84 | } |
85 | |
86 | /** |
87 | * Protected getter for the step operations to make them available in |
88 | * subclasses. |
89 | * @return the step operations |
90 | */ |
91 | protected RepeatTemplate getStepOperations() { |
92 | return stepOperations; |
93 | } |
94 | |
95 | /** |
96 | * Protected getter for the chunk operations to make them available in |
97 | * subclasses. |
98 | * @return the step operations |
99 | */ |
100 | protected RepeatTemplate getChunkOperations() { |
101 | return chunkOperations; |
102 | } |
103 | |
104 | /** |
105 | * Public setter for the {@link ExceptionHandler}. |
106 | * @param exceptionHandler the exceptionHandler to set |
107 | */ |
108 | public void setExceptionHandler(ExceptionHandler exceptionHandler) { |
109 | this.exceptionHandler = exceptionHandler; |
110 | } |
111 | |
112 | /** |
113 | * Protected getter for the {@link ExceptionHandler}. |
114 | * @return the {@link ExceptionHandler} |
115 | */ |
116 | protected ExceptionHandler getExceptionHandler() { |
117 | return exceptionHandler; |
118 | } |
119 | |
120 | /** |
121 | * Public setter for the {@link TaskExecutor}. If this is set, then it will |
122 | * be used to execute the chunk processing inside the {@link Step}. |
123 | * |
124 | * @param taskExecutor the taskExecutor to set |
125 | */ |
126 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
127 | this.taskExecutor = taskExecutor; |
128 | } |
129 | |
130 | /** |
131 | * Public setter for the throttle limit. This limits the number of tasks |
132 | * queued for concurrent processing to prevent thread pools from being |
133 | * overwhelmed. Defaults to |
134 | * {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}. |
135 | * @param throttleLimit the throttle limit to set. |
136 | */ |
137 | public void setThrottleLimit(int throttleLimit) { |
138 | this.throttleLimit = throttleLimit; |
139 | } |
140 | |
141 | /** |
142 | * Public getter for the ItemHandler. |
143 | * @return the ItemHandler |
144 | */ |
145 | protected ItemHandler getItemHandler() { |
146 | return itemHandler; |
147 | } |
148 | |
149 | /** |
150 | * Public setter for the ItemHandler. |
151 | * @param itemHandler the ItemHandler to set |
152 | */ |
153 | protected void setItemHandler(ItemHandler itemHandler) { |
154 | this.itemHandler = itemHandler; |
155 | } |
156 | |
157 | /** |
158 | * @param step |
159 | * |
160 | */ |
161 | protected void applyConfiguration(ItemOrientedStep step) { |
162 | |
163 | super.applyConfiguration(step); |
164 | |
165 | chunkOperations = new RepeatTemplate(); |
166 | chunkOperations.setCompletionPolicy(getChunkCompletionPolicy()); |
167 | BatchListenerFactoryHelper.addChunkListeners(chunkOperations, getListeners()); |
168 | step.setChunkOperations(chunkOperations); |
169 | |
170 | stepOperations = new RepeatTemplate(); |
171 | |
172 | if (taskExecutor != null) { |
173 | TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate(); |
174 | repeatTemplate.setTaskExecutor(taskExecutor); |
175 | repeatTemplate.setThrottleLimit(throttleLimit); |
176 | stepOperations = repeatTemplate; |
177 | } |
178 | |
179 | stepOperations.setExceptionHandler(exceptionHandler); |
180 | |
181 | step.setStepOperations(stepOperations); |
182 | |
183 | } |
184 | |
185 | /** |
186 | * @return a {@link CompletionPolicy} consistent with the commit interval |
187 | * and injected policy (if present). |
188 | */ |
189 | private CompletionPolicy getChunkCompletionPolicy() { |
190 | Assert.state(!(chunkCompletionPolicy != null && commitInterval != 0), |
191 | "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); |
192 | Assert.state(commitInterval >= 0, "The commitInterval must be positive or zero (for default value)."); |
193 | |
194 | if (chunkCompletionPolicy != null) { |
195 | return chunkCompletionPolicy; |
196 | } |
197 | if (commitInterval == 0) { |
198 | logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")"); |
199 | commitInterval = DEFAULT_COMMIT_INTERVAL; |
200 | } |
201 | return new SimpleCompletionPolicy(commitInterval); |
202 | } |
203 | |
204 | } |