All batch processing can be described in its simplest form as reading in large amounts of data, performing some type of calculation or transformation, and writing the result out. Spring Batch provides two key interfaces to help perform bulk reading and writing: ItemReader and ItemWriter.
Although a simple concept, an ItemReader is the means for providing data from many different types of input. The most general examples include:
Flat File - Flat file ItemReaders read lines of data from a flat file that typically describes records with fields of data defined by fixed positions in the file or delimited by some special character (e.g. comma).
XML - XML ItemReaders process XML independently of the technologies used for parsing, mapping, and validating objects. Input data allows for the validation of an XML file against an XSD schema.
Database - A database resource is accessed that returns result sets, which can be mapped to objects for processing. The default SQL input sources invoke a RowMapper to return objects, keep track of the current row in case a restart is required, and provide basic statistics and some transaction enhancements that will be explained later.
There are many more possibilities, but we'll focus on the basic ones for this chapter. A complete list of all available ItemReaders can be found in Appendix A.
ItemReader is a basic interface for generic input operations:
public interface ItemReader {

    Object read() throws Exception;

    void mark() throws MarkFailedException;

    void reset() throws ResetFailedException;
}
The read method defines the most essential contract of the ItemReader: calling it returns one item, or null if no more items are left. An item might represent a line in a file, a row in a database, or an element in an XML file. It is generally expected that these will be mapped to a usable domain object (i.e. Trade, Foo, etc.), but there is no requirement in the contract to do so.
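For example, a client might drain a reader with a loop like the following sketch, where itemReader is any configured ItemReader and processItem stands in for whatever handling the application needs (both names are hypothetical):

Object item = itemReader.read();
while (item != null) {
    // hand the item off for processing (processItem is a hypothetical method)
    processItem(item);
    item = itemReader.read();
}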
The mark and reset methods are important due to the transactional nature of batch processing. mark will be called before reading begins, and calling reset at any time will reposition the ItemReader to where it was when mark was last called. The semantics are very similar to those of java.io.Reader.
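A rough sketch of how a transactional client might use this contract follows; the surrounding error handling is illustrative pseudologic, not the actual Step implementation:

itemReader.mark();                  // remember the current position
try {
    Object item = itemReader.read();
    // ... process and write the item ...
    itemReader.mark();              // success: move the mark forward
} catch (Exception e) {
    itemReader.reset();             // failure: rewind to the last mark
}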
ItemWriter is similar in functionality to an ItemReader, but with the operations reversed. Resources still need to be located, opened, and closed, but an ItemWriter writes out rather than reading in. In the case of databases or queues, these operations may be inserts, updates, or sends. The format of the serialization of the output is specific to every batch job.
As with ItemReader, ItemWriter is a fairly generic interface:
public interface ItemWriter {

    void write(Object item) throws Exception;

    void flush() throws FlushFailedException;

    void clear() throws ClearFailedException;
}
As with read on ItemReader, write provides the basic contract of ItemWriter: it will attempt to write out the item passed in, as long as the writer is open. As with mark and reset, flush and clear are necessary due to the transactional nature of batch processing. Because it is generally expected that items will be 'batched' together into a chunk and then output, it is expected that an ItemWriter will perform some type of buffering. flush will empty the buffer by actually writing the items out, whereas clear will simply throw the contents of the buffer away. In most cases, a Step implementation will call flush before a commit and clear in the case of a rollback. It is expected that implementations of the Step interface will call these methods.
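Because a Step drives these calls, application code rarely invokes them directly, but the interaction can be sketched as follows; the chunk list and the transaction calls are illustrative placeholders for what a real Step does:

try {
    for (Iterator it = chunk.iterator(); it.hasNext();) {
        itemWriter.write(it.next());    // items may only be buffered at this point
    }
    itemWriter.flush();                 // force buffered items out before the commit
    transactionManager.commit(status);
} catch (Exception e) {
    itemWriter.clear();                 // throw the buffered items away
    transactionManager.rollback(status);
}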
Both ItemReaders and ItemWriters serve their individual purposes well, but there is a common concern between them that necessitates another interface. In general, as part of the scope of a batch job, readers and writers need to be opened and closed, and they require a mechanism for persisting state:
public interface ItemStream {

    void open(ExecutionContext executionContext) throws StreamException;

    void update(ExecutionContext executionContext);

    void close(ExecutionContext executionContext) throws StreamException;
}
Before describing each method, it's worth briefly mentioning the ExecutionContext. Clients of an ItemReader that also implements ItemStream should call open before any calls to read, in order to open any resources such as files or to obtain connections. A similar restriction applies to an ItemWriter that also implements ItemStream. As mentioned before, if expected data is found in the ExecutionContext, it may be used to start the ItemReader or ItemWriter at a location other than its initial state. Conversely, close will be called to ensure that any resources allocated during open are released safely. update is called primarily to ensure that any state currently being held is loaded into the provided ExecutionContext. This method will be called before committing, to ensure that the current state is persisted in the database before the commit.
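Put together, a client driving an ItemReader that also implements ItemStream follows roughly this lifecycle; this is a sketch, and a real Step adds transaction and error handling around it:

ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);       // restore state if the context holds any
Object item = itemReader.read();
while (item != null) {
    // ... process the item ...
    itemReader.update(executionContext); // record the current position before a commit
    item = itemReader.read();
}
itemReader.close(executionContext);      // release files, connections, etc.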
In the special case where the client of an ItemStream is a Step (from Spring Batch Core), an ExecutionContext is created for each StepExecution to allow users to store the state of a particular execution, with the expectation that it will be returned if the same JobInstance is started again. For those familiar with Quartz, the semantics are very similar to a Quartz JobDataMap.
One of the most common mechanisms for interchanging bulk data has always been the flat file. Unlike XML, which has an agreed-upon standard for defining how it is structured (XSD), anyone reading a flat file must understand ahead of time exactly how the file is structured. In general, all flat files fall into two types: delimited and fixed length.
When working with flat files in Spring Batch, regardless of whether it is for input or output, one of the most important classes is the FieldSet. Many architectures and libraries contain abstractions for helping you read in from a file, but they usually return a String or an array of Strings. This really only gets you halfway there. A FieldSet is Spring Batch's abstraction for enabling the binding of fields from a file resource. It allows developers to work with file input in much the same way as they would work with database input. A FieldSet is conceptually very similar to a JDBC ResultSet.
FieldSets only require one argument: a String array of tokens. Optionally, you can also configure the names of the fields, so that the fields may be accessed either by index or by name, as patterned after ResultSet. In code it's as simple as:
String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);
There are many more options on the FieldSet interface, such as Date, long, BigDecimal, etc. The biggest advantage of the FieldSet is that it provides consistent parsing of flat file input. Rather than each batch job parsing differently in potentially unexpected ways, it can be consistent, both when erroring out due to a format exception and when doing simple data conversions.
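For instance, typed reads can be combined with field names. This sketch assumes a two-argument DefaultFieldSet constructor taking tokens and names, and a default yyyy-MM-dd date format; check the FieldSet interface of your release for the exact signatures:

String[] tokens = new String[]{"AMZN", "2007-01-25", "56.30", "1000"};
String[] names = new String[]{"ticker", "date", "price", "volume"};
FieldSet fs = new DefaultFieldSet(tokens, names);

String ticker = fs.readString("ticker");
Date date = fs.readDate("date");               // assumes the default yyyy-MM-dd format
BigDecimal price = fs.readBigDecimal("price");
long volume = fs.readLong("volume");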
A flat file is any type of file that contains at most two-dimensional (tabular) data. Reading flat files in the Spring Batch framework is facilitated by the class FlatFileItemReader, which provides basic functionality for reading and parsing flat files. The FlatFileItemReader class has several properties. The three most important of these are Resource, FieldSetMapper, and LineTokenizer.
The FieldSetMapper and LineTokenizer interfaces will be explored more in the next sections. The resource property represents a Spring Core Resource. Documentation explaining how to create beans of this type can be found in the Spring Framework reference, Chapter 4, Resources; therefore, this guide will not go into the details of creating Resource objects. A resource is used to locate, open, and close resources. It can be as simple as:
Resource resource = new FileSystemResource("resources/trades.csv");
In complex batch environments, the directory structures are often managed by the EAI infrastructure, where drop zones for external interfaces are established for moving files from FTP locations to batch processing locations and vice versa. File moving utilities are beyond the scope of the Spring Batch architecture, but it is not unusual for batch job streams to include file moving utilities as steps in the job stream. It is sufficient to know that the batch architecture only needs to know how to locate the files to be processed. Spring Batch begins the process of feeding the data into the pipe from this starting point.
The other properties in FlatFileItemReader allow you to further specify how your data will be interpreted:
Table 3.1. Flat File Item Reader Properties
Property | Type | Description |
---|---|---|
encoding | String | Specifies what text encoding to use - default is "ISO-8859-1" |
comments | String[] | Specifies line prefixes that indicate comment rows |
linesToSkip | int | Number of lines to ignore at the top of the file |
firstLineIsHeader | boolean | Indicates that the first line of the file is a header containing field names. If the column names have not been set yet and the tokenizer extends AbstractLineTokenizer, field names will be set automatically from this line |
recordSeparatorPolicy | RecordSeparatorPolicy | Used to determine where the line endings are and do things like continue over a line ending if inside a quoted string. |
The FieldSetMapper interface defines a single method, mapLine, which takes a FieldSet object and maps its contents to an object. This object may be a custom DTO or domain object, or it could be as simple as an array, depending on your needs. The FieldSetMapper is used in conjunction with the LineTokenizer to translate a line of data from a resource into an object of the desired type:
public interface FieldSetMapper {

    public Object mapLine(FieldSet fs);
}
As you can see, the pattern used is exactly the same as the RowMapper used by JdbcTemplate.
Because there can be many formats of flat file data, which all need to be converted to a FieldSet so that a FieldSetMapper can create a useful domain object from them, an abstraction for turning a line of input into a FieldSet is necessary. In Spring Batch, this is called a LineTokenizer:
public interface LineTokenizer {

    FieldSet tokenize(String line);
}
The contract of a LineTokenizer is such that, given a line of input (in theory the String could encompass more than one line), a FieldSet representing the line will be returned. This will then be passed to a FieldSetMapper. Spring Batch contains the following LineTokenizers:
DelimitedLineTokenizer - Used for files that separate records by a delimiter. The most common delimiter is a comma, but pipes or semicolons are often used as well.
FixedLengthTokenizer - Used for tokenizing files where each field in a record is defined by a 'fixed width' that must be specified per record.
PrefixMatchingCompositeLineTokenizer - A tokenizer that determines which among a list of tokenizers should be used on a particular line by checking against a prefix.
Now that the basic interfaces for reading in flat files have been defined, a simple example explaining how they work together is helpful. In its simplest form, the flow when reading a line from a file is the following:
Read one line from the file.
Pass the String line into the LineTokenizer#tokenize() method to retrieve a FieldSet.
Pass the FieldSet returned from tokenizing to a FieldSetMapper, returning the result from the ItemReader#read() method.
The following example will be used to illustrate this using an actual domain scenario. This particular batch job reads in football players from the following file:
ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"
We want to map this data to the following Player domain object:
public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" + debutYear;
    }

    // setters and getters...
}
In order to map a FieldSet into our Player object, we need to create a FieldSetMapper that returns players:
protected static class PlayerFieldSetMapper implements FieldSetMapper {

    public Object mapLine(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}
We can then read in from the file by correctly constructing our FlatFileItemReader and calling read():
FlatFileItemReader itemReader = new FlatFileItemReader();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
itemReader.setLineTokenizer(new DelimitedLineTokenizer());
itemReader.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.read();
Each call to read will return a new Player object from each line in the file. When the end of the file is reached, null will be returned.
There is one additional piece of functionality in line tokenizers that is similar in function to a JDBC ResultSet: the names of the fields can be injected into the LineTokenizer to increase the readability of the mapping function. First, we tell the LineTokenizer what the names of the fields in the FieldSet are:
tokenizer.setNames(new String[] {"ID", "lastName", "firstName", "position", "birthYear", "debutYear"});
and provide a FieldSetMapper that uses this information as follows:
public class PlayerMapper implements FieldSetMapper {

    public Object mapLine(FieldSet fs) {

        if (fs == null) {
            return null;
        }

        Player player = new Player();
        player.setID(fs.readString("ID"));
        player.setLastName(fs.readString("lastName"));
        player.setFirstName(fs.readString("firstName"));
        player.setPosition(fs.readString("position"));
        player.setDebutYear(fs.readInt("debutYear"));
        player.setBirthYear(fs.readInt("birthYear"));

        return player;
    }
}
For many, having to write a specific FieldSetMapper is as cumbersome as writing a specific RowMapper for a JdbcTemplate. Spring Batch makes this easier by providing a FieldSetMapper that automatically maps fields by matching a field name to a setter, using the JavaBean specification. Again using the football example, the BeanWrapperFieldSetMapper configuration looks like the following:
<bean id="fieldSetMapper" class="org.springframework.batch.io.file.mapping.BeanWrapperFieldSetMapper"> <property name="prototypeBeanName" value="player" /> </bean> <bean id="person" class="org.springframework.batch.sample.domain.Player" scope="prototype" />
For each entry in the FieldSet, the mapper will look for a corresponding setter on a new instance of the Player object (for this reason, prototype scope is required) in the same way the Spring container will look for setters matching a property name. Each available field in the FieldSet will be mapped, and the resultant Player object will be returned, with no code required.
So far only delimited files have been discussed in much detail; however, they represent only half of the file reading picture. Many organizations that use flat files use fixed length formats. An example fixed length file is below:
UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5
While this looks like one large field, it actually represents 4 distinct fields:
ISIN: Unique identifier for the item being ordered - 12 characters long.
Quantity: Number of this item being ordered - 3 characters long.
Price: Price of the item - 5 characters long.
Customer: ID of the customer ordering the item - 9 characters long.
When configuring the FixedLengthTokenizer, each of these lengths must be provided in the form of ranges:
<bean id="fixedLengthLineTokenizer" class="org.springframework.batch.io.file.transform.FixedLengthTokenizer"> <property name="names" value="ISIN, Quantity, Price, Customer" /> <property name="columns" value="1-12, 13-15, 16-20, 21-29" /> </bean>
This LineTokenizer will return the same kind of FieldSet as if a delimiter had been used, allowing the same approaches described above, such as the BeanWrapperFieldSetMapper, to be used in a way that is ignorant of how the actual line was parsed.
All of the file reading examples up to this point have made a key assumption for simplicity's sake: one record equals one line. However, this may not always be the case. It's very common that a file might have records spanning multiple lines with multiple formats. The following excerpt from a file illustrates this:
HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
SAD;Smith, Elizabeth;Elm Street 17;;Some City;30011;FL;United States
BIN;VISA;VISA-12345678903
LIT;1044391041;37.49;0;0;4.99;2.99;1;45.47
LIT;2134776319;221.99;5;0;7.99;2.99;1;221.87
SIN;UPS;EXP;DELIVER ONLY ON WEEKDAYS
FOT;2;2;267.34
Everything between the line starting with 'HEA' and the line starting with 'FOT' is considered one record. The PrefixMatchingCompositeLineTokenizer makes this easier by matching the prefix in a line with a particular tokenizer:
<bean id="orderFileDescriptor" class="org.springframework.batch.io.file.transform.PrefixMatchingCompositeLineTokenizer"> <property name="tokenizers"> <map> <entry key="HEA" value-ref="headerRecordDescriptor" /> <entry key="FOT" value-ref="footerRecordDescriptor" /> <entry key="BCU" value-ref="businessCustomerLineDescriptor" /> <entry key="NCU" value-ref="customerLineDescriptor" /> <entry key="BAD" value-ref="billingAddressLineDescriptor" /> <entry key="SAD" value-ref="shippingAddressLineDescriptor" /> <entry key="BIN" value-ref="billingLineDescriptor" /> <entry key="SIN" value-ref="shippingLineDescriptor" /> <entry key="LIT" value-ref="itemLineDescriptor" /> <entry key="" value-ref="defaultLineDescriptor" /> </map> </property> </bean>
This ensures that the line will be parsed correctly (which is especially important for fixed length input) with the correct field names. Any users of the FlatFileItemReader in this scenario must continue calling read until the footer for the record is returned, allowing them to return a complete order as one 'item', as the sketch below illustrates.
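A hedged sketch of such a loop follows. Order, OrderHeader, and OrderFooter are hypothetical domain classes, and the loop would typically live in the read() method of a delegating ItemReader that wraps the FlatFileItemReader:

public Object read() throws Exception {
    Order order = null;
    for (Object line = flatFileItemReader.read(); line != null; line = flatFileItemReader.read()) {
        if (line instanceof OrderHeader) {        // a 'HEA' line starts a new record
            order = new Order((OrderHeader) line);
        } else if (line instanceof OrderFooter) { // a 'FOT' line completes the record
            order.setFooter((OrderFooter) line);
            return order;                         // one multi-line record as one item
        } else if (order != null) {
            order.addDetail(line);                // customer, address, item lines, etc.
        }
    }
    return null;                                  // end of file reached
}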
Writing out to flat files has the same problems and issues that reading in from a file must overcome. It must be able to write out in either delimited or fixed length formats in a transactional manner.
Just as the LineTokenizer interface is necessary to take a string and split it into tokens, file writing must have a way to aggregate multiple fields into a single string for writing to a file. In Spring Batch this is the LineAggregator:
public interface LineAggregator {

    public String aggregate(FieldSet fieldSet);
}
The LineAggregator is exactly the opposite of a LineTokenizer. A LineTokenizer takes a String and returns a FieldSet, whereas a LineAggregator takes a FieldSet and returns a String. As with reading, there are two types: DelimitedLineAggregator and FixedLengthLineAggregator.
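As a small illustration of the symmetry, assuming DelimitedLineAggregator defaults to a comma in the same way DelimitedLineTokenizer does:

String[] tokens = new String[]{"AbduKa00", "Abdul-Jabbar", "Karim"};
FieldSet fs = new DefaultFieldSet(tokens);

LineAggregator aggregator = new DelimitedLineAggregator();
String line = aggregator.aggregate(fs);  // "AbduKa00,Abdul-Jabbar,Karim"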
Because the LineAggregator interface uses a FieldSet as its mechanism for converting to a string, there needs to be an interface that describes how to convert from an object into a FieldSet:
public interface FieldSetCreator {

    FieldSet mapItem(Object data);
}
As with LineTokenizer and LineAggregator, FieldSetCreator is the polar opposite of FieldSetMapper: FieldSetMapper takes a FieldSet and returns a mapped object, whereas a FieldSetCreator takes an Object and returns a FieldSet.
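For example, a FieldSetCreator for the Player domain object from earlier in the chapter might look like the following sketch, reversing PlayerFieldSetMapper and assuming the usual getters exist on Player:

public class PlayerFieldSetCreator implements FieldSetCreator {

    public FieldSet mapItem(Object data) {
        Player player = (Player) data;

        // the reverse of PlayerFieldSetMapper: domain object out to tokens
        String[] tokens = new String[] {
            player.getID(),
            player.getLastName(),
            player.getFirstName(),
            player.getPosition(),
            String.valueOf(player.getBirthYear()),
            String.valueOf(player.getDebutYear())
        };
        return new DefaultFieldSet(tokens);
    }
}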
Now that both the LineAggregator and FieldSetCreator interfaces have been defined, the basic flow of writing can be explained:
The object to be written is passed to the FieldSetCreator in order to obtain a FieldSet.
The returned FieldSet is passed to the LineAggregator.
The returned String is written to the configured file.
The following excerpt from the FlatFileItemWriter expresses this in code:
public void write(Object data) throws Exception {
    FieldSet fieldSet = fieldSetCreator.mapItem(data);
    getOutputState().write(lineAggregator.aggregate(fieldSet) + LINE_SEPARATOR);
}
A simple configuration with the smallest amount of setters would look like the following:
<bean id="itemWriter" class="org.springframework.batch.io.file.FlatFileItemWriter"> <property name="resource" value="file:target/test-outputs/20070122.testStream.multilineStep.txt" /> <property name="fieldSetCreator"> <bean class="org.springframework.batch.io.file.mapping.PassThroughFieldSetMapper"/> </property> </bean>
FlatFileItemReader has a very simple relationship with file resources. When the reader is initialized, it opens the file if it exists, and throws an exception if it does not. File writing isn't quite so simple. At first glance it seems like a similarly straightforward contract should exist for FlatFileItemWriter: if the file already exists, throw an exception, and if it does not, create it and start writing. However, potentially restarting a Job can cause issues. In the normal restart scenario, the contract is reversed: if the file exists, start writing to it from the last known good position, and if it does not, throw an exception. However, what happens if the file name for this job is always the same? In this case, you would want to delete the file if it exists, unless it's a restart. Because of this possibility, the FlatFileItemWriter contains the property shouldDeleteIfExists. Setting this property to true will cause an existing file with the same name to be deleted when the writer is opened.
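Configured programmatically, this is a one-line switch. The sketch below assumes the setter follows the property name given above, and the output path is illustrative:

FlatFileItemWriter itemWriter = new FlatFileItemWriter();
itemWriter.setResource(new FileSystemResource("target/output/trades.txt"));
//delete any leftover file of the same name from a previous (non-restarted) run
itemWriter.setShouldDeleteIfExists(true);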
Spring Batch provides transactional infrastructure for both reading XML records and mapping them to Java objects as well as writing Java objects as XML records.
The StAX API is used for I/O, as other standard XML parsing APIs do not fit batch processing requirements (DOM loads the whole input into memory at once, and SAX controls the parsing process, allowing the user only to provide callbacks).
Let's take a closer look at how XML input and output works in Spring Batch. First, there are a few concepts that vary from file reading and writing but are common across Spring Batch XML processing. With XML processing, instead of lines of records (FieldSets) that need to be tokenized, it is assumed that an XML resource is a collection of 'fragments' corresponding to individual records. Note that OXM tools are designed to work with standalone XML documents rather than XML fragments cut out of an XML document; therefore, the Spring Batch infrastructure needs to work around this fact, as described below.
The 'trade' tag is defined as the 'root element' in the scenario shown below. Everything between '<trade>' and '</trade>' is considered one 'fragment'. Spring Batch uses Object/XML Mapping (OXM) to bind fragments to objects. However, Spring Batch is not tied to any particular XML binding technology. Typical use is to delegate to Spring OXM, which provides a uniform abstraction over the most popular OXM technologies (such as JAXB, Castor, and XStream). The dependency on Spring OXM is optional, and you can choose to implement Spring Batch specific interfaces if desired.
Now, with an introduction to OXM and how XML fragments can represent records, let's take a closer look at Item Readers and Item Writers.
The StaxEventItemReader configuration provides a typical setup for the processing of records from an XML input stream. First, let's examine a set of XML records that the StaxEventItemReader can process:
<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>
To be able to process the XML records we need the following:
Root Element Name - Name of the root element of the fragment that constitutes the object to be mapped. The example configuration demonstrates this with the value of trade.
Resource - Spring Resource that represents the file to be read.
FragmentDeserializer - Unmarshalling facility provided by Spring OXM for mapping the XML fragment to an object.
<property name="itemReader"> <bean class="org.springframework.batch.io.xml.StaxEventItemReader"> <property name="fragmentRootElementName" value="trade" /> <property name="resource" value="data/staxJob/input/20070918.testStream.xmlFileStep.xml" /> <property name="fragmentDeserializer"> <bean class="org.springframework.batch.io.xml.oxm.UnmarshallingEventReaderDeserializer"> <constructor-arg> <bean class="org.springframework.oxm.xstream.XStreamMarshaller"> <property name="aliases" ref="aliases" /> </bean> </constructor-arg> </bean> </property> </bean> </property>
Notice that in this example we have chosen to use an XStreamMarshaller, which requires an alias map whose first key and value are the name of the fragment (i.e. the root element) and the object type to bind it to. Then, similar to a FieldSet, the names of the other elements that map to fields within the object type are described as key/value pairs in the map. In the configuration file, we can use a Spring configuration utility to describe the required aliases as follows:
<util:map id="aliases"> <entry key="trade" value="org.springframework.batch.sample.domain.Trade" /> <entry key="isin" value="java.lang.String" /> <entry key="quantity" value="long" /> <entry key="price" value="java.math.BigDecimal" /> <entry key="customer" value="java.lang.String" /> </util:map>
On input, the reader reads the XML resource until it recognizes that a new fragment is about to start (by matching the tag name, by default). The reader creates a standalone XML document from the fragment (or at least makes it appear so) and passes the document to a deserializer (typically a wrapper around a Spring OXM Unmarshaller) to map the XML to a Java object.
In summary, if you were to express this in Java code rather than Spring configuration, the injection would look something like the following:
StaxEventItemReader xmlStaxEventItemReader = new StaxEventItemReader();
Resource resource = new ByteArrayResource(xmlResource.getBytes());

Map aliases = new HashMap();
aliases.put("trade", "org.springframework.batch.sample.domain.Trade");
aliases.put("isin", "java.lang.String");
aliases.put("quantity", "long");
aliases.put("price", "java.math.BigDecimal");
aliases.put("customer", "java.lang.String");
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

xmlStaxEventItemReader.setFragmentDeserializer(new UnmarshallingEventReaderDeserializer(marshaller));
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true;
while (hasNext) {
    Object trade = xmlStaxEventItemReader.read();
    if (trade == null) {
        hasNext = false;
    } else {
        System.out.println(trade);
    }
}
Output works symmetrically to input. The StaxEventItemWriter needs a Resource, a serializer, and a rootTagName. A Java object is passed to a serializer (typically a wrapper around a Spring OXM Marshaller), which writes to a Resource using a custom event writer that filters the StartDocument and EndDocument events produced for each fragment by the OXM tools. We'll show this in an example using the MarshallingEventWriterSerializer. The Spring configuration for this setup looks as follows:
<bean class="org.springframework.batch.item.xml.StaxEventItemWriter" id="tradeStaxWriter"> <property name="resource"value="file:target/test-outputs/20070918.testStream.xmlFileStep.output.xml" /> <property name="serializer" ref="tradeMarshallingSerializer" /> <property name="rootTagName" value="trades" /> <property name="overwriteOutput" value="true" /> </bean>
The configuration sets up the three required properties and optionally sets overwriteOutput=true, mentioned earlier in the chapter, to specify whether an existing file can be overwritten. The tradeMarshallingSerializer is configured as follows:
<bean class="org.springframework.batch.item.xml.oxm.MarshallingEventWriterSerializer" id="tradeMarshallingSerializer">
<constructor-arg>
<bean class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases" ref="aliases" />
</bean>
</constructor-arg>
</bean>
To summarize with a Java example, the following code illustrates all of the points discussed, demonstrating the programmatic setup of the required properties.
StaxEventItemWriter staxItemWriter = new StaxEventItemWriter();
FileSystemResource resource = new FileSystemResource(
        File.createTempFile("StaxEventWriterOutputSourceTests", "xml"));

Map aliases = new HashMap();
aliases.put("trade", "org.springframework.batch.sample.domain.Trade");
aliases.put("isin", "java.lang.String");
aliases.put("quantity", "long");
aliases.put("price", "java.math.BigDecimal");
aliases.put("customer", "java.lang.String");
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

MarshallingEventWriterSerializer tradeMarshallingSerializer =
        new MarshallingEventWriterSerializer(marshaller);

staxItemWriter.setResource(resource);
staxItemWriter.setSerializer(tradeMarshallingSerializer);
staxItemWriter.setRootTagName("trades");
staxItemWriter.setOverwriteOutput(true);

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);

Trade trade = new Trade();
trade.setIsin("XYZ0001");
trade.setQuantity(5);
trade.setPrice(new BigDecimal("11.39"));
trade.setCustomer("Customer1");
System.out.println(trade);

staxItemWriter.write(trade);
staxItemWriter.flush();
For a complete example configuration of XML input and output and a corresponding Job see the sample xmlStaxJob.
Both the XML and flat file examples above use the Spring Resource abstraction to obtain the file to read from or write to. This works because Resource has a getFile method that returns a java.io.File. Both XML and flat file resources can be configured using standard Spring constructs:
<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="resource" value="file://outputs/20070122.testStream.CustomerReportStep.TEMP.txt" /> </bean>
The above Resource will load the file from the file system at the location specified. Note that absolute locations have to start with a double slash ("//"). In most Spring applications, this solution is good enough, because the names of these resources are known at compile time. However, in batch scenarios, the file name may need to be determined at runtime as a parameter to the job. This can be solved using '-D' parameters, i.e. a system property:
<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="resource" value="${input.file.name}" /> </bean>
All that would be required for this solution to work is a system argument (-Dinput.file.name="file://file.txt"). (Note that although a PropertyPlaceholderConfigurer can be used here, it is not necessary if the system property is always set, because the ResourceEditor in Spring already filters and does placeholder replacement on system properties.)
Often in a batch setting it is preferable to parameterize the file name in the JobParameters of the job, instead of through system properties, and access it that way. To allow for this, Spring Batch provides the StepExecutionResourceProxy. The proxy can use the job name, the step name, or any values from the JobParameters, by surrounding them with % characters:
<bean id="inputFile" class="org.springframework.batch.core.resource.StepExecutionResourceProxy" /> <property name="filePattern" value="//%JOB_NAME%/%STEP_NAME%/%file.name%" /> </bean>
Assuming a job name of 'fooJob' and a step name of 'fooStep', and that the key/value pair file.name="fileName.txt" is in the JobParameters the job is started with, the following filename will be passed as the Resource: "//fooJob/fooStep/fileName.txt". It should be noted that in order for the proxy to have access to the StepExecution, it must be registered as a StepListener:
<bean id="fooStep" parent="abstractStep" p:itemReader-ref="itemReader" p:itemWriter-ref="itemWriter"> <property name="listeners" ref="inputFile" /> </bean>
The StepListener interface will be discussed in more detail in Chapter 4. For now, it is sufficient to know that the proxy must be registered.
Like most enterprise application styles, a database is the central storage mechanism for batch. However, batch differs from other application styles due to the sheer size of the datasets it must work with. The Spring Core JdbcTemplate illustrates this problem well. If you use JdbcTemplate with a RowMapper, the RowMapper will be called once for every result returned from the provided query, and the entire mapped result list is held in memory until the query method returns. This causes few issues in scenarios where the dataset is small, but the large datasets often necessary for batch processing would cause any JVM to crash quickly. If the SQL statement returns 1 million rows, the RowMapper will be called 1 million times and all 1 million mapped objects will be held in memory. Spring Batch provides two types of solutions for this problem: Cursor and DrivingQuery ItemReaders.
Using a database cursor is generally the default approach of most batch developers, because it is the database's solution to the problem of 'streaming' relational data. The Java ResultSet class is essentially an object oriented mechanism for manipulating a cursor: a ResultSet maintains a cursor to the current row of data, and calling next on a ResultSet moves this cursor to the next row. Spring Batch cursor-based ItemReaders open a cursor on initialization and move the cursor forward one row for every call to read, returning a mapped object that can be used for processing. The close method will then be called to ensure all resources are freed up. The Spring core JdbcTemplate gets around this problem by using the callback pattern to completely map all rows in a ResultSet and close it before returning control back to the method caller. However, in batch, this must wait until the step is complete. Below is a generic example of how a cursor-based ItemReader works; while a SQL statement is used since it is so widely known, any technology could implement the basic approach:
The example illustrates the basic pattern. Given a 'FOO' table with three columns, ID, NAME, and BAR, select all rows with an ID greater than 1 but less than 7. This puts the beginning of the cursor (row 1) on ID 2. The result of reading this row is a completely mapped Foo object; calling read() again moves the cursor to the next row, which is the Foo with an ID of 3.
JdbcCursorItemReader is the JDBC implementation of the cursor-based technique. It works directly with a ResultSet and requires a SQL statement to run against a connection obtained from a DataSource. The following database schema will be used as an example:
CREATE TABLE CUSTOMER (
    ID BIGINT IDENTITY PRIMARY KEY,
    NAME VARCHAR(45),
    CREDIT FLOAT
);
Many people prefer to use a domain object for each row, so we'll use an implementation of the RowMapper interface to map a CustomerCredit object:
public class CustomerCreditRowMapper implements RowMapper {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}
Because JdbcTemplate is so familiar to users of Spring, and the JdbcCursorItemReader shares key interfaces with it, it's useful to see an example of how to read in this data with JdbcTemplate, in order to contrast it with the item reader. For the purposes of this example, let's assume there are 1,000 rows in the CUSTOMER table. The first example uses JdbcTemplate:
//For simplicity's sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());
After running this code snippet, the customerCredits list will contain 1,000 CustomerCredit objects. In the query method, a connection will be obtained from the DataSource, the provided SQL will be run against it, and the mapRow method will be called for each row in the ResultSet. Let's contrast this with the approach of the JdbcCursorItemReader:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = itemReader.read();
while (customerCredit != null) {
    counter++;
    customerCredit = itemReader.read();
}
itemReader.close(executionContext);
After running this code snippet, the counter will equal 1,000. If the code above had put each returned customerCredit into a list, the result would have been exactly the same as with the JdbcTemplate example. However, the big advantage of the ItemReader is that it allows items to be 'streamed'. The read method can be called once, the item written out via an ItemWriter, and then the next item obtained via read. This allows item reading and writing to be done in 'chunks' and committed periodically, which is the essence of high performance batch processing.
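The following sketch makes the chunked pattern concrete; the commit interval and the transaction comments are simplified placeholders for what a real Step does:

int commitInterval = 10;                 // assumed chunk size
Object item = itemReader.read();
while (item != null) {
    int count = 0;
    // a transaction would be started here by the Step
    while (item != null && count < commitInterval) {
        itemWriter.write(item);
        count++;
        item = itemReader.read();
    }
    itemWriter.flush();
    // commit: at most 'commitInterval' items are in flight at any time
}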
Because there are so many varying options for opening a cursor in Java, there are many properties on the JdbcCursorItemReader that can be set:
Table 3.2. JdbcCursorItemReader Properties

Property | Description |
---|---|
ignoreWarnings | Determines whether SQLWarnings are logged or cause an exception - default is true |
fetchSize | Gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed by the ResultSet object used by the ItemReader. By default, no hint is given. |
maxRows | Sets the limit for the maximum number of rows the underlying ResultSet can hold at any one time. |
queryTimeout | Sets the number of seconds the driver will wait for a Statement object to execute. If the limit is exceeded, a DataAccessException is thrown. (Consult your driver vendor documentation for details.) |
verifyCursorPosition | Because the same ResultSet held by the ItemReader is passed to the RowMapper, it's possible for users to call ResultSet.next() themselves, which could cause issues with the reader's internal count. Setting this value to true will cause an exception to be thrown if the cursor position is not the same after the RowMapper call as it was before. |
saveState | Indicates whether or not the reader's state should be saved in the ExecutionContext provided by ItemStream#update(ExecutionContext). The default value is false. |
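For example, the cursor reader from the earlier snippet might be tuned as follows; this is a sketch that assumes the setter names follow the property names in the table:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setMapper(new CustomerCreditRowMapper());
itemReader.setFetchSize(100);             // hint: fetch 100 rows at a time
itemReader.setQueryTimeout(60);           // fail if the statement takes over a minute
itemReader.setVerifyCursorPosition(true); // guard against a RowMapper moving the cursor
itemReader.setSaveState(true);            // store the position for restartability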
Just as normal Spring users make important decisions about whether or not to use ORM solutions, which affects whether they use a JdbcTemplate or a HibernateTemplate, Spring Batch users have the same options. HibernateCursorItemReader is the Hibernate implementation of the cursor technique. Hibernate's usage in batch has been fairly controversial, largely because Hibernate was originally developed to support online application styles. However, that doesn't mean it can't be used for batch processing. The easiest way to address this concern is to use a StatelessSession rather than a standard session. This removes all of the caching and dirty checking that Hibernate employs, which can cause issues in a batch scenario. For more information on the differences between stateless and normal Hibernate sessions, refer to the documentation of your specific Hibernate release. The HibernateCursorItemReader allows you to declare an HQL statement and pass in a SessionFactory, which will pass back one item per call to read in the same basic fashion as the JdbcCursorItemReader. Below is an example configuration using the same 'customer credit' example as the JDBC reader:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity's sake, assume sessionFactory has already been obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = itemReader.read();
while (customerCredit != null) {
    counter++;
    customerCredit = itemReader.read();
}
itemReader.close(executionContext);
This configured ItemReader will return CustomerCredit objects in exactly the same manner as described for the JdbcCursorItemReader, assuming Hibernate mapping files have been created correctly for the Customer table. The 'useStatelessSession' property defaults to true, but it has been set explicitly here to draw attention to the ability to switch it on or off.
In the previous section, Cursor based database input was discussed. However, it isn't the only option. Many database vendors, such as DB2, have extremely pessimistic locking strategies that can cause issues if the table being read also needs to be used by other portions of the online application. Furthermore, opening cursors over extremely large datasets can cause issues on certain vendors. Therefore, many projects prefer to use a 'Driving Query' approach to reading in data. This approach works by iterating over keys, rather than the entire object that needs to be returned, as the following example illustrates:
This example uses the same 'FOO' table as the cursor-based example. However, rather than selecting the entire row, only the IDs are selected in the SQL statement. So, rather than a Foo object being returned from read, an Integer will be returned. This number can then be used to query for the 'details', that is, a complete Foo object:
An existing DAO can then be used to obtain a full Foo object using the key obtained from the driving query. In Spring Batch, driving query style input is implemented with a DrivingQueryItemReader, which has only one dependency: a KeyCollector. The DrivingQueryItemReader itself is fairly simple: it iterates over a list of keys. The real complication is how those keys are obtained, which the KeyCollector interface abstracts:
public interface KeyCollector {

    List retrieveKeys(ExecutionContext executionContext);

    void updateContext(Object key, ExecutionContext executionContext);
}
The primary method in this interface is retrieveKeys. It is expected that this method will return the keys to be processed, regardless of whether or not it is a restart scenario. For example, if a job starts processing keys 1 through 1,000 and fails after processing key 500, then upon restart keys 501 through 1,000 should be returned. This functionality is made possible by the updateContext method, which saves the provided key (which should be the current key being processed) in the provided ExecutionContext. The retrieveKeys method can then use this value to retrieve a subset of the original keys:
ExecutionContext executionContext = new ExecutionContext();
List keys = keyStrategy.retrieveKeys(executionContext);
//Assume keys contains 1 through 1,000
keyStrategy.updateContext(new Long(500), executionContext);
keys = keyStrategy.retrieveKeys(executionContext);
//keys should now contain 501 through 1,000
This generalization illustrates the KeyCollector contract. If we assume that initially calling retrieveKeys returned 1,000 keys (1 through 1,000), calling updateContext with key 500 should mean that calling retrieveKeys again with the same ExecutionContext will return 500 keys (501 through 1,000).
The most common driving query scenario is input whose key is a single column. This is implemented in the SingleColumnJdbcKeyCollector class, which has the following options:
Table 3.3. SingleColumnJdbcKeyCollector Properties

Property | Description |
---|---|
jdbcTemplate | The JdbcTemplate to be used to query the database. |
sql | The SQL statement to query the database with. It should return only one column. |
restartSql | The SQL statement to use in the case of a restart. Because only one key will be used, this query should require only one argument. |
keyMapper | The RowMapper implementation to be used to map the keys to objects. By default, this is a Spring Core SingleColumnRowMapper, which maps them to well known types such as Integer, String, etc. For more information, check the documentation of your specific Spring release. |
The following code helps illustrate how to set up and use a SingleColumnJdbcKeyCollector:
SingleColumnJdbcKeyCollector keyCollector = new SingleColumnJdbcKeyCollector(getJdbcTemplate(),
    "SELECT ID from T_FOOS order by ID");
keyCollector.setRestartSql("SELECT ID from T_FOOS where ID > ? order by ID");
ExecutionContext executionContext = new ExecutionContext();
List keys = keyCollector.retrieveKeys(executionContext);

for (int i = 0; i < keys.size(); i++) {
    System.out.println(keys.get(i));
}
If this code were run in the proper environment, with the correct database tables set up, it would output the following:
1
2
3
4
5
Now, let's modify the code slightly to show what would happen on a restart, if the previous run had failed after processing key 3 successfully:
SingleColumnJdbcKeyCollector keyCollector = new SingleColumnJdbcKeyCollector(getJdbcTemplate(),
    "SELECT ID from T_FOOS order by ID");
keyCollector.setRestartSql("SELECT ID from T_FOOS where ID > ? order by ID");
ExecutionContext executionContext = new ExecutionContext();
keyCollector.updateContext(new Long(3), executionContext);
List keys = keyCollector.retrieveKeys(executionContext);

for (int i = 0; i < keys.size(); i++) {
    System.out.println(keys.get(i));
}
Running this code snippet would result in the following:
4
5
The key difference between the two examples is the following line:
keyCollector.updateContext(new Long(3), executionContext);
This tells the key collector to update the provided ExecutionContext with the key of 3. It will normally be called by the DrivingQueryItemReader, but it is called directly here for simplicity's sake. By calling retrieveKeys with the ExecutionContext that was updated to contain 3, the argument of 3 will be passed to the restartSql:
keyCollector.setRestartSql("SELECT ID from T_FOOS where ID > ? order by ID");
This will cause only keys 4 and 5 to be returned, since they are the only ones with an ID greater than 3.
The SingleColumnJdbcKeyCollector is extremely useful for generating keys, but only if one column uniquely identifies your record. What if more than one column is required to uniquely identify your record? This should be a minority scenario, but it is still possible. In this case, the MultipleColumnJdbcKeyCollector should be used. It allows for mapping multiple columns by sacrificing simplicity. The properties needed by the multiple column collector are the same as for the single column version, with one difference: instead of a regular RowMapper, an ExecutionContextRowMapper must be provided.
Just like the single column version, it requires a normal SQL statement and a restart SQL statement. However, because the restart SQL statement requires more than one argument, there needs to be more complex handling of how keys are mapped to an execution context. An ExecutionContextRowMapper provides this:
public interface ExecutionContextRowMapper extends RowMapper {

    public void mapKeys(Object key, ExecutionContext executionContext);

    public PreparedStatementSetter createSetter(ExecutionContext executionContext);
}
The ExecutionContextRowMapper interface extends the standard RowMapper interface to allow multiple keys to be stored in an ExecutionContext, and a PreparedStatementSetter to be created so that the arguments to the restart SQL statement can be set for the key returned.
By default, an implementation of ExecutionContextRowMapper that uses a Map will be used. It is recommended that this implementation not be overridden; however, if a specific type of key needs to be returned, a new implementation can be provided.
JDBC is not the only option available for key collectors; iBatis can be used as well. Using iBatis doesn't change the basic requirements of a KeyCollector: a query, a restart query, and a DataSource. However, because iBatis is used, both queries are simply iBatis query IDs, and the data source is a SqlMapClient.
While both flat files and XML have specific ItemWriters, there is no exact equivalent in the database world, because transactions provide all the functionality that is needed. ItemWriters are necessary for files because they must act as if they're transactional, keeping track of written items and flushing or clearing at the appropriate times. Databases have no need for this functionality, since the write is already contained in a transaction. Users can create their own DAOs that implement the ItemWriter interface, or use one written for generic processing concerns; either way, they should work without any issues. The one exception to this is buffered output. This is most common when using Hibernate as an ItemWriter, but the same issues can arise when using JDBC batch mode. Buffering database output doesn't have any inherent flaws, assuming there are no errors in the data. However, any errors while writing out can cause issues, because there is no way to know which individual item caused an exception. An example would be a record that causes a DataIntegrityViolationException, perhaps because of a primary key violation. If items are buffered before being written out, this error will not be thrown until the buffer is flushed just before a commit. For example, let's assume that 20 items will be written per chunk, and the 15th item throws a DataIntegrityViolationException. As far as the Step is concerned, all 20 items will have been written out successfully, since there's no way to know that an error will occur until they are actually written out. Once ItemWriter#flush() is called, the buffer will be emptied and the exception will be hit. At this point, there's nothing the Step can do; the transaction must be rolled back. Normally, this exception would cause the offending item to be skipped (depending upon the skip/retry policies), so that it won't be written out again. However, in this scenario, there's no way to know which item caused the issue, because the whole buffer was being written out when the failure happened. Because this is a common enough use case, especially when using Hibernate, Spring Batch provides an implementation to help: HibernateAwareItemWriter. The HibernateAwareItemWriter solves the problem in a straightforward way: if a chunk fails the first time, on subsequent runs it will be flushed after each write. This effectively lowers the commit interval to one for the length of the chunk, allowing items to be skipped reliably. The following example illustrates how to configure the HibernateAwareItemWriter:
<bean id="hibernateItemWriter" class="org.springframework.batch.item.database.HibernateAwareItemWriter"> <property name="sessionFactory" ref="sessionFactory" /> <property name="delegate" ref="customerCreditWriter" /> </bean> <bean id="customerCreditWriter" class="org.springframework.batch.sample.dao.HibernateCreditDao"> <property name="sessionFactory" ref="sessionFactory" /> </bean>
Batch systems are often used in conjunction with other application styles. The most common is an online system, but batch may also support integration, or even a thick client application, by moving the bulk data that each application style uses. For this reason, it is common for users to want to reuse existing DAOs or other services within their batch jobs. The Spring container itself makes this fairly easy by allowing any necessary class to be injected. However, there may be cases where the existing service needs to act as an ItemReader or ItemWriter, either to satisfy the dependency of another Spring Batch class, or because it truly is the main ItemReader for a step. It's fairly trivial to write an adaptor class for each service that needs wrapping, but because this is such a common concern, Spring Batch provides implementations: ItemReaderAdapter and ItemWriterAdapter. Both classes implement the standard Spring method-invoking delegator pattern and are fairly simple to set up. Below is an example of the reader:
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter"> <property name="targetObject" ref="fooService" /> <property name="targetMethod" value="generateFoo" /> </bean> <bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
One important point to note is that the contract of the targetMethod must be the same as the contract for read: when exhausted, it must return null; otherwise, it returns an Object. Anything else will prevent the framework from correctly knowing when processing should end, causing either an infinite loop or an incorrect failure, depending upon the implementation of the ItemWriter. The ItemWriter implementation is equally simple:
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter"> <property name="targetObject" ref="fooService" /> <property name="targetMethod" value="processFoo" /> </bean> <bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
The ItemReader and ItemWriter interfaces have been discussed in detail in this chapter, but what if you want to insert business logic before writing? One option, for both reading and writing, is to use the composite pattern: create an ItemWriter that contains another ItemWriter, or an ItemReader that contains another ItemReader. For example:
public class CompositeItemWriter implements ItemWriter {

    ItemWriter itemWriter;

    public CompositeItemWriter(ItemWriter itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(Object item) throws Exception {
        //Add business logic here
        itemWriter.write(item);
    }

    public void clear() throws ClearFailedException {
        itemWriter.clear();
    }

    public void flush() throws FlushFailedException {
        itemWriter.flush();
    }
}
The class above contains another ItemWriter to which it delegates after having applied some business logic. It should be noted that the clear and flush methods must be propagated as well, so that the delegate ItemWriter is notified. This pattern could easily be used for an ItemReader as well, perhaps to obtain more reference data based upon the input provided by the main ItemReader, as sketched below.
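A minimal composite reader along those lines might look like the following sketch; the enrichment step stands in for whatever application-specific reference data lookup is needed:

public class CompositeItemReader implements ItemReader {

    ItemReader itemReader;

    public CompositeItemReader(ItemReader itemReader) {
        this.itemReader = itemReader;
    }

    public Object read() throws Exception {
        Object item = itemReader.read();
        if (item != null) {
            //Add business logic here, e.g. look up reference data for the item
        }
        return item;
    }

    public void mark() throws MarkFailedException {
        itemReader.mark();
    }

    public void reset() throws ResetFailedException {
        itemReader.reset();
    }
}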
This pattern is very useful if you need to control the call to write yourself. However, if you only want to 'transform' the item passed in for writing before it is actually written, there isn't much need to call write yourself; you just want to modify the item. For this scenario, Spring Batch provides the ItemTransformer interface:
public interface ItemTransformer {

    Object transform(Object item) throws Exception;
}
An ItemTransformer is very simple: given one object, transform it and return another. The object provided may or may not be of the same type. The point is that business logic may be applied within the transform, and it is completely up to the developer to create that logic. An ItemTransformer is used as part of the ItemTransformerItemWriter, which accepts an ItemWriter and an ItemTransformer, passing the item first to the transformer before writing it. For example, assume an ItemReader provides a class of type Foo, and it needs to be converted to type Bar before being written out. An ItemTransformer can be written that performs the conversion:
public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooTransformer implements ItemTransformer {

    //Perform simple transformation, convert a Foo to a Bar
    public Object transform(Object item) throws Exception {
        assertTrue(item instanceof Foo);
        Foo foo = (Foo)item;
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter {

    public void write(Object item) throws Exception {
        assertTrue(item instanceof Bar);
    }

    //rest of class omitted for clarity
}
In the very simple example above, there is a class Foo, a class Bar, and a class FooTransformer that adheres to the ItemTransformer interface. The transformation is simple, but any type of transformation could be done here. The BarWriter will be used to write out Bars, throwing an exception if any other type is provided. Similarly, the FooTransformer will throw an exception if anything but a Foo is provided. An ItemTransformerItemWriter can then be used like a normal ItemWriter: it is passed a Foo for writing, which is passed to the transformer, and the resulting Bar is written:
ItemTransformerItemWriter itemTransformerItemWriter = new ItemTransformerItemWriter();
itemTransformerItemWriter.setItemTransformer(new FooTransformer());
itemTransformerItemWriter.setDelegate(new BarWriter());
itemTransformerItemWriter.write(new Foo());
Note that the ItemTransformerItemWriter and the CompositeItemWriter are examples of a delegation pattern, which is common in Spring Batch. The delegates themselves might implement callback interfaces like ItemStream or StepListener. If they do, and they are being used in conjunction with Spring Batch Core as part of a Step in a Job, then they almost certainly need to be registered manually with the Step. Registration is automatic when using the factory beans (*StepFactoryBean), but only for the ItemReader and ItemWriter injected directly. The delegates are not known to the Step, so they need to be injected as listeners or streams (or both if appropriate), as shown in the sketch below.
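As a minimal sketch of such registration (the class name is shown for illustration and should be checked against the factory bean you are actually using; the delegateReader and delegateWriter beans are hypothetical ItemStream implementations):

<!-- Hypothetical sketch: registering ItemStream delegates with a step.
     Bean names and the reader/writer wiring are illustrative only. -->
<bean id="step" class="org.springframework.batch.core.step.item.SimpleStepFactoryBean">
    <property name="itemReader" ref="compositeReader" />
    <property name="itemWriter" ref="compositeWriter" />
    <!-- the composites' delegates are unknown to the step, so register them here -->
    <property name="streams">
        <list>
            <ref bean="delegateReader" />
            <ref bean="delegateWriter" />
        </list>
    </property>
</bean>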
Performing a single transformation is useful in many scenarios, but what if you want to 'chain' together multiple ItemTransformers? This can be accomplished using a CompositeItemTransformer. To update the previous single-transformation example, Foo will be transformed to Bar, which will be transformed to Foobar and written out:
public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooTransformer implements ItemTransformer {

    // Perform a simple transformation: convert a Foo to a Bar
    public Object transform(Object item) throws Exception {
        assertTrue(item instanceof Foo);
        Foo foo = (Foo) item;
        return new Bar(foo);
    }
}

public class BarTransformer implements ItemTransformer {

    public Object transform(Object item) throws Exception {
        assertTrue(item instanceof Bar);
        return new Foobar((Bar) item);
    }
}

public class FoobarWriter implements ItemWriter {

    public void write(Object item) throws Exception {
        assertTrue(item instanceof Foobar);
    }

    // rest of class omitted for clarity
}
A FooTransformer and a BarTransformer can be 'chained' together to give the resultant Foobar:
CompositeItemTransformer compositeTransformer = new CompositeItemTransformer();
List itemTransformers = new ArrayList();
itemTransformers.add(new FooTransformer());
itemTransformers.add(new BarTransformer());
compositeTransformer.setItemTransformers(itemTransformers);
The compositeTransformer could be said to accept a Foo and return a Foobar. Clients of the composite transformer don't need to know that there are actually two separate transformations taking place. By updating the example from above to use the composite transformer, the correct class can be passed to FoobarWriter:
ItemTransformerItemWriter itemTransformerItemWriter = new ItemTransformerItemWriter();
itemTransformerItemWriter.setItemTransformer(compositeTransformer);
itemTransformerItemWriter.setDelegate(new FoobarWriter());
itemTransformerItemWriter.write(new Foo());
During the course of this chapter, multiple approaches to parsing input have been discussed. Each major implementation will throw an exception if the input is not 'well-formed'. The FixedLengthTokenizer will throw an exception if a range of data is missing. Similarly, attempting to access an index in a RowMapper or FieldSetMapper that doesn't exist, or that is in a different format than the one expected, will cause an exception to be thrown. All of these types of exceptions are thrown before read returns. However, they don't address the issue of whether or not the returned item is valid. For example, if one of the fields is an age, it obviously cannot be negative. Such a value will parse correctly, because the field exists and is a number, so no exception is thrown. Since there is already a plethora of validation frameworks, Spring Batch does not attempt to provide yet another, but rather provides a very simple interface that can be implemented by any number of frameworks:
public interface Validator {

    void validate(Object value) throws ValidationException;
}
The contract is that the validate method will throw an exception if the object is invalid, and return normally if it is valid. Spring Batch provides an out-of-the-box ItemReader that delegates to another ItemReader and validates the returned item:
<bean class="org.springframework.batch.item.validator.ValidatingItemReader"> <property name="itemReader"> <bean class="org.springframework.batch.sample.item.reader.OrderItemReader" /> </property> <property name="validator" ref="validator" /> </bean> <bean id="validator" class="org.springframework.batch.item.validator.SpringValidator"> <property name="validator"> <bean id="orderValidator" class="org.springmodules.validation.valang.ValangValidator"> <property name="valang"> <value> <![CDATA[ { orderId : ? > 0 AND ? <= 9999999999 : 'Incorrect order ID' : 'error.order.id' } { totalLines : ? = size(lineItems) : 'Bad count of order lines' : 'error.order.lines.badcount'} { customer.registered : customer.businessCustomer = FALSE OR ? = TRUE : 'Business customer must be registered' : 'error.customer.registration'} { customer.companyName : customer.businessCustomer = FALSE OR ? HAS TEXT : 'Company name for business customer is mandatory' :'error.customer.companyname'} ]]> </value> </property> </bean> </property> </bean>
This example shows a ValangValidator being used to validate an order object. The intent is not to show Valang functionality so much as to show how a validator could be added.
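A validation framework is not required, of course; any class implementing the Validator interface will do. As a minimal sketch for the age example mentioned earlier (the Person class and its getAge method are hypothetical, not part of Spring Batch):

// Hypothetical domain class used only for this illustration
public class Person {
    private int age;
    public int getAge() { return age; }
}

public class AgeValidator implements Validator {

    public void validate(Object value) throws ValidationException {
        Person person = (Person) value;
        // the field parsed as a number, but it must also be non-negative
        if (person.getAge() < 0) {
            throw new ValidationException("Age cannot be negative: " + person.getAge());
        }
    }
}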
Note that the ValidatingItemReader is another example of the delegation pattern, and the delegates themselves might implement callback interfaces like ItemStream or StepListener. If they do, and they are being used in conjunction with Spring Batch Core as part of a step in a job, then they almost certainly need to be registered manually with the Step. Registration is automatic when using the factory beans (*StepFactoryBean), but only for the ItemReader and ItemWriter injected directly; the delegates are not known to the step, so they need to be injected as listeners or streams (or both if appropriate).
So far in this chapter the basic contracts for reading and writing in Spring Batch, along with some common implementations, have been discussed. However, these are all fairly generic, and there are many potential scenarios that may not be covered by the out-of-the-box implementations. This section will show, using a simple example, how to create custom ItemReader and ItemWriter implementations and implement their contracts correctly. The ItemReader will also implement ItemStream, in order to illustrate how to make a reader or writer restartable.
For the purpose of this example, a simple ItemReader implementation that reads from a provided list will be created. We'll start by implementing the most basic contract of ItemReader, read:
public class CustomItemReader implements ItemReader {

    List items;

    public CustomItemReader(List items) {
        this.items = items;
    }

    public Object read() throws Exception, UnexpectedInputException,
        NoWorkFoundException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }

    public void mark() throws MarkFailedException { }

    public void reset() throws ResetFailedException { }
}
This very simple class takes a list of items and returns them one at a time, removing each from the list. When the list is empty, it returns null, thus satisfying the most basic requirements of an ItemReader, as illustrated below:
List items = new ArrayList();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
This most basic ItemReader will work, but what happens if the transaction needs to be rolled back? A rollback will usually be caused by an error in the ItemWriter, since the ItemReader generally won't do anything that invalidates the transaction, but without supporting it there would be erroneous results. ItemReaders are notified about rollbacks via the mark and reset methods. In the example above they're empty, so code must be added to them in order to support the rollback scenario:
public class CustomItemReader implements ItemReader {

    List items;
    int currentIndex = 0;
    int lastMarkedIndex = 0;

    public CustomItemReader(List items) {
        this.items = items;
    }

    public Object read() throws Exception, UnexpectedInputException,
        NoWorkFoundException, ParseException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }
        return null;
    }

    public void mark() throws MarkFailedException {
        lastMarkedIndex = currentIndex;
    }

    public void reset() throws ResetFailedException {
        currentIndex = lastMarkedIndex;
    }
}
The CustomItemReader has now been modified to keep track of where it currently is, and where it was when mark was last called. This allows the new ItemReader to fulfill the basic contract that calling reset returns the ItemReader to the state it was in when mark was last called:
// Assume the same setup as the last example: a list with "1", "2", and "3"
itemReader.mark();
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
itemReader.reset();
assertEquals("1", itemReader.read());
In most real-world scenarios, there will likely be some kind of underlying resource that requires tracking. In the case of a file, mark will hold the current location within the file, and reset will move it back. The JdbcCursorItemReader, for example, holds on to the current row number, and on reset moves the cursor back by calling the ResultSet absolute method, which moves the cursor to the row number supplied. The CustomItemReader now completely adheres to the entire ItemReader contract: read returns the appropriate items, returning null when the list is empty, and reset returns the ItemReader to its state as of the last call to mark, allowing for correct support of a rollback. (It is assumed that a Step implementation will call mark and reset.)
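To illustrate the cursor case, a simplified sketch (not the framework's actual implementation; the field names and the mapRow helper are illustrative, and ResetFailedException is assumed to accept a message and cause) might look like:

public class CursorItemReader implements ItemReader {

    private java.sql.ResultSet rs; // assumed to be an open, scrollable cursor
    private int currentRow = 0;
    private int markedRow = 0;

    public Object read() throws Exception {
        if (rs.next()) {
            currentRow++;
            return mapRow(rs);
        }
        return null;
    }

    public void mark() throws MarkFailedException {
        markedRow = currentRow; // remember the position at the start of the chunk
    }

    public void reset() throws ResetFailedException {
        try {
            rs.absolute(markedRow); // move the cursor back to the marked row
            currentRow = markedRow;
        }
        catch (java.sql.SQLException e) {
            throw new ResetFailedException("Could not move the cursor back", e);
        }
    }

    private Object mapRow(java.sql.ResultSet rs) {
        return null; // map the current row to a domain object (omitted)
    }
}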
The final challenge is to make the ItemReader restartable. Currently, if the power goes out and processing begins again, the ItemReader must start at the beginning. This is actually valid in many scenarios, but it is sometimes preferable that a batch job restarts where it left off. The key discriminant is often whether the reader is stateful or stateless. A stateless reader does not need to worry about restartability, but a stateful one has to try to reconstitute its last known state on restart. For this reason, we recommend that you keep custom readers stateless as far as possible, so you don't have to worry about restartability.
If you do need to store state, then in Spring Batch this is implemented with the ItemStream interface:
public class CustomItemReader implements ItemReader, ItemStream {

    List items;
    int currentIndex = 0;
    int lastMarkedIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List items) {
        this.items = items;
    }

    public Object read() throws Exception, UnexpectedInputException,
        NoWorkFoundException, ParseException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }
        return null;
    }

    public void mark() throws MarkFailedException {
        lastMarkedIndex = currentIndex;
    }

    public void reset() throws ResetFailedException {
        currentIndex = lastMarkedIndex;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = (int) executionContext.getLong(CURRENT_INDEX);
        }
        else {
            currentIndex = 0;
            lastMarkedIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, currentIndex);
    }

    public void close(ExecutionContext executionContext) throws ItemStreamException { }
}
On each call to the ItemStream update method, the current index of the ItemReader is stored in the provided ExecutionContext with a key of 'current.index'. When the ItemStream open method is called, the ExecutionContext is checked for an entry with that key; if one is found, the current index is moved to that location. This is a fairly trivial example, but it still meets the general contract:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream) itemReader).open(executionContext);
assertEquals("1", itemReader.read());
// store the current index ("1" has been read, so the index is 1)
((ItemStream) itemReader).update(executionContext);

// simulate a restart: a fresh reader with the same ExecutionContext
List items = new ArrayList();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader(items);

((ItemStream) itemReader).open(executionContext);
assertEquals("2", itemReader.read());
Most ItemReaders have much more sophisticated restart logic. The DrivingQueryItemReader, for example, loads up only the remaining keys to be processed, rather than loading all of them and then moving to the correct index.
It is also worth noting that the key used within the ExecutionContext should not be trivial, because the same ExecutionContext is used for all ItemStreams within a Step. In most cases, simply prepending the key with the class name should be enough to guarantee uniqueness. However, in the rare case where two of the same type of ItemStream are used in the same step (which can happen if two files are needed for output), a more unique name is required. For this reason, many of the Spring Batch ItemReaders and ItemWriters have a setName() property that allows this key name to be overridden.
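For example, the CustomItemReader above could make its key safer by prefixing it with the class name. A minimal sketch (the name property shown here is an illustrative way to mimic the overridable-name approach, not the framework's code):

public class CustomItemReader implements ItemReader, ItemStream {

    // prefix the key with the class name to avoid collisions in the
    // shared ExecutionContext
    private String name = CustomItemReader.class.getName();

    // allows the key to be overridden, in the spirit of setName()
    public void setName(String name) {
        this.name = name;
    }

    private String getKey() {
        return name + ".current.index";
    }

    // rest of class as before, using getKey() instead of CURRENT_INDEX
}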
Implementing a custom ItemWriter is similar in many ways to the ItemReader example above, but differs in enough ways to warrant its own example. However, adding restartability is essentially the same, so it won't be covered here. As with the ItemReader example, a List will be used to keep the example as simple as possible:
public class CustomItemWriter implements ItemWriter {

    List output = new ArrayList();

    public void write(Object item) throws Exception {
        output.add(item);
    }

    public void clear() throws ClearFailedException { }

    public void flush() throws FlushFailedException { }
}
The example is extremely simple, but it's worth showing to illustrate an ItemWriter that doesn't respond to rollbacks and commits (i.e. clear and flush). If your potential writer doesn't need to care about rollback or commit, perhaps because it writes to a transactional resource such as a database, then there is little value in the ItemWriter interface in that scenario, other than using it to meet another class's requirement for an ItemWriter implementation. In that case, the ItemWriterAdapter would be a better solution. However, if the writer does need to be transactional, then flush and clear should be implemented to allow for a buffering solution:
public class CustomItemWriter implements ItemWriter {

    List output = new ArrayList();
    List buffer = new ArrayList();

    public void write(Object item) throws Exception {
        buffer.add(item);
    }

    public void clear() throws ClearFailedException {
        buffer.clear();
    }

    public void flush() throws FlushFailedException {
        // move buffered items to the real output
        for (Iterator it = buffer.iterator(); it.hasNext();) {
            output.add(it.next());
            it.remove();
        }
    }
}
The ItemWriter buffers all output, only writing to the actual output (in this case by adding to a list) when the flush method is called. The contents of the buffer are thrown away when clear is called.
To make the ItemWriter restartable, we would follow the same process as for the ItemReader, adding and implementing the ItemStream interface to synchronize the execution context. In the example we might have to count the number of items processed and add that as a footer record. If we needed to do that, we could implement ItemStream in our ItemWriter so that the counter was reconstituted from the execution context if the stream was re-opened, as sketched below.
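A minimal sketch of that idea (the class name and key are hypothetical, the footer writing is indicated only as a comment, and error handling is omitted):

// Hypothetical sketch: an ItemWriter that keeps a restartable count of
// items written, so a footer record could be produced at the end.
public class CountingItemWriter implements ItemWriter, ItemStream {

    private static final String WRITE_COUNT =
        CountingItemWriter.class.getName() + ".write.count";

    private long count = 0;

    public void write(Object item) throws Exception {
        // ... write the item as before ...
        count++;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // reconstitute the counter if the stream is re-opened after a restart
        if (executionContext.containsKey(WRITE_COUNT)) {
            count = executionContext.getLong(WRITE_COUNT);
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(WRITE_COUNT, count);
    }

    public void close(ExecutionContext executionContext) throws ItemStreamException {
        // a footer containing the count could be written here (illustrative)
    }

    public void flush() throws FlushFailedException { }

    public void clear() throws ClearFailedException { }
}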
In many realistic cases, custom ItemWriters also delegate to another writer that is itself restartable (e.g. when writing to a file), or else they write to a transactional resource and so don't need to be restartable, because they are stateless. When you have a stateful writer, you should probably also be sure to implement ItemStream as well as ItemWriter. Remember also that the client of the writer needs to be aware of the ItemStream, so you may need to register it with a factory bean (e.g. one of the *StepFactoryBean implementations in Spring Batch Core).