Use Case: Asynchronous Chunk Processing

Goal

Increase the efficiency of chunk processing by having it execute asynchronously, in multiple threads. Maintain the transactional integrity of the chunk.

Scope

  • All chunks might conceivably benefit from parallel processing, so we don't want any unnecessary restrictions on the batch operation or its implementation. It should be possible for Client to write a batch operation without reference to the fact that it might run in an asynchronous chunk.

Preconditions

  • Input data exists with non-trivial size: chunks contain more than one record.
  • Batch processing of a record is slow, or can be delayed, so that the asynchronous processing can take longer than launching the threads.
  • A chunk can be made to fail after at least one record is processed.

Success

  • A chunk is processed and the results inspected to verify that all records were processed.
  • Transactional behaviour is verified by rolling back a chunk and verifying that no records were processed.

Description

The vanilla case proceeds as for normal chunk processing, but:

  1. Within a chunk, Container processes records in parallel.
  2. At the end of a chunk, Container waits for the last record to be processed (with a timeout if the wait is long).
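
The two steps above can be sketched in Java with a standard ExecutorService. This is a minimal illustration under stated assumptions, not the Container's actual implementation: the names AsyncChunkSketch and processChunk and the parameters are hypothetical, and no transaction handling is shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class AsyncChunkSketch {

    /** Hypothetical helper: processes every record of one chunk in parallel and waits for all of them. */
    static <I, O> List<O> processChunk(List<I> chunk, Function<I, O> operation,
            int threads, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<O>> tasks = new ArrayList<>();
            for (I record : chunk) {
                tasks.add(() -> operation.apply(record));
            }
            // Steps 1 and 2: run the records in parallel and block until all are
            // done or the timeout expires; invokeAll cancels whatever is unfinished.
            List<Future<O>> futures =
                    executor.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<O> results = new ArrayList<>();
            for (Future<O> future : futures) {
                if (future.isCancelled()) {
                    throw new TimeoutException("chunk did not finish in time");
                }
                // get() rethrows a record failure wrapped in an ExecutionException.
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The batch operation is passed in as a plain Function, so it is written without reference to the threads it runs in, in line with the Scope above.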

Variations

Rollback on Failure

If there is an exception in one of the record processing threads, the whole chunk should roll back:

  1. Client throws exception in record processing.
  2. Container catches exception and attempts to abort other running processes.
  3. Container waits for running processes to abort (or finish normally, but preferably to abort).
  4. Container propagates the exception and signals transaction to roll back.
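
The rollback steps above can be sketched as follows. This is an illustration only: ChunkTransaction is a hypothetical stand-in for whatever demarcates the chunk transaction, the 10 second wait for aborting processes is an arbitrary assumption, and handling of interruption of the waiting thread is left out. The sketch demarcates the transaction in the parent thread only and does not by itself make the worker threads transactional.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class RollbackOnFailureSketch {

    /** Hypothetical stand-in for the chunk transaction. */
    interface ChunkTransaction {
        void commit();
        void rollback();
    }

    static <I> void processChunk(List<I> chunk, Consumer<I> operation,
            ChunkTransaction tx, int threads) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        ExecutorCompletionService<Void> completion = new ExecutorCompletionService<>(executor);
        List<Future<Void>> futures = new ArrayList<>();
        for (I record : chunk) {
            futures.add(completion.submit(() -> {
                operation.accept(record); // step 1: Client code may throw here
                return null;
            }));
        }
        try {
            // Results arrive in completion order, so the first failure is seen early.
            for (int i = 0; i < futures.size(); i++) {
                completion.take().get();
            }
            tx.commit();
        } catch (ExecutionException e) {
            // Step 2: attempt to abort the other running processes (interrupts them).
            for (Future<Void> future : futures) {
                future.cancel(true);
            }
            // Step 3: wait for them to abort or finish (assumed 10 s limit).
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
            // Step 4: roll back and propagate the original exception.
            tx.rollback();
            Throwable cause = e.getCause();
            throw cause instanceof Exception ? (Exception) cause : e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```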

Timeout

If there is a timeout during a chunk, it might happen before the chunk has finished, or while waiting for the processes to complete before exiting.

  1. At end of chunk, Container is waiting for all processes to finish. It times out, according to a parameter set by the Operator.
  2. Container does not start any new processes, and attempts to abort running processes.
  3. Container waits for running processes to abort (or finish normally, but preferably to abort).
  4. Container throws a timeout exception and signals chunk transaction to roll back.
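
A possible shape for the end-of-chunk wait, assuming the running processes are tracked as Future objects. The names ChunkTimeoutSketch, awaitChunk, timeoutMillis and abortGraceMillis are hypothetical; the rollback itself is left to whoever catches the TimeoutException.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ChunkTimeoutSketch {

    /** Hypothetical helper: waits for all running processes of a chunk, up to a deadline. */
    static void awaitChunk(ExecutorService executor, List<Future<?>> running,
            long timeoutMillis, long abortGraceMillis) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (Future<?> future : running) {
                // Step 1: wait for each process, but never past the Operator's deadline.
                long remaining = deadline - System.nanoTime();
                future.get(Math.max(remaining, 0), TimeUnit.NANOSECONDS);
            }
        } catch (TimeoutException e) {
            // Step 2: accept no new processes and try to abort the running ones.
            executor.shutdown();
            for (Future<?> future : running) {
                future.cancel(true);
            }
            // Step 3: give the processes a bounded time to abort or finish.
            executor.awaitTermination(abortGraceMillis, TimeUnit.MILLISECONDS);
            // Step 4: the caller treats this exception as the signal to roll back.
            throw new TimeoutException("chunk timed out after " + timeoutMillis + " ms");
        }
    }
}
```

Aborting relies on interruption, so it only works if the batch operation responds to Thread.interrupt(); a process that ignores it will run to completion regardless.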

Implementation

  • The implementation of this use case could be tricky in the general case. In particular, the transactional nature is going to be hard or impossible to maintain across multiple threads without the individual processes being aware of the transaction, and (perhaps) without global (XA) transaction support.

    A "normal" local transaction is thread bound - i.e. it only executes in one thread. If the code inside the transaction creates new threads, then they might not finish processing before the parent exits and the transaction wants to finish. The transaction needs to wait for the sub-processes before committing, or (more difficult) rolling back. The rollback case basically forces us to a model of one transaction per thread, and therefore to one transaction per data item in a concurrent environment.

    Otherwise some transactional semantics might be respected in a parallel process, but others certainly will not be because synchronizations and resources are managed at the level of the thread where the transaction started. If the transaction manager is a local one (not XA) there is little hope even that the datasource resource would be the same for all the parallel threads and the parent method.

    If we use a global transaction manager to make the parallel processes transactional, how will they know which transaction to participate in? There could be many active chunks, and each would have its own threads - how would each one be able to guide its child processes to participate in the same transaction?
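
The thread-bound nature of a local transaction can be illustrated with a plain ThreadLocal. This is a sketch under the assumption that the transaction manager keeps its state per thread; CURRENT_TX and the method names are hypothetical and stand in for that state.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundTransactionSketch {

    // Hypothetical stand-in for the thread-bound state of a local transaction.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Returns what the parent thread sees after binding txName. */
    static String transactionSeenByParent(String txName) {
        CURRENT_TX.set(txName);
        try {
            return CURRENT_TX.get();
        } finally {
            CURRENT_TX.remove();
        }
    }

    /** Returns what a worker thread sees while the parent has txName bound. */
    static String transactionSeenByWorker(String txName) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT_TX.set(txName); // the parent "begins" the transaction
        try {
            // The worker reads its own copy of the ThreadLocal, which was never set.
            Callable<String> probe = CURRENT_TX::get;
            return executor.submit(probe).get();
        } finally {
            CURRENT_TX.remove();
            executor.shutdown();
        }
    }
}
```

Here transactionSeenByParent("tx-1") returns "tx-1", while transactionSeenByWorker("tx-1") returns null: the worker has no way of finding the parent's transaction unless it is handed over explicitly.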

  • Beware a framework that extracts data from an ItemReader before executing the business logic (e.g. in an ItemWriter). It is not enough to allow concurrent processing and simply insist that the individual records are processed transactionally, because the ItemReader will then not be able to participate in the transaction: its next record has already been passed to the consumer when the transaction starts, so if there is a rollback the record is lost.

    This is the origin of the signature:

    public interface ItemReader {
        Object next();
    }

    There is no peeking and no iterator-style hasNext. If there is a processing problem, transactional clients of the ItemReader throw an exception after the provider's next() has been called, but in the same thread (so that transactional semantics are preserved and the data provider reverts to its previous state).

    This means that the callback interface also picks up an Object return type:

    public interface RepeatCallback {
        Object doInIteration(BatchContext context);
    }

    so we can return an object, which is null when the processing has finished.

    In the end we decided against the Object return type and went with an exit status to signal that there is no more processing to do.
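
A sketch of what the exit-status variant might look like. The ExitStatus enum and its values, the empty BatchContext placeholder, and the iterate and drain helpers are all hypothetical; the real types are not given here. The sketch also assumes that next() returns null once the input is exhausted.

```java
import java.util.ArrayList;
import java.util.List;

class ExitStatusSketch {

    /** Hypothetical exit status; the actual type and values are assumptions. */
    enum ExitStatus { CONTINUABLE, FINISHED }

    /** Empty placeholder for the context type used in the callback signature. */
    static final class BatchContext { }

    interface ItemReader {
        Object next(); // assumed to return null when there is no more input
    }

    interface RepeatCallback {
        ExitStatus doInIteration(BatchContext context);
    }

    /** Repeats the callback in the calling thread until it reports FINISHED. */
    static int iterate(RepeatCallback callback) {
        BatchContext context = new BatchContext();
        int iterations = 0;
        while (callback.doInIteration(context) == ExitStatus.CONTINUABLE) {
            iterations++;
        }
        return iterations;
    }

    /** Reads and processes each item inside the callback, so both happen in one thread. */
    static List<Object> drain(ItemReader reader) {
        List<Object> processed = new ArrayList<>();
        iterate(context -> {
            Object item = reader.next();
            if (item == null) {
                return ExitStatus.FINISHED; // nothing left: signal no more processing
            }
            processed.add(item);
            return ExitStatus.CONTINUABLE;
        });
        return processed;
    }
}
```

Because next() is called inside doInIteration, an exception thrown while processing the item surfaces in the same thread that read it, which is the property the ItemReader signature was designed around.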