View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.scope.context;
17  
18  import java.util.Queue;
19  import java.util.concurrent.LinkedBlockingQueue;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.springframework.batch.core.Step;
24  import org.springframework.batch.core.StepExecution;
25  import org.springframework.batch.repeat.RepeatCallback;
26  import org.springframework.batch.repeat.RepeatContext;
27  import org.springframework.batch.repeat.RepeatStatus;
28  import org.springframework.util.ObjectUtils;
29  
30  /**
31   * Convenient base class for clients who need to do something in a repeat
32   * callback inside a {@link Step}.
33   *
34   * @author Dave Syer
35   *
36   */
37  public abstract class StepContextRepeatCallback implements RepeatCallback {
38  
39  	private final Queue<ChunkContext> attributeQueue = new LinkedBlockingQueue<ChunkContext>();
40  
41  	private final StepExecution stepExecution;
42  
43  	private final Log logger = LogFactory.getLog(StepContextRepeatCallback.class);
44  
45  	/**
46  	 * @param stepExecution
47  	 */
48  	public StepContextRepeatCallback(StepExecution stepExecution) {
49  		this.stepExecution = stepExecution;
50  	}
51  
52  	/**
53  	 * Manage the {@link StepContext} lifecycle. Business processing should be
54  	 * delegated to {@link #doInChunkContext(RepeatContext, ChunkContext)}. This
55  	 * is to ensure that the current thread has a reference to the context, even
56  	 * if the callback is executed in a pooled thread. Handles the registration
57  	 * and de-registration of the step context, so clients should not duplicate
58  	 * those calls.
59  	 *
60  	 * @see RepeatCallback#doInIteration(RepeatContext)
61  	 */
62  	@Override
63  	public RepeatStatus doInIteration(RepeatContext context) throws Exception {
64  
65  		// The StepContext has to be the same for all chunks,
66  		// otherwise step-scoped beans will be re-initialised for each chunk.
67  		StepContext stepContext = StepSynchronizationManager.register(stepExecution);
68  		logger.debug("Preparing chunk execution for StepContext: "+ObjectUtils.identityToString(stepContext));
69  
70  		ChunkContext chunkContext = attributeQueue.poll();
71  		if (chunkContext == null) {
72  			chunkContext = new ChunkContext(stepContext);
73  		}
74  
75  		try {
76  			logger.debug("Chunk execution starting: queue size="+attributeQueue.size());
77  			return doInChunkContext(context, chunkContext);
78  		}
79  		finally {
80  			// Still some stuff to do with the data in this chunk,
81  			// pass it back.
82  			if (!chunkContext.isComplete()) {
83  				attributeQueue.add(chunkContext);
84  			}
85  			StepSynchronizationManager.close();
86  		}
87  	}
88  
89  	/**
90  	 * Do the work required for this chunk of the step. The {@link ChunkContext}
91  	 * provided is managed by the base class, so that if there is still work to
92  	 * do for the task in hand state can be stored here. In a multi-threaded
93  	 * client, the base class ensures that only one thread at a time can be
94  	 * working on each instance of {@link ChunkContext}. Workers should signal
95  	 * that they are finished with a context by removing all the attributes they
96  	 * have added. If a worker does not remove them another thread might see
97  	 * stale state.
98  	 *
99  	 * @param context the current {@link RepeatContext}
100 	 * @param chunkContext the chunk context in which to carry out the work
101 	 * @return the repeat status from the execution
102 	 * @throws Exception implementations can throw an exception if anything goes
103 	 * wrong
104 	 */
105 	public abstract RepeatStatus doInChunkContext(RepeatContext context, ChunkContext chunkContext) throws Exception;
106 
107 }