Creating Custom ItemReaders and ItemWriters

So far, this chapter has discussed the basic contracts of reading and writing in Spring Batch and some common implementations for doing so. However, these are all fairly generic, and there are many potential scenarios that may not be covered by out-of-the-box implementations. This section shows, by using a simple example, how to create a custom ItemReader and ItemWriter implementation and implement their contracts correctly. The ItemReader also implements ItemStream, in order to illustrate how to make a reader or writer restartable.

Custom ItemReader Example

For the purpose of this example, we create a simple ItemReader implementation that reads from a provided list. We start by implementing the most basic contract of ItemReader, the read method, as shown in the following code:

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

The preceding class takes a list of items and returns them one at a time, removing each from the list. When the list is empty, it returns null, thus satisfying the most basic requirements of an ItemReader, as illustrated in the following test code:

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

Making the ItemReader Restartable

The final challenge is to make the ItemReader restartable. Currently, if processing is interrupted and begins again, the ItemReader must start at the beginning. This is actually valid in many scenarios, but it is sometimes preferable that a batch job restarts where it left off. The key discriminant is often whether the reader is stateful or stateless. A stateless reader does not need to worry about restartability, but a stateful one has to try to reconstitute its last known state on restart. For this reason, we recommend that you keep custom readers stateless if possible, so you need not worry about restartability.

If you do need to store state, then the ItemStream interface should be used:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

On each call to the ItemStream update method, the current index of the ItemReader is stored in the provided ExecutionContext with a key of 'current.index'. When the ItemStream open method is called, the ExecutionContext is checked to see if it contains an entry with that key. If the key is found, then the current index is moved to that location. This is a fairly trivial example, but it still meets the general contract:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

Most ItemReaders have much more sophisticated restart logic. The JdbcCursorItemReader, for example, stores the row ID of the last processed row in the cursor.

It is also worth noting that the key used within the ExecutionContext should not be trivial. That is because the same ExecutionContext is used for all ItemStreams within a Step. In most cases, simply prepending the key with the class name should be enough to guarantee uniqueness. However, in the rare cases where two of the same type of ItemStream are used in the same step (which can happen if two files are needed for output), a more unique name is needed. For this reason, many of the Spring Batch ItemReader and ItemWriter implementations have a setName() property that lets this key name be overridden.

Custom ItemWriter Example

Implementing a Custom ItemWriter is similar in many ways to the ItemReader example above but differs in enough ways as to warrant its own example. However, adding restartability is essentially the same, so it is not covered in this example. As with the ItemReader example, a List is used in order to keep the example as simple as possible:

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

Making the ItemWriter Restartable

To make the ItemWriter restartable, we would follow the same process as for the ItemReader, adding and implementing the ItemStream interface to synchronize the execution context. In the example, we might have to count the number of items processed and add that as a footer record. If we needed to do that, we could implement ItemStream in our ItemWriter so that the counter was reconstituted from the execution context if the stream was re-opened.

In many realistic cases, custom ItemWriters also delegate to another writer that itself is restartable (for example, when writing to a file), or else it writes to a transactional resource and so does not need to be restartable, because it is stateless. When you have a stateful writer you should probably be sure to implement ItemStream as well as ItemWriter. Remember also that the client of the writer needs to be aware of the ItemStream, so you may need to register it as a stream in the configuration.