Class MessageChannelPartitionHandler
java.lang.Object
org.springframework.batch.integration.partition.MessageChannelPartitionHandler
- All Implemented Interfaces:
PartitionHandler
,org.springframework.beans.factory.InitializingBean
@MessageEndpoint
public class MessageChannelPartitionHandler
extends Object
implements PartitionHandler, org.springframework.beans.factory.InitializingBean
A
PartitionHandler
that uses MessageChannel
instances to send instructions to remote workers and
receive their responses. The MessageChannel
provides a nice abstraction so that the location of the workers
and the transport used to communicate with them can be changed at run time. The communication with the remote workers
does not need to be transactional or have guaranteed delivery, so a local thread pool based implementation works as
well as a remote web service or JMS implementation. If a remote worker fails, the job will fail and can be restarted
to pick up missing messages and processing. The remote workers need access to the Spring Batch JobRepository
so that the shared state across those restarts can be managed centrally.
While a MessageChannel
is used for sending the requests to the workers, the
worker's responses can be obtained in one of two ways:
- A reply channel - Workers will respond with messages that will be aggregated via this component.
- Polling the job repository - Since the state of each worker is maintained independently within the job repository, we can poll the store to determine the state without the need of the workers to formally respond.
- Author:
- Dave Syer, Will Schipp, Michael Minella, Mahmoud Ben Hassine
-
Constructor Summary
-
Method Summary
Modifier and TypeMethodDescriptionvoid
List<?>
handle
(StepExecutionSplitter stepExecutionSplitter, StepExecution managerStepExecution) SendsStepExecutionRequest
objects to the request channel of theMessagingTemplate
, and then receives the result back as a list ofStepExecution
on a reply channel.void
setDataSource
(DataSource dataSource) DataSource
pointing to the job repositoryvoid
setGridSize
(int gridSize) Passed to theStepExecutionSplitter
in thehandle(StepExecutionSplitter, StepExecution)
method, instructing it how manyStepExecution
instances are required, ideally.void
setJobExplorer
(JobExplorer jobExplorer) JobExplorer
to use to query the job repository.void
setMessagingOperations
(org.springframework.integration.core.MessagingTemplate messagingGateway) A pre-configured gateway for sending and receiving messages to the remote workers.void
setPollInterval
(long pollInterval) How often to poll the job repository for the status of the workers.void
setReplyChannel
(org.springframework.messaging.PollableChannel replyChannel) void
setStepName
(String stepName) The name of theStep
that will be used to execute the partitionedStepExecution
.void
setTimeout
(long timeout) When using job repository polling, the time limit to wait.
-
Constructor Details
-
MessageChannelPartitionHandler
public MessageChannelPartitionHandler()
-
-
Method Details
-
afterPropertiesSet
- Specified by:
afterPropertiesSet
in interfaceorg.springframework.beans.factory.InitializingBean
- Throws:
Exception
-
setTimeout
public void setTimeout(long timeout) When using job repository polling, the time limit to wait.- Parameters:
timeout
- milliseconds to wait, defaults to -1 (no timeout).
-
setJobExplorer
JobExplorer
to use to query the job repository. Either this or aDataSource
is required when using job repository polling.- Parameters:
jobExplorer
-JobExplorer
to use for lookups
-
setPollInterval
public void setPollInterval(long pollInterval) How often to poll the job repository for the status of the workers.- Parameters:
pollInterval
- milliseconds between polls, defaults to 10000 (10 seconds).
-
setDataSource
DataSource
pointing to the job repository- Parameters:
dataSource
-DataSource
that points to the job repository's store
-
setMessagingOperations
public void setMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway) A pre-configured gateway for sending and receiving messages to the remote workers. Using this property allows a large degree of control over the timeouts and other properties of the send. It should have channels set up internally:- request channel capable of accepting
StepExecutionRequest
payloads - reply
channel that returns a list of
StepExecution
results
- Parameters:
messagingGateway
- theMessagingTemplate
to set
- request channel capable of accepting
-
setGridSize
public void setGridSize(int gridSize) Passed to theStepExecutionSplitter
in thehandle(StepExecutionSplitter, StepExecution)
method, instructing it how manyStepExecution
instances are required, ideally. TheStepExecutionSplitter
is allowed to ignore the grid size in the case of a restart, since the input data partitions must be preserved.- Parameters:
gridSize
- the number of step executions that will be created
-
setStepName
The name of theStep
that will be used to execute the partitionedStepExecution
. This is a regular Spring Batch step, with all the business logic required to complete an execution based on the input parameters in itsStepExecution
context. The name will be translated into aStep
instance by the remote worker.- Parameters:
stepName
- the name of theStep
instance to execute business logic
-
aggregate
- Parameters:
messages
- the messages to be aggregated- Returns:
- the list as it was passed in
-
setReplyChannel
public void setReplyChannel(org.springframework.messaging.PollableChannel replyChannel) -
handle
public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter, StepExecution managerStepExecution) throws Exception SendsStepExecutionRequest
objects to the request channel of theMessagingTemplate
, and then receives the result back as a list ofStepExecution
on a reply channel. Use theaggregate(List)
method as an aggregator of the individual remote replies. The receive timeout needs to be set realistically in theMessagingTemplate
and the aggregator, so that there is a good chance of all work being done.- Specified by:
handle
in interfacePartitionHandler
- Parameters:
stepExecutionSplitter
- a strategy for generating a collection ofStepExecution
instancesmanagerStepExecution
- the manager step execution for the whole partition- Returns:
- a collection of completed
StepExecution
instances - Throws:
Exception
- if anything goes wrong. This allows implementations to be liberal and rely on the caller to translate an exception into a step failure as necessary.- See Also:
-