1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.scope.context; |
17 | |
18 | import java.util.Queue; |
19 | import java.util.concurrent.LinkedBlockingQueue; |
20 | |
21 | import org.apache.commons.logging.Log; |
22 | import org.apache.commons.logging.LogFactory; |
23 | import org.springframework.batch.core.Step; |
24 | import org.springframework.batch.core.StepExecution; |
25 | import org.springframework.batch.repeat.RepeatCallback; |
26 | import org.springframework.batch.repeat.RepeatContext; |
27 | import org.springframework.batch.repeat.RepeatStatus; |
28 | import org.springframework.util.ObjectUtils; |
29 | |
30 | /** |
31 | * Convenient base class for clients who need to do something in a repeat |
32 | * callback inside a {@link Step}. |
33 | * |
34 | * @author Dave Syer |
35 | * |
36 | */ |
37 | public abstract class StepContextRepeatCallback implements RepeatCallback { |
38 | |
39 | private final Queue<ChunkContext> attributeQueue = new LinkedBlockingQueue<ChunkContext>(); |
40 | |
41 | private final StepExecution stepExecution; |
42 | |
43 | private final Log logger = LogFactory.getLog(StepContextRepeatCallback.class); |
44 | |
45 | /** |
46 | * @param stepExecution |
47 | */ |
48 | public StepContextRepeatCallback(StepExecution stepExecution) { |
49 | this.stepExecution = stepExecution; |
50 | } |
51 | |
52 | /** |
53 | * Manage the {@link StepContext} lifecycle. Business processing should be |
54 | * delegated to {@link #doInChunkContext(RepeatContext, ChunkContext)}. This |
55 | * is to ensure that the current thread has a reference to the context, even |
56 | * if the callback is executed in a pooled thread. Handles the registration |
57 | * and de-registration of the step context, so clients should not duplicate |
58 | * those calls. |
59 | * |
60 | * @see RepeatCallback#doInIteration(RepeatContext) |
61 | */ |
62 | @Override |
63 | public RepeatStatus doInIteration(RepeatContext context) throws Exception { |
64 | |
65 | // The StepContext has to be the same for all chunks, |
66 | // otherwise step-scoped beans will be re-initialised for each chunk. |
67 | StepContext stepContext = StepSynchronizationManager.register(stepExecution); |
68 | logger.debug("Preparing chunk execution for StepContext: "+ObjectUtils.identityToString(stepContext)); |
69 | |
70 | ChunkContext chunkContext = attributeQueue.poll(); |
71 | if (chunkContext == null) { |
72 | chunkContext = new ChunkContext(stepContext); |
73 | } |
74 | |
75 | try { |
76 | logger.debug("Chunk execution starting: queue size="+attributeQueue.size()); |
77 | return doInChunkContext(context, chunkContext); |
78 | } |
79 | finally { |
80 | // Still some stuff to do with the data in this chunk, |
81 | // pass it back. |
82 | if (!chunkContext.isComplete()) { |
83 | attributeQueue.add(chunkContext); |
84 | } |
85 | StepSynchronizationManager.close(); |
86 | } |
87 | } |
88 | |
89 | /** |
90 | * Do the work required for this chunk of the step. The {@link ChunkContext} |
91 | * provided is managed by the base class, so that if there is still work to |
92 | * do for the task in hand state can be stored here. In a multi-threaded |
93 | * client, the base class ensures that only one thread at a time can be |
94 | * working on each instance of {@link ChunkContext}. Workers should signal |
95 | * that they are finished with a context by removing all the attributes they |
96 | * have added. If a worker does not remove them another thread might see |
97 | * stale state. |
98 | * |
99 | * @param context the current {@link RepeatContext} |
100 | * @param chunkContext the chunk context in which to carry out the work |
101 | * @return the repeat status from the execution |
102 | * @throws Exception implementations can throw an exception if anything goes |
103 | * wrong |
104 | */ |
105 | public abstract RepeatStatus doInChunkContext(RepeatContext context, ChunkContext chunkContext) throws Exception; |
106 | |
107 | } |