Creating Custom ItemReaders and ItemWriters
So far, this chapter has discussed the basic contracts of reading and writing in Spring
Batch and some common implementations for doing so. However, these are all fairly
generic, and there are many potential scenarios that may not be covered by out-of-the-box
implementations. This section shows, by using a simple example, how to create a custom
ItemReader
and ItemWriter
implementation and implement their contracts correctly. The
ItemReader
also implements ItemStream
, in order to illustrate how to make a reader or
writer restartable.
Custom ItemReader
Example
For the purpose of this example, we create a simple ItemReader
implementation that
reads from a provided list. We start by implementing the most basic contract of
ItemReader
, the read
method, as shown in the following code:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
The preceding class takes a list of items and returns them one at a time, removing each
from the list. When the list is empty, it returns null
, thus satisfying the most basic
requirements of an ItemReader
, as illustrated in the following test code:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
Making the ItemReader
Restartable
The final challenge is to make the ItemReader
restartable. Currently, if processing is
interrupted and begins again, the ItemReader
must start at the beginning. This is
actually valid in many scenarios, but it is sometimes preferable that a batch job
restarts where it left off. The key discriminant is often whether the reader is stateful
or stateless. A stateless reader does not need to worry about restartability, but a
stateful one has to try to reconstitute its last known state on restart. For this reason,
we recommend that you keep custom readers stateless if possible, so you need not worry
about restartability.
If you do need to store state, then the ItemStream
interface should be used:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
On each call to the ItemStream
update
method, the current index of the ItemReader
is stored in the provided ExecutionContext
with a key of 'current.index'. When the
ItemStream
open
method is called, the ExecutionContext
is checked to see if it
contains an entry with that key. If the key is found, then the current index is moved to
that location. This is a fairly trivial example, but it still meets the general contract:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
Most ItemReaders
have much more sophisticated restart logic. The
JdbcCursorItemReader
, for example, stores the row ID of the last processed row in the
cursor.
It is also worth noting that the key used within the ExecutionContext
should not be
trivial. That is because the same ExecutionContext
is used for all ItemStreams
within
a Step
. In most cases, simply prepending the key with the class name should be enough
to guarantee uniqueness. However, in the rare cases where two of the same type of
ItemStream
are used in the same step (which can happen if two files are needed for
output), a more unique name is needed. For this reason, many of the Spring Batch
ItemReader
and ItemWriter
implementations have a setName()
property that lets this
key name be overridden.
Custom ItemWriter
Example
Implementing a Custom ItemWriter
is similar in many ways to the ItemReader
example
above but differs in enough ways as to warrant its own example. However, adding
restartability is essentially the same, so it is not covered in this example. As with the
ItemReader
example, a List
is used in order to keep the example as simple as
possible:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
Making the ItemWriter
Restartable
To make the ItemWriter
restartable, we would follow the same process as for the
ItemReader
, adding and implementing the ItemStream
interface to synchronize the
execution context. In the example, we might have to count the number of items processed
and add that as a footer record. If we needed to do that, we could implement
ItemStream
in our ItemWriter
so that the counter was reconstituted from the execution
context if the stream was re-opened.
In many realistic cases, custom ItemWriters
also delegate to another writer that itself
is restartable (for example, when writing to a file), or else it writes to a
transactional resource and so does not need to be restartable, because it is stateless.
When you have a stateful writer you should probably be sure to implement ItemStream
as
well as ItemWriter
. Remember also that the client of the writer needs to be aware of
the ItemStream
, so you may need to register it as a stream in the configuration.