View Javadoc
1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.integration.chunk;
18  
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.List;
22  import java.util.Queue;
23  import java.util.concurrent.LinkedBlockingQueue;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.springframework.batch.core.BatchStatus;
29  import org.springframework.batch.core.ExitStatus;
30  import org.springframework.batch.core.StepContribution;
31  import org.springframework.batch.core.StepExecution;
32  import org.springframework.batch.core.listener.StepExecutionListenerSupport;
33  import org.springframework.batch.item.ExecutionContext;
34  import org.springframework.batch.item.ItemStream;
35  import org.springframework.batch.item.ItemStreamException;
36  import org.springframework.batch.item.ItemWriter;
37  import org.springframework.integration.Message;
38  import org.springframework.integration.core.MessagingOperations;
39  import org.springframework.integration.core.PollableChannel;
40  import org.springframework.integration.message.GenericMessage;
41  import org.springframework.util.Assert;
42  
43  public class ChunkMessageChannelItemWriter<T> extends StepExecutionListenerSupport implements ItemWriter<T>,
44  		ItemStream, StepContributionSource {
45  
46  	private static final Log logger = LogFactory.getLog(ChunkMessageChannelItemWriter.class);
47  
48  	static final String ACTUAL = ChunkMessageChannelItemWriter.class.getName() + ".ACTUAL";
49  
50  	static final String EXPECTED = ChunkMessageChannelItemWriter.class.getName() + ".EXPECTED";
51  
52  	private static final long DEFAULT_THROTTLE_LIMIT = 6;
53  
54  	private MessagingOperations messagingGateway;
55  
56  	private LocalState localState = new LocalState();
57  
58  	private long throttleLimit = DEFAULT_THROTTLE_LIMIT;
59  
60  	private int DEFAULT_MAX_WAIT_TIMEOUTS = 40;
61  
62  	private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS;
63  
64  	private PollableChannel replyChannel;
65  
66  	/**
67  	 * The maximum number of times to wait at the end of a step for a non-null result from the remote workers. This is a
68  	 * multiplier on the receive timeout set separately on the gateway. The ideal value is a compromise between allowing
69  	 * slow workers time to finish, and responsiveness if there is a dead worker. Defaults to 40.
70  	 * 
71  	 * @param maxWaitTimeouts the maximum number of wait timeouts
72  	 */
73  	public void setMaxWaitTimeouts(int maxWaitTimeouts) {
74  		this.maxWaitTimeouts = maxWaitTimeouts;
75  	}
76  
77  	/**
78  	 * Public setter for the throttle limit. This limits the number of pending requests for chunk processing to avoid
79  	 * overwhelming the receivers.
80  	 * @param throttleLimit the throttle limit to set
81  	 */
82  	public void setThrottleLimit(long throttleLimit) {
83  		this.throttleLimit = throttleLimit;
84  	}
85  
86  	public void setMessagingOperations(MessagingOperations messagingGateway) {
87  		this.messagingGateway = messagingGateway;
88  	}
89  
90  	public void setReplyChannel(PollableChannel replyChannel) {
91  		this.replyChannel = replyChannel;
92  	}
93  
94  	public void write(List<? extends T> items) throws Exception {
95  
96  		// Block until expecting <= throttle limit
97  		while (localState.getExpecting() > throttleLimit) {
98  			getNextResult();
99  		}
100 
101 		if (!items.isEmpty()) {
102 
103 			ChunkRequest<T> request = localState.getRequest(items);
104 			if (logger.isDebugEnabled()) {
105 				logger.debug("Dispatching chunk: " + request);
106 			}
107 			messagingGateway.send(new GenericMessage<ChunkRequest<T>>(request));
108 			localState.incrementExpected();
109 
110 		}
111 
112 	}
113 
114 	@Override
115 	public void beforeStep(StepExecution stepExecution) {
116 		localState.setStepExecution(stepExecution);
117 	}
118 
119 	@Override
120 	public ExitStatus afterStep(StepExecution stepExecution) {
121 		if (!(stepExecution.getStatus() == BatchStatus.COMPLETED)) {
122 			return ExitStatus.EXECUTING;
123 		}
124 		long expecting = localState.getExpecting();
125 		boolean timedOut;
126 		try {
127 			logger.debug("Waiting for results in step listener...");
128 			timedOut = !waitForResults();
129 			logger.debug("Finished waiting for results in step listener.");
130 		}
131 		catch (RuntimeException e) {
132 			logger.debug("Detected failure waiting for results in step listener.", e);
133 			stepExecution.setStatus(BatchStatus.FAILED);
134 			return ExitStatus.FAILED.addExitDescription(e.getClass().getName() + ": " + e.getMessage());
135 		}
136 		finally {
137 
138 			if (logger.isDebugEnabled()) {
139 				logger.debug("Finished waiting for results in step listener.  Still expecting: "
140 						+ localState.getExpecting());
141 			}
142 
143 			for (StepContribution contribution : getStepContributions()) {
144 				stepExecution.apply(contribution);
145 			}
146 		}
147 		if (timedOut) {
148 			stepExecution.setStatus(BatchStatus.FAILED);
149 			return ExitStatus.FAILED.addExitDescription("Timed out waiting for " + localState.getExpecting()
150 					+ " backlog at end of step");
151 		}
152 		return ExitStatus.COMPLETED.addExitDescription("Waited for " + expecting + " results.");
153 	}
154 
155 	public void close() throws ItemStreamException {
156 		localState.reset();
157 	}
158 
159 	public void open(ExecutionContext executionContext) throws ItemStreamException {
160 		if (executionContext.containsKey(EXPECTED)) {
161 			localState.open(executionContext.getInt(EXPECTED), executionContext.getInt(ACTUAL));
162 			if (!waitForResults()) {
163 				throw new ItemStreamException("Timed out waiting for back log on open");
164 			}
165 		}
166 	}
167 
168 	public void update(ExecutionContext executionContext) throws ItemStreamException {
169 		executionContext.putInt(EXPECTED, localState.expected.intValue());
170 		executionContext.putInt(ACTUAL, localState.actual.intValue());
171 	}
172 
173 	public Collection<StepContribution> getStepContributions() {
174 		List<StepContribution> contributions = new ArrayList<StepContribution>();
175 		for (ChunkResponse response : localState.pollChunkResponses()) {
176 			StepContribution contribution = response.getStepContribution();
177 			if (logger.isDebugEnabled()) {
178 				logger.debug("Applying: " + response);
179 			}
180 			contributions.add(contribution);
181 		}
182 		return contributions;
183 	}
184 
185 	/**
186 	 * Wait until all the results that are in the pipeline come back to the reply channel.
187 	 * 
188 	 * @return true if successfully received a result, false if timed out
189 	 */
190 	private boolean waitForResults() throws AsynchronousFailureException {
191 		int count = 0;
192 		int maxCount = maxWaitTimeouts;
193 		Throwable failure = null;
194 		logger.info("Waiting for " + localState.getExpecting() + " results");
195 		while (localState.getExpecting() > 0 && count++ < maxCount) {
196 			try {
197 				getNextResult();
198 			}
199 			catch (Throwable t) {
200 				logger.error("Detected error in remote result. Trying to recover " + localState.getExpecting()
201 						+ " outstanding results before completing.", t);
202 				failure = t;
203 			}
204 		}
205 		if (failure != null) {
206 			throw wrapIfNecessary(failure);
207 		}
208 		return count < maxCount;
209 	}
210 
211 	/**
212 	 * Get the next result if it is available (within the timeout specified in the gateway), otherwise do nothing.
213 	 * 
214 	 * @throws AsynchronousFailureException If there is a response and it contains a failed chunk response.
215 	 * 
216 	 * @throws IllegalStateException if the result contains the wrong job instance id (maybe we are sharing a channel
217 	 * and we shouldn't be)
218 	 */
219 	private void getNextResult() throws AsynchronousFailureException {
220 		Message<ChunkResponse> message = messagingGateway.receive(replyChannel);
221 		if (message != null) {
222 			ChunkResponse payload = message.getPayload();
223 			if (logger.isDebugEnabled()) {
224 				logger.debug("Found result: " + payload);
225 			}
226 			Long jobInstanceId = payload.getJobId();
227 			Assert.state(jobInstanceId != null, "Message did not contain job instance id.");
228 			Assert.state(jobInstanceId.equals(localState.getJobId()), "Message contained wrong job instance id ["
229 					+ jobInstanceId + "] should have been [" + localState.getJobId() + "].");
230 			if (payload.isRedelivered()) {
231 				logger
232 						.warn("Redelivered result detected, which may indicate stale state. In the best case, we just picked up a timed out message "
233 								+ "from a previous failed execution. In the worst case (and if this is not a restart), "
234 								+ "the step may now timeout.  In that case if you believe that all messages "
235 								+ "from workers have been sent, the business state "
236 								+ "is probably inconsistent, and the step will fail.");
237 				localState.incrementRedelivered();
238 			}
239 			localState.pushResponse(payload);
240 			localState.incrementActual();
241 			if (!payload.isSuccessful()) {
242 				throw new AsynchronousFailureException("Failure or interrupt detected in handler: "
243 						+ payload.getMessage());
244 			}
245 		}
246 	}
247 
248 	/**
249 	 * Re-throws the original throwable if it is unchecked, wraps checked exceptions into
250 	 * {@link AsynchronousFailureException}.
251 	 */
252 	private static AsynchronousFailureException wrapIfNecessary(Throwable throwable) {
253 		if (throwable instanceof Error) {
254 			throw (Error) throwable;
255 		}
256 		else if (throwable instanceof AsynchronousFailureException) {
257 			return (AsynchronousFailureException) throwable;
258 		}
259 		else {
260 			return new AsynchronousFailureException("Exception in remote process", throwable);
261 		}
262 	}
263 
264 	private static class LocalState {
265 
266 		private AtomicInteger current = new AtomicInteger(-1);
267 
268 		private AtomicInteger actual = new AtomicInteger();
269 
270 		private AtomicInteger expected = new AtomicInteger();
271 
272 		private AtomicInteger redelivered = new AtomicInteger();
273 
274 		private StepExecution stepExecution;
275 
276 		private Queue<ChunkResponse> contributions = new LinkedBlockingQueue<ChunkResponse>();
277 
278 		public int getExpecting() {
279 			return expected.get() - actual.get();
280 		}
281 
282 		public <T> ChunkRequest<T> getRequest(List<? extends T> items) {
283 			return new ChunkRequest<T>(current.incrementAndGet(), items, getJobId(), createStepContribution());
284 		}
285 
286 		public void open(int expectedValue, int actualValue) {
287 			actual.set(actualValue);
288 			expected.set(expectedValue);
289 		}
290 
291 		public Collection<ChunkResponse> pollChunkResponses() {
292 			Collection<ChunkResponse> set = new ArrayList<ChunkResponse>();
293 			synchronized (contributions) {
294 				ChunkResponse item = contributions.poll();
295 				while (item != null) {
296 					set.add(item);
297 					item = contributions.poll();
298 				}
299 			}
300 			return set;
301 		}
302 
303 		public void pushResponse(ChunkResponse stepContribution) {
304 			synchronized (contributions) {
305 				contributions.add(stepContribution);
306 			}
307 		}
308 
309 		public void incrementRedelivered() {
310 			redelivered.incrementAndGet();
311 		}
312 
313 		public void incrementActual() {
314 			actual.incrementAndGet();
315 		}
316 
317 		public void incrementExpected() {
318 			expected.incrementAndGet();
319 		}
320 
321 		public StepContribution createStepContribution() {
322 			return stepExecution.createStepContribution();
323 		}
324 
325 		public Long getJobId() {
326 			return stepExecution.getJobExecution().getJobId();
327 		}
328 
329 		public void setStepExecution(StepExecution stepExecution) {
330 			this.stepExecution = stepExecution;
331 		}
332 
333 		public void reset() {
334 			expected.set(0);
335 			actual.set(0);
336 		}
337 	}
338 
339 }