Support efficient processing of very large batch jobs (100K - 1000K records) through parallel processing, across multiple processes or physical or virtual machines. The goals of the other use cases should not be compromised: for example, it must still be possible to start and stop a batch job easily (by a non-developer), and to trace the progress and failure points of a batch. The client code should not be aware of whether the processing is parallel or serial.
Two failure cases can be distinguished: bad input data on a node, and an internal node failure. They have different implications for how to proceed.

If a processing node detects bad data in the input source, the work cannot be restarted or re-distributed, because the data need to be modified before a successful outcome is possible.

If a processing node fails unrecoverably (e.g. after a retry timeout), but with no indication that the input data were bad, then the data can be re-used: the framework returns the unprocessed input data and redistributes it to other nodes.
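The two outcomes could be modelled as a reply message from the worker. The sketch below is purely illustrative (WorkerReply and its methods are hypothetical names, not framework API): a reply either reports bad input data, in which case the chunk must not be re-run until the data are fixed, or carries the unprocessed items back so the dispatcher can offer them to another node.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a worker's reply to the dispatcher.
public class WorkerReply<T> {
    public enum Status { COMPLETE, BAD_INPUT_DATA, NODE_FAILURE }

    public final Status status;
    public final List<T> unprocessed; // non-empty only for NODE_FAILURE

    private WorkerReply(Status status, List<T> unprocessed) {
        this.status = status;
        this.unprocessed = unprocessed;
    }

    public static <T> WorkerReply<T> complete() {
        return new WorkerReply<>(Status.COMPLETE, Collections.emptyList());
    }

    // Bad data: re-running would fail again, so nothing is returned for re-dispatch.
    public static <T> WorkerReply<T> badInputData() {
        return new WorkerReply<>(Status.BAD_INPUT_DATA, Collections.emptyList());
    }

    // Internal failure: the untouched items come back for redistribution.
    public static <T> WorkerReply<T> nodeFailure(List<T> unprocessed) {
        return new WorkerReply<>(Status.NODE_FAILURE, unprocessed);
    }

    // Only an internal node failure makes the items safe to re-dispatch.
    public boolean redistributable() {
        return status == Status.NODE_FAILURE;
    }
}
```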
Generally, chunking is easier to implement than partitioning, but there are tools available for implementing both patterns efficiently.
The messages from a dispatcher to worker processes consist of a chunk of items: a set of items to be processed together in a single transaction (or as the worker sees fit). The dispatcher is usually single-threaded, but this is only a restriction imposed by the input data type (if the input is a file it is difficult to read in parallel and maintain restartability). Using a process indicator, the dispatcher could read from a database table in a multi-threaded model.
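A minimal sketch of the single-threaded dispatcher side, under the assumption that the messaging layer is hidden behind a sender interface (ChunkDispatcher and ChunkSender are hypothetical names; in practice the sender would be backed by JMS or equivalent):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: group items from a serially-read input source into
// chunks and hand each chunk to the messaging layer.
public class ChunkDispatcher<T> {

    // Stand-in for the durable middleware (e.g. a JMS queue).
    public interface ChunkSender<T> { void send(List<T> chunk); }

    private final int chunkSize;
    private final ChunkSender<T> sender;

    public ChunkDispatcher(int chunkSize, ChunkSender<T> sender) {
        this.chunkSize = chunkSize;
        this.sender = sender;
    }

    // Reads items serially (e.g. from a file) and dispatches them in chunks;
    // returns the number of chunks sent.
    public int dispatch(Iterator<T> input) {
        int chunks = 0;
        List<T> chunk = new ArrayList<>(chunkSize);
        while (input.hasNext()) {
            chunk.add(input.next());
            if (chunk.size() == chunkSize) {
                sender.send(new ArrayList<>(chunk)); // send a copy, then reuse the buffer
                chunk.clear();
                chunks++;
            }
        }
        if (!chunk.isEmpty()) { // trailing partial chunk
            sender.send(chunk);
            chunks++;
        }
        return chunks;
    }
}
```

Each chunk then maps naturally onto a single transaction on the worker side.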
The main restriction is that, for restartability, the messages between the dispatcher and workers have to be durable (i.e. JMS or equivalent). Given durable middleware, there are no difficulties in principle with this approach.
The practicalities deserve some discussion. In particular, the dispatcher has to co-ordinate asynchronous replies from its workers, and also has to avoid overwhelming them (so there should be some throttling). As long as the middleware is durable, the dispatcher can simply wait for replies whenever it believes there are workers still working. It needs to record this expectation in durable form as well, as part of an ExecutionContext for the step.
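The throttling and reply book-keeping could look something like the following sketch (ReplyTracker is a hypothetical name; here the expectation is held in memory, whereas the framework would persist it in the step's ExecutionContext):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: throttle outstanding chunks and track expected replies.
public class ReplyTracker {
    private final Semaphore permits;
    private final Map<Long, Boolean> expected = new ConcurrentHashMap<>();

    public ReplyTracker(int maxOutstanding) {
        this.permits = new Semaphore(maxOutstanding);
    }

    // Called before sending a chunk; blocks when too many are in flight,
    // which is the throttling the dispatcher needs.
    public void sent(long chunkId) {
        permits.acquireUninterruptibly();
        expected.put(chunkId, Boolean.TRUE);
    }

    // Called when a worker's reply arrives; frees a permit for the next chunk.
    public void replied(long chunkId) {
        if (expected.remove(chunkId) != null) {
            permits.release();
        }
    }

    // The dispatcher keeps waiting while it thinks workers are still working.
    public boolean awaitingReplies() {
        return !expected.isEmpty();
    }
}
```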
The hard part of this use case is the partitioning of the input (and output) sources. Ideally this should be done in such a way that the individual operations are unaware that they are participating in a batch farm. Partitioning also has to be at least partially deterministic, because restarts have to be able to ignore data that have already been processed successfully.
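Determinism here just means that partition assignment is a pure function of the record, so a restart sees the same layout. A minimal sketch of that idea (DeterministicPartitioner is a hypothetical name):

```java
import java.util.Set;

// Hypothetical sketch: the same record id always lands in the same
// partition, so on restart the framework can skip work already recorded
// as complete.
public class DeterministicPartitioner {
    private final int partitionCount;

    public DeterministicPartitioner(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    // Purely a function of the record id: no randomness, no timestamps.
    public int partitionFor(long recordId) {
        return (int) (recordId % partitionCount);
    }

    // On restart, a record is ignored if its partition is already complete.
    public boolean shouldProcess(long recordId, Set<Integer> completedPartitions) {
        return !completedPartitions.contains(partitionFor(recordId));
    }
}
```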
Consider two examples: a file input source and a JDBC (SQL query) based input source. Each provides its own challenges.
For large batches a real messaging infrastructure (JMS etc.) with guaranteed delivery would be a benefit, but might be seen as overkill for a system that didn't otherwise require it. In this case we could imagine the partitioning process being one of simply dividing the input file up into smaller files, which are then processed by individual nodes independently. The integration pattern is then different - more like a Router.
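The file-splitting variant of that router pattern can be sketched directly (FileSplitter is a hypothetical name; the naming scheme for the part files is an assumption):

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the simple "router" style of partitioning: divide
// an input file into smaller files of at most linesPerFile lines, which
// worker nodes can then process independently.
public class FileSplitter {

    public static List<Path> split(Path input, Path outputDir, int linesPerFile)
            throws IOException {
        List<Path> parts = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(input)) {
            String line = reader.readLine();
            while (line != null) {
                // One output file per slice of the input.
                Path part = outputDir.resolve("part-" + parts.size() + ".txt");
                try (BufferedWriter writer = Files.newBufferedWriter(part)) {
                    for (int i = 0; i < linesPerFile && line != null; i++) {
                        writer.write(line);
                        writer.newLine();
                        line = reader.readLine();
                    }
                }
                parts.add(part);
            }
        }
        return parts;
    }
}
```

Because each slice is a complete, independent file, the individual nodes need no knowledge of the parallel batch; restartability reduces to knowing which part files have completed.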
batchTemplate.iterate(new ItemProviderRepeatCallback(provider, processor));
<bean id="itemProvider" class="org.springframework.aop.framework.ProxyFactoryBean">
    <property name="target">
        <bean class="test.input.TradeItemProvider">
            ...
        </bean>
    </property>
    <property name="interceptorNames" value="partitioner"/>
</bean>

<bean id="partitioner" class="org.springframework.core.batch.support.provider.PartitioningInterceptor">
    ...
</bean>
If each node prefers to do its own query, then an interceptor would have to catch the call to a JDBC template and modify the query dynamically. This is quite a scary thing to be doing - it might end up with us needing to parse the SQL and add where clauses. Perhaps a client should be forced to specify (in the case of a parallel batch) how the query should be partitioned. For example:
<bean id="inputSource" class="test.input.SqlInputDataProvider">
    <property name="query">
        <value>SELECT * from T_INPUT</value>
    </property>
    <property name="partitionQuery">
        <value>SELECT * from T_INPUT where ID>=? and ID&lt;?</value>
    </property>
</bean>
It would be an error to run a batch in parallel if the partition query had not been provided.
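Given a partition query of that shape, the framework would also need to compute the key ranges to bind into its two placeholders. A sketch of that calculation, assuming the key space [minId, maxId] is known (KeyRangePartitioner and Range are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: divide a key space [minId, maxId] into contiguous
// ranges suitable for binding to "ID >= ? and ID < ?".
public class KeyRangePartitioner {

    public static class Range {
        public final long low;   // inclusive, first placeholder
        public final long high;  // exclusive, second placeholder
        Range(long low, long high) { this.low = low; this.high = high; }
    }

    public static List<Range> partition(long minId, long maxId, int partitions) {
        List<Range> ranges = new ArrayList<>();
        long span = maxId - minId + 1;
        long size = (span + partitions - 1) / partitions; // ceiling division
        for (long low = minId; low <= maxId; low += size) {
            ranges.add(new Range(low, Math.min(low + size, maxId + 1)));
        }
        return ranges;
    }
}
```

Because the ranges are a deterministic function of the key space, a restart reproduces the same partitions, which is exactly the property the partitioning needs.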