| 1 | /* |
| 2 | * Copyright 2006-2014 the original author or authors. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.springframework.batch.core.step.builder; |
| 17 | |
| 18 | import org.springframework.batch.core.ChunkListener; |
| 19 | import org.springframework.batch.core.ItemProcessListener; |
| 20 | import org.springframework.batch.core.ItemReadListener; |
| 21 | import org.springframework.batch.core.ItemWriteListener; |
| 22 | import org.springframework.batch.core.StepExecutionListener; |
| 23 | import org.springframework.batch.core.StepListener; |
| 24 | import org.springframework.batch.core.annotation.AfterProcess; |
| 25 | import org.springframework.batch.core.annotation.AfterRead; |
| 26 | import org.springframework.batch.core.annotation.AfterWrite; |
| 27 | import org.springframework.batch.core.annotation.BeforeProcess; |
| 28 | import org.springframework.batch.core.annotation.BeforeRead; |
| 29 | import org.springframework.batch.core.annotation.BeforeWrite; |
| 30 | import org.springframework.batch.core.annotation.OnProcessError; |
| 31 | import org.springframework.batch.core.annotation.OnReadError; |
| 32 | import org.springframework.batch.core.annotation.OnWriteError; |
| 33 | import org.springframework.batch.core.listener.StepListenerFactoryBean; |
| 34 | import org.springframework.batch.core.step.item.ChunkOrientedTasklet; |
| 35 | import org.springframework.batch.core.step.item.SimpleChunkProcessor; |
| 36 | import org.springframework.batch.core.step.item.SimpleChunkProvider; |
| 37 | import org.springframework.batch.core.step.tasklet.Tasklet; |
| 38 | import org.springframework.batch.core.step.tasklet.TaskletStep; |
| 39 | import org.springframework.batch.item.ItemProcessor; |
| 40 | import org.springframework.batch.item.ItemReader; |
| 41 | import org.springframework.batch.item.ItemStream; |
| 42 | import org.springframework.batch.item.ItemWriter; |
| 43 | import org.springframework.batch.repeat.CompletionPolicy; |
| 44 | import org.springframework.batch.repeat.RepeatOperations; |
| 45 | import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; |
| 46 | import org.springframework.batch.repeat.support.RepeatTemplate; |
| 47 | import org.springframework.batch.support.ReflectionUtils; |
| 48 | import org.springframework.util.Assert; |
| 49 | |
| 50 | import java.lang.reflect.Method; |
| 51 | import java.util.ArrayList; |
| 52 | import java.util.HashSet; |
| 53 | import java.util.LinkedHashSet; |
| 54 | import java.util.Set; |
| 55 | |
| 56 | /** |
| 57 | * Step builder for simple item processing (chunk oriented) steps. Items are read and cached in chunks, and then |
| 58 | * processed (transformed) and written (optionally either the processor or the writer can be omitted) all in the same |
| 59 | * transaction. |
| 60 | * |
| 61 | * @see FaultTolerantStepBuilder for a step that handles retry and skip of failed items |
| 62 | * |
| 63 | * @author Dave Syer |
| 64 | * |
| 65 | * @since 2.2 |
| 66 | */ |
| 67 | public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> { |
| 68 | |
| 69 | private static final int DEFAULT_COMMIT_INTERVAL = 1; |
| 70 | |
| 71 | private ItemReader<? extends I> reader; |
| 72 | |
| 73 | private ItemWriter<? super O> writer; |
| 74 | |
| 75 | private ItemProcessor<? super I, ? extends O> processor; |
| 76 | |
| 77 | private int chunkSize = 0; |
| 78 | |
| 79 | private RepeatOperations chunkOperations; |
| 80 | |
| 81 | private CompletionPolicy completionPolicy; |
| 82 | |
| 83 | private Set<StepListener> itemListeners = new LinkedHashSet<StepListener>(); |
| 84 | |
| 85 | private boolean readerTransactionalQueue = false; |
| 86 | |
| 87 | /** |
| 88 | * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. |
| 89 | * |
| 90 | * @param parent a parent helper containing common step properties |
| 91 | */ |
| 92 | public SimpleStepBuilder(StepBuilderHelper<?> parent) { |
| 93 | super(parent); |
| 94 | } |
| 95 | |
| 96 | /** |
| 97 | * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. |
| 98 | * |
| 99 | * @param parent a parent helper containing common step properties |
| 100 | */ |
| 101 | protected SimpleStepBuilder(SimpleStepBuilder<I, O> parent) { |
| 102 | super(parent); |
| 103 | this.chunkSize = parent.chunkSize; |
| 104 | this.completionPolicy = parent.completionPolicy; |
| 105 | this.chunkOperations = parent.chunkOperations; |
| 106 | this.reader = parent.reader; |
| 107 | this.writer = parent.writer; |
| 108 | this.processor = parent.processor; |
| 109 | this.itemListeners = parent.itemListeners; |
| 110 | this.readerTransactionalQueue = parent.readerTransactionalQueue; |
| 111 | } |
| 112 | |
| 113 | public FaultTolerantStepBuilder<I, O> faultTolerant() { |
| 114 | FaultTolerantStepBuilder<I, O> builder = new FaultTolerantStepBuilder<I, O>(this); |
| 115 | return builder; |
| 116 | } |
| 117 | |
| 118 | /** |
| 119 | * Build a step with the reader, writer, processor as provided. |
| 120 | * |
| 121 | * @see org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder#build() |
| 122 | */ |
| 123 | @Override |
| 124 | public TaskletStep build() { |
| 125 | registerStepListenerAsItemListener(); |
| 126 | registerAsStreamsAndListeners(reader, processor, writer); |
| 127 | return super.build(); |
| 128 | } |
| 129 | |
| 130 | private void registerStepListenerAsItemListener() { |
| 131 | for (StepExecutionListener stepExecutionListener: properties.getStepExecutionListeners()){ |
| 132 | checkAndAddItemListener(stepExecutionListener); |
| 133 | } |
| 134 | for (ChunkListener chunkListener: this.chunkListeners){ |
| 135 | checkAndAddItemListener(chunkListener); |
| 136 | } |
| 137 | } |
| 138 | |
| 139 | @SuppressWarnings("unchecked") |
| 140 | private void checkAndAddItemListener(StepListener stepListener) { |
| 141 | if (stepListener instanceof ItemReadListener){ |
| 142 | listener((ItemReadListener<I>)stepListener); |
| 143 | } |
| 144 | if (stepListener instanceof ItemProcessListener){ |
| 145 | listener((ItemProcessListener<I,O>)stepListener); |
| 146 | } |
| 147 | if (stepListener instanceof ItemWriteListener){ |
| 148 | listener((ItemWriteListener<O>)stepListener); |
| 149 | } |
| 150 | } |
| 151 | |
| 152 | @Override |
| 153 | protected Tasklet createTasklet() { |
| 154 | Assert.state(reader != null, "ItemReader must be provided"); |
| 155 | Assert.state(processor != null || writer != null, "ItemWriter or ItemProcessor must be provided"); |
| 156 | RepeatOperations repeatOperations = createChunkOperations(); |
| 157 | SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<I>(reader, repeatOperations); |
| 158 | SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<I, O>(processor, writer); |
| 159 | chunkProvider.setListeners(new ArrayList<StepListener>(itemListeners)); |
| 160 | chunkProcessor.setListeners(new ArrayList<StepListener>(itemListeners)); |
| 161 | ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor); |
| 162 | tasklet.setBuffering(!readerTransactionalQueue); |
| 163 | return tasklet; |
| 164 | } |
| 165 | |
| 166 | /** |
| 167 | * Sets the chunk size or commit interval for this step. This is the maximum number of items that will be read |
| 168 | * before processing starts in a single transaction. Not compatible with {@link #completionPolicy} |
| 169 | * . |
| 170 | * |
| 171 | * @param chunkSize the chunk size (a.k.a commit interval) |
| 172 | * @return this for fluent chaining |
| 173 | */ |
| 174 | public SimpleStepBuilder<I, O> chunk(int chunkSize) { |
| 175 | Assert.state(completionPolicy == null || chunkSize == 0, |
| 176 | "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); |
| 177 | this.chunkSize = chunkSize; |
| 178 | return this; |
| 179 | } |
| 180 | |
| 181 | /** |
| 182 | * Sets a completion policy for the chunk processing. Items are read until this policy determines that a chunk is |
| 183 | * complete, giving more control than with just the {@link #chunk(int) chunk size} (or commit interval). |
| 184 | * |
| 185 | * @param completionPolicy a completion policy for the chunk |
| 186 | * @return this for fluent chaining |
| 187 | */ |
| 188 | public SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) { |
| 189 | Assert.state(chunkSize == 0 || completionPolicy == null, |
| 190 | "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); |
| 191 | this.completionPolicy = completionPolicy; |
| 192 | return this; |
| 193 | } |
| 194 | |
| 195 | /** |
| 196 | * An item reader that provides a stream of items. Will be automatically registered as a {@link #stream(ItemStream)} |
| 197 | * or listener if it implements the corresponding interface. By default assumed to be non-transactional. |
| 198 | * |
| 199 | * @see #readerTransactionalQueue |
| 200 | * @param reader an item reader |
| 201 | * @return this for fluent chaining |
| 202 | */ |
| 203 | public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) { |
| 204 | this.reader = reader; |
| 205 | return this; |
| 206 | } |
| 207 | |
| 208 | /** |
| 209 | * An item writer that writes a chunk of items. Will be automatically registered as a {@link #stream(ItemStream)} or |
| 210 | * listener if it implements the corresponding interface. |
| 211 | * |
| 212 | * @param writer an item writer |
| 213 | * @return this for fluent chaining |
| 214 | */ |
| 215 | public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) { |
| 216 | this.writer = writer; |
| 217 | return this; |
| 218 | } |
| 219 | |
| 220 | /** |
| 221 | * An item processor that processes or transforms a stream of items. Will be automatically registered as a |
| 222 | * {@link #stream(ItemStream)} or listener if it implements the corresponding interface. |
| 223 | * |
| 224 | * @param processor an item processor |
| 225 | * @return this for fluent chaining |
| 226 | */ |
| 227 | public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) { |
| 228 | this.processor = processor; |
| 229 | return this; |
| 230 | } |
| 231 | |
| 232 | /** |
| 233 | * Sets a flag to say that the reader is transactional (usually a queue), which is to say that failed items might be |
| 234 | * rolled back and re-presented in a subsequent transaction. Default is false, meaning that the items are read |
| 235 | * outside a transaction and possibly cached. |
| 236 | * |
| 237 | * @return this for fluent chaining |
| 238 | */ |
| 239 | public SimpleStepBuilder<I, O> readerIsTransactionalQueue() { |
| 240 | this.readerTransactionalQueue = true; |
| 241 | return this; |
| 242 | } |
| 243 | |
| 244 | /** |
| 245 | * Registers objects using the annotation based listener configuration. |
| 246 | * |
| 247 | * @param listener the object that has a method configured with listener annotation |
| 248 | * @return this for fluent chaining |
| 249 | */ |
| 250 | @Override |
| 251 | public SimpleStepBuilder listener(Object listener) { |
| 252 | super.listener(listener); |
| 253 | |
| 254 | Set<Method> itemListenerMethods = new HashSet<Method>(); |
| 255 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeRead.class)); |
| 256 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterRead.class)); |
| 257 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeProcess.class)); |
| 258 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterProcess.class)); |
| 259 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeWrite.class)); |
| 260 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterWrite.class)); |
| 261 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnReadError.class)); |
| 262 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnProcessError.class)); |
| 263 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnWriteError.class)); |
| 264 | |
| 265 | if(itemListenerMethods.size() > 0) { |
| 266 | StepListenerFactoryBean factory = new StepListenerFactoryBean(); |
| 267 | factory.setDelegate(listener); |
| 268 | itemListeners.add((StepListener) factory.getObject()); |
| 269 | } |
| 270 | |
| 271 | @SuppressWarnings("unchecked") |
| 272 | SimpleStepBuilder result = this; |
| 273 | return result; |
| 274 | } |
| 275 | |
| 276 | |
| 277 | /** |
| 278 | * Register an item reader listener. |
| 279 | * |
| 280 | * @param listener the listener to register |
| 281 | * @return this for fluent chaining |
| 282 | */ |
| 283 | public SimpleStepBuilder<I, O> listener(ItemReadListener<? super I> listener) { |
| 284 | itemListeners.add(listener); |
| 285 | return this; |
| 286 | } |
| 287 | |
| 288 | /** |
| 289 | * Register an item writer listener. |
| 290 | * |
| 291 | * @param listener the listener to register |
| 292 | * @return this for fluent chaining |
| 293 | */ |
| 294 | public SimpleStepBuilder<I, O> listener(ItemWriteListener<? super O> listener) { |
| 295 | itemListeners.add(listener); |
| 296 | return this; |
| 297 | } |
| 298 | |
| 299 | /** |
| 300 | * Register an item processor listener. |
| 301 | * |
| 302 | * @param listener the listener to register |
| 303 | * @return this for fluent chaining |
| 304 | */ |
| 305 | public SimpleStepBuilder<I, O> listener(ItemProcessListener<? super I, ? super O> listener) { |
| 306 | itemListeners.add(listener); |
| 307 | return this; |
| 308 | } |
| 309 | |
| 310 | /** |
| 311 | * Instead of a {@link #chunk(int) chunk size} or {@link #chunk(CompletionPolicy) completion policy} you can provide |
| 312 | * a complete repeat operations instance that handles the iteration over the item reader. |
| 313 | * |
| 314 | * @param repeatTemplate a cmplete repeat template for the chunk |
| 315 | * @return this for fluent chaining |
| 316 | */ |
| 317 | public SimpleStepBuilder<I, O> chunkOperations(RepeatOperations repeatTemplate) { |
| 318 | this.chunkOperations = repeatTemplate; |
| 319 | return this; |
| 320 | } |
| 321 | |
| 322 | protected RepeatOperations createChunkOperations() { |
| 323 | RepeatOperations repeatOperations = chunkOperations; |
| 324 | if (repeatOperations == null) { |
| 325 | RepeatTemplate repeatTemplate = new RepeatTemplate(); |
| 326 | repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy()); |
| 327 | repeatOperations = repeatTemplate; |
| 328 | } |
| 329 | return repeatOperations; |
| 330 | } |
| 331 | |
| 332 | protected ItemReader<? extends I> getReader() { |
| 333 | return reader; |
| 334 | } |
| 335 | |
| 336 | protected ItemWriter<? super O> getWriter() { |
| 337 | return writer; |
| 338 | } |
| 339 | |
| 340 | protected ItemProcessor<? super I, ? extends O> getProcessor() { |
| 341 | return processor; |
| 342 | } |
| 343 | |
| 344 | protected int getChunkSize() { |
| 345 | return chunkSize; |
| 346 | } |
| 347 | |
| 348 | protected boolean isReaderTransactionalQueue() { |
| 349 | return readerTransactionalQueue; |
| 350 | } |
| 351 | |
| 352 | protected Set<StepListener> getItemListeners() { |
| 353 | return itemListeners; |
| 354 | } |
| 355 | |
| 356 | /** |
| 357 | * @return a {@link CompletionPolicy} consistent with the chunk size and injected policy (if present). |
| 358 | */ |
| 359 | private CompletionPolicy getChunkCompletionPolicy() { |
| 360 | Assert.state(!(completionPolicy != null && chunkSize > 0), |
| 361 | "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); |
| 362 | Assert.state(chunkSize >= 0, "The commitInterval must be positive or zero (for default value)."); |
| 363 | |
| 364 | if (completionPolicy != null) { |
| 365 | return completionPolicy; |
| 366 | } |
| 367 | if (chunkSize == 0) { |
| 368 | logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")"); |
| 369 | chunkSize = DEFAULT_COMMIT_INTERVAL; |
| 370 | } |
| 371 | return new SimpleCompletionPolicy(chunkSize); |
| 372 | } |
| 373 | |
| 374 | private void registerAsStreamsAndListeners(ItemReader<? extends I> itemReader, |
| 375 | ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) { |
| 376 | for (Object itemHandler : new Object[] { itemReader, itemWriter, itemProcessor }) { |
| 377 | if (itemHandler instanceof ItemStream) { |
| 378 | stream((ItemStream) itemHandler); |
| 379 | } |
| 380 | if (StepListenerFactoryBean.isListener(itemHandler)) { |
| 381 | StepListener listener = StepListenerFactoryBean.getListener(itemHandler); |
| 382 | if (listener instanceof StepExecutionListener) { |
| 383 | listener((StepExecutionListener) listener); |
| 384 | } |
| 385 | if (listener instanceof ChunkListener) { |
| 386 | listener((ChunkListener) listener); |
| 387 | } |
| 388 | if (listener instanceof ItemReadListener<?> || listener instanceof ItemProcessListener<?, ?> |
| 389 | || listener instanceof ItemWriteListener<?>) { |
| 390 | itemListeners.add(listener); |
| 391 | } |
| 392 | } |
| 393 | } |
| 394 | } |
| 395 | |
| 396 | } |