Class ChunkMessageChannelItemWriter<T>
java.lang.Object
org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter<T>
- All Implemented Interfaces:
StepExecutionListener
,StepListener
,StepContributionSource
,ItemStream
,ItemWriter<T>
public class ChunkMessageChannelItemWriter<T>
extends Object
implements StepExecutionListener, ItemWriter<T>, ItemStream, StepContributionSource
-
Nested Class Summary
-
Field Summary
Modifier and TypeFieldDescriptionprotected final int
protected static final long
protected final ChunkMessageChannelItemWriter.LocalState
protected int
protected org.springframework.integration.core.MessagingTemplate
protected org.springframework.messaging.PollableChannel
protected long
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionafterStep
(StepExecution stepExecution) Give a listener a chance to modify the exit status from a step.void
beforeStep
(StepExecution stepExecution) Initialize the state of the listener with theStepExecution
from the current scope.void
close()
If any resources are needed for the stream to operate they need to be destroyed here.protected void
Get the next result if it is available (within the timeout specified in the gateway), otherwise do nothing.Get the currently available contributions and drain the source.void
open
(ExecutionContext executionContext) Open the stream for the providedExecutionContext
.void
setMaxWaitTimeouts
(int maxWaitTimeouts) The maximum number of times to wait at the end of a step for a non-null result from the remote workers.void
setMessagingOperations
(org.springframework.integration.core.MessagingTemplate messagingGateway) void
setReplyChannel
(org.springframework.messaging.PollableChannel replyChannel) void
setThrottleLimit
(long throttleLimit) Public setter for the throttle limit.void
update
(ExecutionContext executionContext) Indicates that the execution context provided during open is about to be saved.protected boolean
Wait until all the results that are in the pipeline come back to the reply channel.protected static AsynchronousFailureException
wrapIfNecessary
(Throwable throwable) Re-throws the original throwable if it is unchecked, wraps checked exceptions intoAsynchronousFailureException
.void
Process the supplied data element.
-
Field Details
-
DEFAULT_THROTTLE_LIMIT
protected static final long DEFAULT_THROTTLE_LIMIT- See Also:
-
messagingGateway
protected org.springframework.integration.core.MessagingTemplate messagingGateway -
localState
-
throttleLimit
protected long throttleLimit -
DEFAULT_MAX_WAIT_TIMEOUTS
protected final int DEFAULT_MAX_WAIT_TIMEOUTS- See Also:
-
maxWaitTimeouts
protected int maxWaitTimeouts -
replyChannel
protected org.springframework.messaging.PollableChannel replyChannel
-
-
Constructor Details
-
ChunkMessageChannelItemWriter
public ChunkMessageChannelItemWriter()
-
-
Method Details
-
setMaxWaitTimeouts
public void setMaxWaitTimeouts(int maxWaitTimeouts) The maximum number of times to wait at the end of a step for a non-null result from the remote workers. This is a multiplier on the receive timeout set separately on the gateway. The ideal value is a compromise between allowing slow workers time to finish, and responsiveness if there is a dead worker. Defaults to 40.- Parameters:
maxWaitTimeouts
- the maximum number of wait timeouts
-
setThrottleLimit
public void setThrottleLimit(long throttleLimit) Public setter for the throttle limit. This limits the number of pending requests for chunk processing to avoid overwhelming the receivers.- Parameters:
throttleLimit
- the throttle limit to set
-
setMessagingOperations
public void setMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway) -
setReplyChannel
public void setReplyChannel(org.springframework.messaging.PollableChannel replyChannel) -
write
Description copied from interface:ItemWriter
Process the supplied data element. Will not be called with any null items in normal operation.- Specified by:
write
in interfaceItemWriter<T>
- Parameters:
items
- of items to be written. Must not benull
.- Throws:
Exception
- if there are errors. The framework will catch the exception and convert or rethrow it as appropriate.
-
beforeStep
Description copied from interface:StepExecutionListener
Initialize the state of the listener with theStepExecution
from the current scope.- Specified by:
beforeStep
in interfaceStepExecutionListener
- Parameters:
stepExecution
- instance ofStepExecution
.
-
afterStep
Description copied from interface:StepExecutionListener
Give a listener a chance to modify the exit status from a step. The value returned is combined with the normal exit status by usingExitStatus.and(ExitStatus)
.Called after execution of the step's processing logic (whether successful or failed). Throwing an exception in this method has no effect, as it is only logged.
- Specified by:
afterStep
in interfaceStepExecutionListener
- Parameters:
stepExecution
- aStepExecution
instance.- Returns:
- an
ExitStatus
to combine with the normal value. Returnnull
(the default) to leave the old value unchanged.
-
close
Description copied from interface:ItemStream
If any resources are needed for the stream to operate they need to be destroyed here. Once this method has been called all other methods (except open) may throw an exception.- Specified by:
close
in interfaceItemStream
- Throws:
ItemStreamException
-
open
Description copied from interface:ItemStream
Open the stream for the providedExecutionContext
.- Specified by:
open
in interfaceItemStream
- Parameters:
executionContext
- current step'sExecutionContext
. Will be the executionContext from the last run of the step on a restart.- Throws:
ItemStreamException
-
update
Description copied from interface:ItemStream
Indicates that the execution context provided during open is about to be saved. If any state is remaining, but has not been put in the context, it should be added here.- Specified by:
update
in interfaceItemStream
- Parameters:
executionContext
- to be updated- Throws:
ItemStreamException
-
getStepContributions
Description copied from interface:StepContributionSource
Get the currently available contributions and drain the source. The next call would return an empty collection, unless new contributions have arrived.- Specified by:
getStepContributions
in interfaceStepContributionSource
- Returns:
- a collection of
StepContribution
instances
-
waitForResults
Wait until all the results that are in the pipeline come back to the reply channel.- Returns:
- true if successfully received a result, false if timed out
- Throws:
AsynchronousFailureException
-
getNextResult
Get the next result if it is available (within the timeout specified in the gateway), otherwise do nothing.- Throws:
AsynchronousFailureException
- If there is a response and it contains a failed chunk response.IllegalStateException
- if the result contains the wrong job instance id (maybe we are sharing a channel and we shouldn't be)
-
wrapIfNecessary
Re-throws the original throwable if it is unchecked, wraps checked exceptions intoAsynchronousFailureException
.
-