Data Buffers and Codecs
Java NIO provides ByteBuffer, but many libraries build their own byte buffer API on top,
especially for network operations where reusing buffers and/or using direct buffers is
beneficial for performance. For example, Netty has the ByteBuf hierarchy, Undertow uses
XNIO, Jetty uses pooled byte buffers with a callback to be released, and so on.
The spring-core module provides a set of abstractions to work with various byte buffer
APIs as follows:
- DataBufferFactory abstracts the creation of a data buffer.
- DataBuffer represents a byte buffer, which may be pooled.
- DataBufferUtils offers utility methods for data buffers.
- Codecs decode or encode data buffer streams into higher level objects.
DataBufferFactory
DataBufferFactory is used to create data buffers in one of two ways:
- Allocate a new data buffer, optionally specifying the capacity upfront if it is known. This is more efficient, even though implementations of DataBuffer can grow and shrink on demand.
- Wrap an existing byte[] or java.nio.ByteBuffer, which decorates the given data with a DataBuffer implementation and does not involve allocation.
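The difference between the two approaches can be illustrated at the java.nio level, without any Spring types. The following is a minimal sketch, assuming that wrapping behaves as it does for java.nio.ByteBuffer, where the wrapper shares the caller's array instead of copying it. The class name WrapVersusAllocate and its methods are hypothetical and exist only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; it uses java.nio only, not DataBufferFactory.
class WrapVersusAllocate {

    // Wrapping reuses the caller's array as backing storage: no bytes are copied.
    static ByteBuffer wrap(byte[] data) {
        return ByteBuffer.wrap(data);
    }

    // Allocating with a known capacity reserves new storage up front.
    static ByteBuffer allocate(int capacity) {
        return ByteBuffer.allocate(capacity);
    }

    public static void main(String[] args) {
        byte[] data = "abc".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer wrapped = wrap(data);
        data[0] = 'x'; // a change to the array is visible through the wrapper
        System.out.println((char) wrapped.get(0));
        System.out.println(allocate(256).capacity());
    }
}
```

Because the wrapper and the array share storage, code that wraps existing data should not modify the array while the buffer is still in use.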
Note that WebFlux applications do not create a DataBufferFactory directly but instead
access it through the ServerHttpResponse on the server side or the ClientHttpRequest
on the client side. The type of factory depends on the underlying client or server,
for example, NettyDataBufferFactory for Reactor Netty and DefaultDataBufferFactory
for others.
DataBuffer
The DataBuffer interface offers operations similar to those of java.nio.ByteBuffer but also
brings a few additional benefits, some of which are inspired by the Netty ByteBuf.
Below is a partial list of benefits:
- Read and write with independent positions, i.e. not requiring a call to flip() to alternate between read and write.
- Capacity expanded on demand as with java.lang.StringBuilder.
- Pooled buffers and reference counting via PooledDataBuffer.
- View a buffer as java.nio.ByteBuffer, InputStream, or OutputStream.
- Determine the index, or the last index, for a given byte.
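To see why independent read and write positions are a benefit, it helps to recall how a plain java.nio.ByteBuffer behaves, since it has a single position shared by reads and writes. The following is a minimal sketch using only java.nio. The class name FlipDemo and its methods are hypothetical and exist only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class showing the flip() step that ByteBuffer requires.
class FlipDemo {

    // Writes the text into a ByteBuffer, flips it, then reads the bytes back.
    static String roundTrip(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);  // writing advances the position to bytes.length
        buffer.flip();      // limit = position, position = 0: ready for reading
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    // Without flip(), the position equals the limit after the write,
    // so there is nothing left to read.
    static int readableWithoutFlip(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        return buffer.remaining();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
        System.out.println(readableWithoutFlip("hello"));
    }
}
```

A buffer type that tracks separate read and write positions removes this mode switch, which is the first benefit listed above.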
PooledDataBuffer
As explained in the Javadoc for ByteBuffer, byte buffers can be direct or non-direct. Direct buffers may reside outside the Java heap which eliminates the need for copying for native I/O operations. That makes direct buffers particularly useful for receiving and sending data over a socket, but they’re also more expensive to create and release, which leads to the idea of pooling buffers.
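The distinction between direct and non-direct buffers is visible in the java.nio API itself. The following is a minimal sketch using only java.nio. The class name DirectVersusHeap and its methods are hypothetical and exist only for this illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration class contrasting the two ByteBuffer allocation styles.
class DirectVersusHeap {

    // Non-direct buffer: backed by memory on the Java heap.
    static ByteBuffer heap(int capacity) {
        return ByteBuffer.allocate(capacity);
    }

    // Direct buffer: memory may reside outside the Java heap.
    static ByteBuffer direct(int capacity) {
        return ByteBuffer.allocateDirect(capacity);
    }

    public static void main(String[] args) {
        System.out.println(heap(1024).isDirect());
        System.out.println(direct(1024).isDirect());
    }
}
```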
PooledDataBuffer is an extension of DataBuffer that helps with reference counting, which
is essential for byte buffer pooling. How does it work? When a PooledDataBuffer is
allocated, the reference count is at 1. Calls to retain() increment the count, while
calls to release() decrement it. As long as the count is above 0, the buffer is
guaranteed not to be released. When the count is decreased to 0, the pooled buffer can be
released, which in practice could mean the reserved memory for the buffer is returned to
the memory pool.
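The counting rules above can be sketched with a small standalone class. This is a minimal sketch of the general technique, not the implementation used by Spring or by any underlying buffer library. The class RefCountedBuffer, its methods, and the returnedToPool flag are hypothetical and stand in for a real pool.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; not Spring's or Netty's implementation.
class RefCountedBuffer {

    // A newly allocated buffer starts with a reference count of 1.
    private final AtomicInteger refCount = new AtomicInteger(1);

    // Stands in for handing the memory back to a real pool.
    private volatile boolean returnedToPool = false;

    // Increments the count; fails if the buffer was already released.
    RefCountedBuffer retain() {
        refCount.updateAndGet(count -> {
            if (count == 0) {
                throw new IllegalStateException("buffer already released");
            }
            return count + 1;
        });
        return this;
    }

    // Decrements the count; returns true only when the count reaches 0.
    boolean release() {
        int remaining = refCount.updateAndGet(count -> {
            if (count == 0) {
                throw new IllegalStateException("buffer already released");
            }
            return count - 1;
        });
        if (remaining == 0) {
            returnedToPool = true;
            return true;
        }
        return false;
    }

    int refCount() {
        return refCount.get();
    }

    boolean isReturnedToPool() {
        return returnedToPool;
    }

    public static void main(String[] args) {
        RefCountedBuffer buffer = new RefCountedBuffer();
        buffer.retain();                       // count goes from 1 to 2
        System.out.println(buffer.release());  // count goes to 1, not yet released
        System.out.println(buffer.release());  // count goes to 0, released
    }
}
```

Every retain() must be balanced by a release(), in addition to the release that matches the initial allocation. Otherwise the count never reaches 0 and the memory is never returned.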
Note that instead of operating on PooledDataBuffer directly, in most cases it’s better
to use the convenience methods in DataBufferUtils that apply release or retain to a
DataBuffer only if it is an instance of PooledDataBuffer.
DataBufferUtils
DataBufferUtils offers a number of utility methods to operate on data buffers:
- Join a stream of data buffers into a single buffer possibly with zero copy, for example, via composite buffers, if that’s supported by the underlying byte buffer API.
- Turn InputStream or NIO Channel into Flux<DataBuffer>, and vice versa a Publisher<DataBuffer> into OutputStream or NIO Channel.
- Methods to release or retain a DataBuffer if the buffer is an instance of PooledDataBuffer.
- Skip or take from a stream of bytes until a specific byte count.
Codecs
The org.springframework.core.codec package provides the following strategy interfaces:
- Encoder to encode Publisher<T> into a stream of data buffers.
- Decoder to decode Publisher<DataBuffer> into a stream of higher level objects.
The spring-core module provides byte[], ByteBuffer, DataBuffer, Resource, and
String encoder and decoder implementations. The spring-web module adds Jackson JSON,
Jackson Smile, JAXB2, Protocol Buffers and other encoders and decoders. See
Codecs in the WebFlux section.
Using DataBuffer
When working with data buffers, special care must be taken to ensure buffers are released since they may be pooled. We’ll use codecs to illustrate how that works but the concepts apply more generally. Let’s see what codecs must do internally to manage data buffers.
A Decoder is the last to read input data buffers, before creating higher level
objects, and therefore it must release them as follows:
- If a Decoder simply reads each input buffer and is ready to release it immediately, it can do so via DataBufferUtils.release(dataBuffer).
- If a Decoder is using Flux or Mono operators such as flatMap, reduce, and others that prefetch and cache data items internally, or is using operators such as filter, skip, and others that leave out items, then doOnDiscard(DataBuffer.class, DataBufferUtils::release) must be added to the composition chain to ensure such buffers are released prior to being discarded, possibly also as a result of an error or cancellation signal.
- If a Decoder holds on to one or more data buffers in any other way, it must ensure they are released when fully read, or when an error or cancellation signal occurs before the cached data buffers have been read and released.
Note that DataBufferUtils#join offers a safe and efficient way to aggregate a data
buffer stream into a single data buffer. Likewise skipUntilByteCount and
takeUntilByteCount are additional safe methods for decoders to use.
An Encoder allocates data buffers that others must read (and release), so an Encoder
doesn’t have much to do. However, an Encoder must take care to release a data buffer if
a serialization error occurs while populating the buffer with data. For example:
Java:

    DataBuffer buffer = factory.allocateBuffer();
    boolean release = true;
    try {
        // serialize and populate buffer..
        release = false;
    }
    finally {
        if (release) {
            DataBufferUtils.release(buffer);
        }
    }
    return buffer;

Kotlin:

    val buffer = factory.allocateBuffer()
    var release = true
    try {
        // serialize and populate buffer..
        release = false
    } finally {
        if (release) {
            DataBufferUtils.release(buffer)
        }
    }
    return buffer

The consumer of an Encoder is responsible for releasing the data buffers it receives.
In a WebFlux application, the output of the Encoder is used to write to the HTTP server
response, or to the client HTTP request, in which case releasing the data buffers is the
responsibility of the code writing to the server response, or to the client request.
Note that when running on Netty, there are debugging options for troubleshooting buffer leaks.