Use Case: Massively Parallel Batch Processing

Goal

Support efficient processing of very large batch jobs (100K to 1000K records) by parallelizing the work across multiple processes, or across physical or virtual machines. The goals of the other use cases should not be compromised: for example, a non-developer must still be able to start and stop a batch job easily, and to trace the progress and failure points of a batch. Client code should not be aware of whether the processing is parallel or serial.

Scope

  • Any batch operation that reads data item-by-item from an input source can be scaled up by parallelizing.
  • The initial implementation might concentrate on multiple threads in a single process. Ultimately we need to be able to support multiple processes, each running in an application server (e.g. so that jobs that require EJBs can be supported).

Preconditions

  • A data source with multiple chunks (committable units) - more chunks than parallel processes.
  • A way for the framework to launch parallel processes.

Success

  • A batch completes successfully, and the results are verified.
  • A batch fails in one of the nodes, and when restarted processes the remaining records.

Description

  1. Framework splits input data into partitions.
  2. Framework sends input data (or references to them) to processing nodes.
  3. Processing nodes act independently, converting the input data and sending it transactionally to the output source (as per a normal single-process batch).
  4. Framework collects status data from individual nodes for reporting and auditing.
  5. When all nodes are complete, Framework decides that the batch is complete and finishes processing.
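
As an illustration of this flow (not part of any existing framework API - all class and method names below are assumptions), a dispatch-and-collect loop might look roughly like this:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    // Hypothetical sketch of the steps above; Partition, ProcessingNode and
    // InputSource are illustrative types only, not an existing framework API.
    public class ParallelBatchCoordinator {

        public interface Partition { }

        public interface ProcessingNode {
            // Process one partition asynchronously; the result says whether
            // the node finished all of its records.
            Future<Boolean> processAsync(Partition partition);
        }

        public interface InputSource {
            // 1. Split the input data into partitions.
            List<Partition> partition(int partitionCount);
        }

        private final List<ProcessingNode> nodes;

        public ParallelBatchCoordinator(List<ProcessingNode> nodes) {
            this.nodes = nodes;
        }

        public boolean run(InputSource input) throws InterruptedException, ExecutionException {
            List<Partition> partitions = input.partition(nodes.size());

            // 2. Send each partition (or a reference to it) to a processing node.
            List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
            for (int i = 0; i < partitions.size(); i++) {
                results.add(nodes.get(i % nodes.size()).processAsync(partitions.get(i)));
            }

            // 3./4. Each node works independently; collect its status for
            // reporting and auditing.
            boolean allComplete = true;
            for (Future<Boolean> result : results) {
                allComplete = allComplete && result.get().booleanValue();
            }

            // 5. The batch is complete only when every node reports completion.
            return allComplete;
        }
    }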

Variations

Two failure cases can be distinguished: bad input data on a node, and an internal node failure. They have different implications for how to proceed, but in both cases:

  1. Framework catches exception and classifies it. Rolls back current transaction to preserve state of data (input and output).
  2. Framework saves state for restart from last known good point, including a pointer to the next input record.

If a processing node then detects bad data in the input source, the work cannot simply be restarted or redistributed, because the data need to be modified before a successful outcome is possible:

  1. Framework alerts Operator of the location and nature of the failure.
  2. Operator waits for batch to finish - the overall status will be a failure, but most of the data might be consumed.
  3. Operator fixes problem and restarts batch.
  4. Framework does not re-process data that has already been processed successfully. The parallel processing nodes are used as before.
  5. Batch completes normally.

If a processing node fails unrecoverably (e.g. after a retry timeout), but with no indication that the input data were bad, then the data can be re-used: the Framework takes back the unprocessed input data and redistributes it to other nodes.
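
To make the two failure paths concrete, here is a minimal, hypothetical sketch of the classification step described above; the exception test, the Chunk type and the Operator alert are placeholder assumptions, not existing framework behaviour:

    // Illustrative sketch of classifying a failure after rollback.
    public class FailureHandler {

        public interface Chunk { }

        public enum Outcome { AWAIT_OPERATOR, REDISTRIBUTE }

        public Outcome handle(Exception failure, Chunk chunk) {
            // By this point the current transaction has been rolled back and
            // the restart point (a pointer to the next input record) saved.
            if (isBadInputData(failure)) {
                // Bad input data: alert the Operator; the data must be fixed
                // before this part of the batch can be restarted.
                alertOperator(failure, chunk);
                return Outcome.AWAIT_OPERATOR;
            }
            // Internal node failure: the input data are still good, so the
            // unprocessed items can be handed to another node.
            return Outcome.REDISTRIBUTE;
        }

        private boolean isBadInputData(Exception failure) {
            // Illustrative classification: a validation failure rather than
            // an infrastructure failure such as a lost connection.
            return failure instanceof IllegalArgumentException;
        }

        private void alertOperator(Exception failure, Chunk chunk) {
            // Placeholder: report the location and nature of the failure.
            System.err.println("Bad input in " + chunk + ": " + failure.getMessage());
        }
    }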

Implementation

  • There are actually two approaches to this problem, which are largely complementary.
    1. The Chunking model dynamically assigns chunks of items to be processed and sends them to durable middleware. Worker processes pick them up and process them, sending back a message about the status. This approach works best if the dispatching is efficient compared to the processing.
    2. The Partitioning approach is more like running multiple jobs in parallel, with input data partitioned into larger pieces, and not split any further by the dispatcher. The item reading happens in the worker processes. This approach is necessary if the dispatcher in the Chunking model becomes a bottleneck.

    Generally, chunking is easier to implement than partitioning, but there are tools available for implementing both patterns efficiently.

Chunking

The messages from a dispatcher to worker processes consist of a chunk of items - a set of items to be processed together in a single transaction (or as the worker sees fit). The dispatcher is usually single threaded, but this is only a restriction imposed by the input data type (if the input is a file, it is difficult to read it in parallel and maintain restartability). Using a process indicator, the dispatcher could read from a database table in a multi-threaded model.

The main restriction is that for restartability the messages between the dispatcher and workers have to be durable (i.e. JMS or equivalent). If durable middleware is available there are no difficulties in principle with this approach.

The practicalities deserve some discussion. In particular the dispatcher has to co-ordinate asynchronous replies from its workers, and also has to avoid overwhelming the workers (so there should be some throttling). As long as the middleware is durable the dispatcher can simply wait for replies whenever it thinks there are workers working. It needs to record this expectation in a durable form as well, as part of an ExecutionContext for the step.
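
A minimal sketch of such a dispatcher, assuming a hypothetical durable ChunkQueue and ReplyChannel and a simple map standing in for the step's ExecutionContext (all names here are assumptions for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Illustrative only; ChunkQueue, ReplyChannel and the context key are
    // assumptions, not the API of an existing framework or middleware.
    public class ChunkDispatcher {

        public interface ChunkQueue {
            // Durable send of one chunk (e.g. backed by JMS or equivalent).
            void send(List<String> chunk);
        }

        public interface ReplyChannel {
            // Blocks until one worker reports a completed chunk.
            void awaitOneReply();
        }

        private final ChunkQueue queue;
        private final ReplyChannel replies;
        private final Map<String, Object> executionContext;
        private final int throttleLimit;   // maximum number of chunks in flight

        public ChunkDispatcher(ChunkQueue queue, ReplyChannel replies,
                               Map<String, Object> executionContext, int throttleLimit) {
            this.queue = queue;
            this.replies = replies;
            this.executionContext = executionContext;
            this.throttleLimit = throttleLimit;
        }

        public void dispatch(Iterable<String> items, int chunkSize) {
            int expected = 0;
            List<String> chunk = new ArrayList<String>();
            for (String item : items) {
                chunk.add(item);
                if (chunk.size() == chunkSize) {
                    // Throttle: never have more than throttleLimit chunks outstanding.
                    while (expected >= throttleLimit) {
                        replies.awaitOneReply();
                        executionContext.put("expected.replies", Integer.valueOf(--expected));
                    }
                    queue.send(new ArrayList<String>(chunk));
                    chunk.clear();
                    // Record the expectation durably so that a restart knows
                    // how many replies are still owed.
                    executionContext.put("expected.replies", Integer.valueOf(++expected));
                }
            }
            if (!chunk.isEmpty()) {
                queue.send(new ArrayList<String>(chunk));
                executionContext.put("expected.replies", Integer.valueOf(++expected));
            }
            // Drain: wait for all outstanding replies before the step is complete.
            while (expected > 0) {
                replies.awaitOneReply();
                executionContext.put("expected.replies", Integer.valueOf(--expected));
            }
        }
    }

Storing the outstanding count as part of the step's context is what lets a restarted dispatcher know how many replies it should still wait for.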

Partitioning

The hard thing about this use case is the partitioning of input (and output) sources. Ideally, this has to be done in such a way that the individual operations are unaware that they are participating in a batch farm. Partitioning has to be at least partially deterministic because restarts have to be able to ignore data that have already been processed successfully.

Consider two examples: a file input source and a JDBC (SQL query) based input source. Each provides its own challenges.

File Data Source
  • If each node reads the whole file there could be a performance issue. They would all need to have instructions about which lines to process.
  • If each record of input data is a line, this isn't so bad. Each node can have a range of line numbers to process. The only problem is knowing how many lines there are, and how many nodes, so that the job can be partitioned efficiently.
  • But if each input record can span a variable number of lines (not that unlikely in practice), then we can't use line numbers.
  • Maybe the best solution is to use middleware anyway. A single process parses the file and sends it to a message queue, item by item (or chunk by chunk). The integration pattern could then be a simple Eager Consumer, assuming that all records are processed independently. The messaging semantics would simply have to ensure that a consumer can roll back and return the input records to a queue for another consumer to retry.
  • For large batches a real messaging infrastructure (JMS etc.) with guaranteed delivery would be a benefit, but might be seen as overkill for a system that didn't otherwise require it. In this case we could imagine the partitioning process being one of simply dividing the input file up into smaller files, which are then processed by individual nodes independently. The integration pattern is then different - more like a Router.
  • What would parallel processing look like to the client? We can make it completely transparent if we assume that the client only ever implements ItemReader and ItemWriter. The client code is unaware of the partitioning of its data source.
  • Parallelisation could also take place at the level of the ItemReader - we could proxy the data provider and wrap it in a partitioning proxy:
    <bean id="itemReader" 
        class="org.springframework.aop.framework.ProxyFactoryBean">
    
        <property name="target">
            <bean class="test.input.TradeItemProvider">
                ...
            </bean>
        </property>
        <property name="interceptorNames" value="partitioner"/>
    
    </bean>
    
    <bean id="partitioner" 
        class="org.springframework.core.batch.support.provider.PartioningInterceptor">
        ...
    </bean>
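
    As a rough illustration of what the partitioning interceptor might do (the class in the configuration above does not exist yet; the round-robin strategy shown here is just one possible assumption), it could intercept each read() call and skip items that belong to other nodes, so the wrapped reader never knows it is part of a partitioned run:

    import org.aopalliance.intercept.MethodInterceptor;
    import org.aopalliance.intercept.MethodInvocation;

    // Hypothetical sketch of a partitioning interceptor around an ItemReader.
    public class PartitioningInterceptor implements MethodInterceptor {

        private final int partitionIndex;   // which slice of the data this node owns
        private final int partitionCount;   // total number of parallel nodes
        private long itemNumber = 0;        // items seen so far through this proxy

        public PartitioningInterceptor(int partitionIndex, int partitionCount) {
            this.partitionIndex = partitionIndex;
            this.partitionCount = partitionCount;
        }

        public Object invoke(MethodInvocation invocation) throws Throwable {
            if (!"read".equals(invocation.getMethod().getName())) {
                return invocation.proceed();
            }
            // Keep reading from the target until an item that belongs to this
            // node's slice turns up (simple round-robin by item number), or
            // the input is exhausted. Assumes this is the only interceptor in
            // the chain, so proceed() can safely be called more than once.
            while (true) {
                Object item = invocation.proceed();
                if (item == null) {
                    return null;   // end of input
                }
                if (itemNumber++ % partitionCount == partitionIndex) {
                    return item;   // this item belongs to this node
                }
                // otherwise skip it: another node will process it
            }
        }
    }

    A line-number range (as discussed above) could be substituted for the round-robin test without the client code changing.
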
    
SQL Data Source Partitioning
  • If each node is allowed to do its own query or queries to determine the input data:
    • Each node has to be given a way to narrow the query so that they don't all use the same data. There is no easy universal way to achieve this, and in the general case we have to know in advance whether we are going to execute in parallel or as a single process. Maybe a range of primary keys would work as a special case that we could support as a strategy.
    • Maybe we could assume that all nodes execute precisely the same query, and then provide a way to add a cursor to the result set, so it can be treated a bit more like a file.
    • We might be forced to use a distributed transaction to ensure that all the nodes see the same data. This would be unfortunate, but possibly necessary. It would be up to the client to configure distributed transactions if that was required, otherwise the result might be unpredictable if data can be added to an input source while it is being read.
  • If only one query is done by the Framework and the results shared out amongst the nodes, we face the issue of how to send the data between nodes, and performance problems might ensue. More seriously, the individual nodes would need a different implementation when acting in a parallel cluster than in the vanilla serial processing case: a single node would do the query and work directly with the results, whereas in a parallel environment it would be one step removed from the actual query. This breaks our encapsulation design goal.
  • When considering the approach to partitioning the data source we should follow closely the discussion above on partitioning a file input source. If the client is to remain unaware of the batch parameters, then an interceptor looks like the best approach.

    If each node prefers to do its own query then an interceptor would have to catch the call to a JDBC template and modify the query dynamically. This is quite a scary thing to be doing - it might end up with us needing to parse the SQL and add where clauses. Maybe the client should be forced to specify (in the case of a parallel batch) how the query should be partitioned. For example:

    <bean id="inputSource" 
        class="test.input.SqlInputItemReader">
    
        <property name="query">
            <value>SELECT * from T_INPUT</value>
        </property>
    
        <property name="partitionQuery">
            <value>SELECT * from T_INPUT where ID&gt;=? and ID&lt;?</value>
        </property>
    
    </bean>

    It would be an error to run a batch in parallel if the partition query had not been provided.
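
    A sketch of how such a reader might use the partition query at runtime, assuming the Framework injects a primary-key range into each node (the class, its properties and the method shown are hypothetical):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import javax.sql.DataSource;

    // Hypothetical reader illustrating the query / partitionQuery idea above;
    // not an existing framework class. Resource handling and setters are
    // omitted for brevity.
    public class SqlInputItemReader {

        private DataSource dataSource;
        private String query;            // used when running as a single process
        private String partitionQuery;   // used when running as one node of many

        // Set by the Framework when the step runs in parallel.
        private Long minId;
        private Long maxId;

        public ResultSet executeQuery() throws SQLException {
            Connection connection = dataSource.getConnection();
            if (minId == null || maxId == null) {
                // Serial case: plain query over the whole input table.
                return connection.prepareStatement(query).executeQuery();
            }
            if (partitionQuery == null) {
                // As noted above: running in parallel without a partition query is an error.
                throw new IllegalStateException("partitionQuery is required for a parallel batch");
            }
            PreparedStatement statement = connection.prepareStatement(partitionQuery);
            statement.setLong(1, minId.longValue());   // ID >= ?
            statement.setLong(2, maxId.longValue());   // ID < ?
            return statement.executeQuery();
        }
    }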

  • What happens if the data source changes between a failed execution and a restart? We can't legislate for that, because it is outside the realm of what can be controlled through a transaction. A restart might produce different results than the original batch would have done had it succeeded.