@MessageEndpoint public class MessageChannelPartitionHandler extends java.lang.Object implements PartitionHandler, org.springframework.beans.factory.InitializingBean
PartitionHandler
that uses MessageChannel
instances to send instructions to remote workers and
receive their responses. The MessageChannel
provides a nice abstraction so that the location of the workers
and the transport used to communicate with them can be changed at run time. The communication with the remote workers
does not need to be transactional or have guaranteed delivery, so a local thread pool based implementation works as
well as a remote web service or JMS implementation. If a remote worker fails, the job will fail and can be restarted
to pick up missing messages and processing. The remote workers need access to the Spring Batch JobRepository
so that the shared state across those restarts can be managed centrally.
While a MessageChannel
is used for sending the requests to the workers, the
worker's responses can be obtained in one of two ways:
Constructor and Description |
---|
MessageChannelPartitionHandler() |
Modifier and Type | Method and Description |
---|---|
void |
afterPropertiesSet() |
java.util.List<?> |
aggregate(java.util.List<?> messages) |
java.util.Collection<StepExecution> |
handle(StepExecutionSplitter stepExecutionSplitter,
StepExecution masterStepExecution)
Sends
StepExecutionRequest objects to the request channel of the MessagingTemplate , and then
receives the result back as a list of StepExecution on a reply channel. |
void |
setDataSource(javax.sql.DataSource dataSource)
DataSource pointing to the job repository |
void |
setGridSize(int gridSize)
Passed to the
StepExecutionSplitter in the handle(StepExecutionSplitter, StepExecution) method,
instructing it how many StepExecution instances are required, ideally. |
void |
setJobExplorer(JobExplorer jobExplorer)
JobExplorer to use to query the job repository. |
void |
setMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway)
A pre-configured gateway for sending and receiving messages to the remote workers.
|
void |
setPollInterval(long pollInterval)
How often to poll the job repository for the status of the workers.
|
void |
setReplyChannel(org.springframework.messaging.PollableChannel replyChannel) |
void |
setStepName(java.lang.String stepName)
The name of the
Step that will be used to execute the partitioned StepExecution . |
void |
setTimeout(long timeout)
When using job repository polling, the time limit to wait.
|
public void afterPropertiesSet() throws java.lang.Exception
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
java.lang.Exception
public void setTimeout(long timeout)
timeout
- milliseconds to wait, defaults to -1 (no timeout).public void setJobExplorer(JobExplorer jobExplorer)
JobExplorer
to use to query the job repository. Either this or
a DataSource
is required when using job repository polling.jobExplorer
- JobExplorer
to use for lookupspublic void setPollInterval(long pollInterval)
pollInterval
- milliseconds between polls, defaults to 10000 (10 seconds).public void setDataSource(javax.sql.DataSource dataSource)
DataSource
pointing to the job repositorydataSource
- DataSource
that points to the job repository's storepublic void setMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway)
StepExecutionRequest
payloadsStepExecution
resultsmessagingGateway
- the MessagingTemplate
to setpublic void setGridSize(int gridSize)
StepExecutionSplitter
in the handle(StepExecutionSplitter, StepExecution)
method,
instructing it how many StepExecution
instances are required, ideally. The StepExecutionSplitter
is allowed to ignore the grid size in the case of a restart, since the input data partitions must be preserved.gridSize
- the number of step executions that will be createdpublic void setStepName(java.lang.String stepName)
Step
that will be used to execute the partitioned StepExecution
. This is a
regular Spring Batch step, with all the business logic required to complete an execution based on the input
parameters in its StepExecution
context. The name will be translated into a Step
instance by the
remote worker.stepName
- the name of the Step
instance to execute business logic@Aggregator(sendPartialResultsOnExpiry="true") public java.util.List<?> aggregate(@Payloads java.util.List<?> messages)
messages
- the messages to be aggregatedpublic void setReplyChannel(org.springframework.messaging.PollableChannel replyChannel)
public java.util.Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter, StepExecution masterStepExecution) throws java.lang.Exception
StepExecutionRequest
objects to the request channel of the MessagingTemplate
, and then
receives the result back as a list of StepExecution
on a reply channel. Use the aggregate(List)
method as an aggregator of the individual remote replies. The receive timeout needs to be set realistically in
the MessagingTemplate
and the aggregator, so that there is a good chance of all work being done.handle
in interface PartitionHandler
stepExecutionSplitter
- a strategy for generating a collection of
StepExecution
instancesmasterStepExecution
- the master step execution for the whole partitionStepExecution
instancesjava.lang.Exception
- if anything goes wrong. This allows implementations to
be liberal and rely on the caller to translate an exception into a step
failure as necessary.PartitionHandler.handle(StepExecutionSplitter, StepExecution)