Class MessageChannelPartitionHandler
java.lang.Object
org.springframework.batch.core.partition.support.AbstractPartitionHandler
org.springframework.batch.integration.partition.MessageChannelPartitionHandler
- All Implemented Interfaces:
PartitionHandler,org.springframework.beans.factory.InitializingBean
@MessageEndpoint
public class MessageChannelPartitionHandler
extends AbstractPartitionHandler
implements org.springframework.beans.factory.InitializingBean
A
PartitionHandler that uses MessageChannel instances to send
instructions to remote workers and receive their responses. The MessageChannel
provides a nice abstraction so that the location of the workers and the transport used
to communicate with them can be changed at run time. The communication with the remote
workers does not need to be transactional or have guaranteed delivery, so a local
thread pool based implementation works as well as a remote web service or JMS
implementation. If a remote worker fails, the job will fail and can be restarted to
pick up missing messages and processing. The remote workers need access to the Spring
Batch JobRepository so that the shared state across those restarts can be
managed centrally.
While a MessageChannel is used for sending the
requests to the workers, the worker's responses can be obtained in one of two ways:
- A reply channel - Workers will respond with messages that will be aggregated via this component.
- Polling the job repository - Since the state of each worker is maintained independently within the job repository, we can poll the store to determine the state without the need of the workers to formally respond.
- Author:
- Dave Syer, Will Schipp, Michael Minella, Mahmoud Ben Hassine
-
Field Summary
Fields inherited from class org.springframework.batch.core.partition.support.AbstractPartitionHandler
gridSize -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionvoidList<?>protected Set<StepExecution>doHandle(StepExecution managerStepExecution, Set<StepExecution> partitionStepExecutions) SendsStepExecutionRequestobjects to the request channel of theMessagingTemplate, and then receives the result back as a list ofStepExecutionon a reply channel.voidsetDataSource(DataSource dataSource) DataSourcepointing to the job repositoryvoidsetJobExplorer(JobExplorer jobExplorer) JobExplorerto use to query the job repository.voidsetMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway) A pre-configured gateway for sending and receiving messages to the remote workers.voidsetPollInterval(long pollInterval) How often to poll the job repository for the status of the workers.voidsetReplyChannel(org.springframework.messaging.PollableChannel replyChannel) voidsetStepName(String stepName) The name of theStepthat will be used to execute the partitionedStepExecution.voidsetTimeout(long timeout) When using job repository polling, the time limit to wait.Methods inherited from class org.springframework.batch.core.partition.support.AbstractPartitionHandler
getGridSize, handle, setGridSize
-
Constructor Details
-
MessageChannelPartitionHandler
public MessageChannelPartitionHandler()
-
-
Method Details
-
afterPropertiesSet
- Specified by:
afterPropertiesSetin interfaceorg.springframework.beans.factory.InitializingBean- Throws:
Exception
-
setTimeout
public void setTimeout(long timeout) When using job repository polling, the time limit to wait.- Parameters:
timeout- milliseconds to wait, defaults to -1 (no timeout).
-
setJobExplorer
JobExplorerto use to query the job repository. Either this or aDataSourceis required when using job repository polling.- Parameters:
jobExplorer-JobExplorerto use for lookups
-
setPollInterval
public void setPollInterval(long pollInterval) How often to poll the job repository for the status of the workers.- Parameters:
pollInterval- milliseconds between polls, defaults to 10000 (10 seconds).
-
setDataSource
DataSourcepointing to the job repository- Parameters:
dataSource-DataSourcethat points to the job repository's store
-
setMessagingOperations
public void setMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway) A pre-configured gateway for sending and receiving messages to the remote workers. Using this property allows a large degree of control over the timeouts and other properties of the send. It should have channels set up internally:- request channel capable of accepting
StepExecutionRequestpayloads - reply channel that returns a list of
StepExecutionresults
- Parameters:
messagingGateway- theMessagingTemplateto set
- request channel capable of accepting
-
setStepName
The name of theStepthat will be used to execute the partitionedStepExecution. This is a regular Spring Batch step, with all the business logic required to complete an execution based on the input parameters in itsStepExecutioncontext. The name will be translated into aStepinstance by the remote worker.- Parameters:
stepName- the name of theStepinstance to execute business logic
-
aggregate
- Parameters:
messages- the messages to be aggregated- Returns:
- the list as it was passed in
-
setReplyChannel
public void setReplyChannel(org.springframework.messaging.PollableChannel replyChannel) -
doHandle
protected Set<StepExecution> doHandle(StepExecution managerStepExecution, Set<StepExecution> partitionStepExecutions) throws Exception SendsStepExecutionRequestobjects to the request channel of theMessagingTemplate, and then receives the result back as a list ofStepExecutionon a reply channel. Use theaggregate(List)method as an aggregator of the individual remote replies. The receive timeout needs to be set realistically in theMessagingTemplateand the aggregator, so that there is a good chance of all work being done.- Specified by:
doHandlein classAbstractPartitionHandler- Parameters:
managerStepExecution- the whole partition executionpartitionStepExecutions- theStepExecutioninstances to execute- Returns:
- an updated view of these completed
StepExecutioninstances - Throws:
Exception- if anything goes wrong. This allows implementations to be liberal and rely on the caller to translate an exception into a step failure as necessary.- See Also:
-