1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.batch.integration.chunk;
18
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Queue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.springframework.batch.core.BatchStatus;
29 import org.springframework.batch.core.ExitStatus;
30 import org.springframework.batch.core.StepContribution;
31 import org.springframework.batch.core.StepExecution;
32 import org.springframework.batch.core.listener.StepExecutionListenerSupport;
33 import org.springframework.batch.item.ExecutionContext;
34 import org.springframework.batch.item.ItemStream;
35 import org.springframework.batch.item.ItemStreamException;
36 import org.springframework.batch.item.ItemWriter;
37 import org.springframework.integration.Message;
38 import org.springframework.integration.core.MessagingOperations;
39 import org.springframework.integration.core.PollableChannel;
40 import org.springframework.integration.message.GenericMessage;
41 import org.springframework.util.Assert;
42
43 public class ChunkMessageChannelItemWriter<T> extends StepExecutionListenerSupport implements ItemWriter<T>,
44 ItemStream, StepContributionSource {
45
46 private static final Log logger = LogFactory.getLog(ChunkMessageChannelItemWriter.class);
47
48 static final String ACTUAL = ChunkMessageChannelItemWriter.class.getName() + ".ACTUAL";
49
50 static final String EXPECTED = ChunkMessageChannelItemWriter.class.getName() + ".EXPECTED";
51
52 private static final long DEFAULT_THROTTLE_LIMIT = 6;
53
54 private MessagingOperations messagingGateway;
55
56 private LocalState localState = new LocalState();
57
58 private long throttleLimit = DEFAULT_THROTTLE_LIMIT;
59
60 private int DEFAULT_MAX_WAIT_TIMEOUTS = 40;
61
62 private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS;
63
64 private PollableChannel replyChannel;
65
66
67
68
69
70
71
72
73 public void setMaxWaitTimeouts(int maxWaitTimeouts) {
74 this.maxWaitTimeouts = maxWaitTimeouts;
75 }
76
77
78
79
80
81
82 public void setThrottleLimit(long throttleLimit) {
83 this.throttleLimit = throttleLimit;
84 }
85
86 public void setMessagingOperations(MessagingOperations messagingGateway) {
87 this.messagingGateway = messagingGateway;
88 }
89
90 public void setReplyChannel(PollableChannel replyChannel) {
91 this.replyChannel = replyChannel;
92 }
93
94 public void write(List<? extends T> items) throws Exception {
95
96
97 while (localState.getExpecting() > throttleLimit) {
98 getNextResult();
99 }
100
101 if (!items.isEmpty()) {
102
103 ChunkRequest<T> request = localState.getRequest(items);
104 if (logger.isDebugEnabled()) {
105 logger.debug("Dispatching chunk: " + request);
106 }
107 messagingGateway.send(new GenericMessage<ChunkRequest<T>>(request));
108 localState.incrementExpected();
109
110 }
111
112 }
113
114 @Override
115 public void beforeStep(StepExecution stepExecution) {
116 localState.setStepExecution(stepExecution);
117 }
118
119 @Override
120 public ExitStatus afterStep(StepExecution stepExecution) {
121 if (!(stepExecution.getStatus() == BatchStatus.COMPLETED)) {
122 return ExitStatus.EXECUTING;
123 }
124 long expecting = localState.getExpecting();
125 boolean timedOut;
126 try {
127 logger.debug("Waiting for results in step listener...");
128 timedOut = !waitForResults();
129 logger.debug("Finished waiting for results in step listener.");
130 }
131 catch (RuntimeException e) {
132 logger.debug("Detected failure waiting for results in step listener.", e);
133 stepExecution.setStatus(BatchStatus.FAILED);
134 return ExitStatus.FAILED.addExitDescription(e.getClass().getName() + ": " + e.getMessage());
135 }
136 finally {
137
138 if (logger.isDebugEnabled()) {
139 logger.debug("Finished waiting for results in step listener. Still expecting: "
140 + localState.getExpecting());
141 }
142
143 for (StepContribution contribution : getStepContributions()) {
144 stepExecution.apply(contribution);
145 }
146 }
147 if (timedOut) {
148 stepExecution.setStatus(BatchStatus.FAILED);
149 return ExitStatus.FAILED.addExitDescription("Timed out waiting for " + localState.getExpecting()
150 + " backlog at end of step");
151 }
152 return ExitStatus.COMPLETED.addExitDescription("Waited for " + expecting + " results.");
153 }
154
155 public void close() throws ItemStreamException {
156 localState.reset();
157 }
158
159 public void open(ExecutionContext executionContext) throws ItemStreamException {
160 if (executionContext.containsKey(EXPECTED)) {
161 localState.open(executionContext.getInt(EXPECTED), executionContext.getInt(ACTUAL));
162 if (!waitForResults()) {
163 throw new ItemStreamException("Timed out waiting for back log on open");
164 }
165 }
166 }
167
168 public void update(ExecutionContext executionContext) throws ItemStreamException {
169 executionContext.putInt(EXPECTED, localState.expected.intValue());
170 executionContext.putInt(ACTUAL, localState.actual.intValue());
171 }
172
173 public Collection<StepContribution> getStepContributions() {
174 List<StepContribution> contributions = new ArrayList<StepContribution>();
175 for (ChunkResponse response : localState.pollChunkResponses()) {
176 StepContribution contribution = response.getStepContribution();
177 if (logger.isDebugEnabled()) {
178 logger.debug("Applying: " + response);
179 }
180 contributions.add(contribution);
181 }
182 return contributions;
183 }
184
185
186
187
188
189
190 private boolean waitForResults() throws AsynchronousFailureException {
191 int count = 0;
192 int maxCount = maxWaitTimeouts;
193 Throwable failure = null;
194 logger.info("Waiting for " + localState.getExpecting() + " results");
195 while (localState.getExpecting() > 0 && count++ < maxCount) {
196 try {
197 getNextResult();
198 }
199 catch (Throwable t) {
200 logger.error("Detected error in remote result. Trying to recover " + localState.getExpecting()
201 + " outstanding results before completing.", t);
202 failure = t;
203 }
204 }
205 if (failure != null) {
206 throw wrapIfNecessary(failure);
207 }
208 return count < maxCount;
209 }
210
211
212
213
214
215
216
217
218
219 private void getNextResult() throws AsynchronousFailureException {
220 Message<ChunkResponse> message = messagingGateway.receive(replyChannel);
221 if (message != null) {
222 ChunkResponse payload = message.getPayload();
223 if (logger.isDebugEnabled()) {
224 logger.debug("Found result: " + payload);
225 }
226 Long jobInstanceId = payload.getJobId();
227 Assert.state(jobInstanceId != null, "Message did not contain job instance id.");
228 Assert.state(jobInstanceId.equals(localState.getJobId()), "Message contained wrong job instance id ["
229 + jobInstanceId + "] should have been [" + localState.getJobId() + "].");
230 if (payload.isRedelivered()) {
231 logger
232 .warn("Redelivered result detected, which may indicate stale state. In the best case, we just picked up a timed out message "
233 + "from a previous failed execution. In the worst case (and if this is not a restart), "
234 + "the step may now timeout. In that case if you believe that all messages "
235 + "from workers have been sent, the business state "
236 + "is probably inconsistent, and the step will fail.");
237 localState.incrementRedelivered();
238 }
239 localState.pushResponse(payload);
240 localState.incrementActual();
241 if (!payload.isSuccessful()) {
242 throw new AsynchronousFailureException("Failure or interrupt detected in handler: "
243 + payload.getMessage());
244 }
245 }
246 }
247
248
249
250
251
252 private static AsynchronousFailureException wrapIfNecessary(Throwable throwable) {
253 if (throwable instanceof Error) {
254 throw (Error) throwable;
255 }
256 else if (throwable instanceof AsynchronousFailureException) {
257 return (AsynchronousFailureException) throwable;
258 }
259 else {
260 return new AsynchronousFailureException("Exception in remote process", throwable);
261 }
262 }
263
264 private static class LocalState {
265
266 private AtomicInteger current = new AtomicInteger(-1);
267
268 private AtomicInteger actual = new AtomicInteger();
269
270 private AtomicInteger expected = new AtomicInteger();
271
272 private AtomicInteger redelivered = new AtomicInteger();
273
274 private StepExecution stepExecution;
275
276 private Queue<ChunkResponse> contributions = new LinkedBlockingQueue<ChunkResponse>();
277
278 public int getExpecting() {
279 return expected.get() - actual.get();
280 }
281
282 public <T> ChunkRequest<T> getRequest(List<? extends T> items) {
283 return new ChunkRequest<T>(current.incrementAndGet(), items, getJobId(), createStepContribution());
284 }
285
286 public void open(int expectedValue, int actualValue) {
287 actual.set(actualValue);
288 expected.set(expectedValue);
289 }
290
291 public Collection<ChunkResponse> pollChunkResponses() {
292 Collection<ChunkResponse> set = new ArrayList<ChunkResponse>();
293 synchronized (contributions) {
294 ChunkResponse item = contributions.poll();
295 while (item != null) {
296 set.add(item);
297 item = contributions.poll();
298 }
299 }
300 return set;
301 }
302
303 public void pushResponse(ChunkResponse stepContribution) {
304 synchronized (contributions) {
305 contributions.add(stepContribution);
306 }
307 }
308
309 public void incrementRedelivered() {
310 redelivered.incrementAndGet();
311 }
312
313 public void incrementActual() {
314 actual.incrementAndGet();
315 }
316
317 public void incrementExpected() {
318 expected.incrementAndGet();
319 }
320
321 public StepContribution createStepContribution() {
322 return stepExecution.createStepContribution();
323 }
324
325 public Long getJobId() {
326 return stepExecution.getJobExecution().getJobId();
327 }
328
329 public void setStepExecution(StepExecution stepExecution) {
330 this.stepExecution = stepExecution;
331 }
332
333 public void reset() {
334 expected.set(0);
335 actual.set(0);
336 }
337 }
338
339 }