Class MessageChannelPartitionHandler

java.lang.Object
org.springframework.batch.integration.partition.MessageChannelPartitionHandler
All Implemented Interfaces:
PartitionHandler, org.springframework.beans.factory.InitializingBean

@MessageEndpoint public class MessageChannelPartitionHandler extends Object implements PartitionHandler, org.springframework.beans.factory.InitializingBean
A PartitionHandler that uses MessageChannel instances to send instructions to remote workers and receive their responses. The MessageChannel provides a nice abstraction so that the location of the workers and the transport used to communicate with them can be changed at run time. The communication with the remote workers does not need to be transactional or have guaranteed delivery, so a local thread pool based implementation works as well as a remote web service or JMS implementation. If a remote worker fails, the job will fail and can be restarted to pick up missing messages and processing. The remote workers need access to the Spring Batch JobRepository so that the shared state across those restarts can be managed centrally. While a MessageChannel is used for sending the requests to the workers, the worker's responses can be obtained in one of two ways:
  • A reply channel - Workers will respond with messages that will be aggregated via this component.
  • Polling the job repository - Since the state of each worker is maintained independently within the job repository, we can poll the store to determine the state without the need of the workers to formally respond.
Note: The reply channel for this is instance based. Sharing this component across multiple step instances may result in the crossing of messages. It's recommended that this component be step or job scoped.
Author:
Dave Syer, Will Schipp, Michael Minella, Mahmoud Ben Hassine
  • Constructor Details

    • MessageChannelPartitionHandler

      public MessageChannelPartitionHandler()
  • Method Details

    • afterPropertiesSet

      public void afterPropertiesSet() throws Exception
      Specified by:
      afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
      Throws:
      Exception
    • setTimeout

      public void setTimeout(long timeout)
      When using job repository polling, the time limit to wait.
      Parameters:
      timeout - milliseconds to wait, defaults to -1 (no timeout).
    • setJobExplorer

      public void setJobExplorer(JobExplorer jobExplorer)
      JobExplorer to use to query the job repository. Either this or a DataSource is required when using job repository polling.
      Parameters:
      jobExplorer - JobExplorer to use for lookups
    • setPollInterval

      public void setPollInterval(long pollInterval)
      How often to poll the job repository for the status of the workers.
      Parameters:
      pollInterval - milliseconds between polls, defaults to 10000 (10 seconds).
    • setDataSource

      public void setDataSource(DataSource dataSource)
      DataSource pointing to the job repository
      Parameters:
      dataSource - DataSource that points to the job repository's store
    • setMessagingOperations

      public void setMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway)
      A pre-configured gateway for sending and receiving messages to the remote workers. Using this property allows a large degree of control over the timeouts and other properties of the send. It should have channels set up internally: The timeout for the reply should be set sufficiently long that the remote steps have time to complete.
      Parameters:
      messagingGateway - the MessagingTemplate to set
    • setGridSize

      public void setGridSize(int gridSize)
      Passed to the StepExecutionSplitter in the handle(StepExecutionSplitter, StepExecution) method, instructing it how many StepExecution instances are required, ideally. The StepExecutionSplitter is allowed to ignore the grid size in the case of a restart, since the input data partitions must be preserved.
      Parameters:
      gridSize - the number of step executions that will be created
    • setStepName

      public void setStepName(String stepName)
      The name of the Step that will be used to execute the partitioned StepExecution. This is a regular Spring Batch step, with all the business logic required to complete an execution based on the input parameters in its StepExecution context. The name will be translated into a Step instance by the remote worker.
      Parameters:
      stepName - the name of the Step instance to execute business logic
    • aggregate

      @Aggregator(sendPartialResultsOnExpiry="true") public List<?> aggregate(@Payloads List<?> messages)
      Parameters:
      messages - the messages to be aggregated
      Returns:
      the list as it was passed in
    • setReplyChannel

      public void setReplyChannel(org.springframework.messaging.PollableChannel replyChannel)
    • handle

      public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter, StepExecution managerStepExecution) throws Exception
      Sends StepExecutionRequest objects to the request channel of the MessagingTemplate, and then receives the result back as a list of StepExecution on a reply channel. Use the aggregate(List) method as an aggregator of the individual remote replies. The receive timeout needs to be set realistically in the MessagingTemplate and the aggregator, so that there is a good chance of all work being done.
      Specified by:
      handle in interface PartitionHandler
      Parameters:
      stepExecutionSplitter - a strategy for generating a collection of StepExecution instances
      managerStepExecution - the manager step execution for the whole partition
      Returns:
      a collection of completed StepExecution instances
      Throws:
      Exception - if anything goes wrong. This allows implementations to be liberal and rely on the caller to translate an exception into a step failure as necessary.
      See Also: