Class DataBufferUtils
Utility class for working with DataBuffers.
- Since:
- 5.0
- Author:
- Arjen Poutsma, Brian Clozel
-
Nested Class Summary
static interface DataBufferUtils.Matcher
    Contract to find delimiter(s) against one or more data buffers that can be passed one at a time to the DataBufferUtils.Matcher.match(DataBuffer) method.
-
Constructor Summary
-
Method Summary
static reactor.core.publisher.Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers)
    Return a new DataBuffer composed from joining together the given dataBuffers elements.
static reactor.core.publisher.Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount)
    Variant of join(Publisher) that behaves the same way up until the specified max number of bytes to buffer.
static DataBufferUtils.Matcher matcher(byte[] delimiter)
    Return a DataBufferUtils.Matcher for the given delimiter.
static DataBufferUtils.Matcher matcher(byte[]... delimiters)
    Return a DataBufferUtils.Matcher for the given delimiters.
static reactor.core.publisher.Flux<DataBuffer> read(Path path, DataBufferFactory bufferFactory, int bufferSize, OpenOption... options)
    Read bytes from the given file Path into a Flux of DataBuffers.
static reactor.core.publisher.Flux<DataBuffer> read(Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize)
    Read the given Resource into a Flux of DataBuffers starting at the given position.
static reactor.core.publisher.Flux<DataBuffer> read(Resource resource, DataBufferFactory bufferFactory, int bufferSize)
    Read the given Resource into a Flux of DataBuffers.
static reactor.core.publisher.Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, long position, DataBufferFactory bufferFactory, int bufferSize)
    Obtain an AsynchronousFileChannel from the given supplier, and read it into a Flux of DataBuffers, starting at the given position.
static reactor.core.publisher.Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize)
    Obtain an AsynchronousFileChannel from the given supplier, and read it into a Flux of DataBuffers.
static reactor.core.publisher.Flux<DataBuffer> readByteChannel(Callable<ReadableByteChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize)
    Obtain a ReadableByteChannel from the given supplier, and read it into a Flux of DataBuffers.
static reactor.core.publisher.Flux<DataBuffer> readInputStream(Callable<InputStream> inputStreamSupplier, DataBufferFactory bufferFactory, int bufferSize)
    Obtain an InputStream from the given supplier, and read it into a Flux of DataBuffers.
static boolean release(DataBuffer dataBuffer)
    Release the given data buffer, if it is a PooledDataBuffer and has been allocated.
static Consumer<DataBuffer> releaseConsumer()
    Return a consumer that calls release(DataBuffer) on all passed data buffers.
static <T extends DataBuffer> T retain(T dataBuffer)
    Retain the given data buffer, if it is a PooledDataBuffer.
static reactor.core.publisher.Flux<DataBuffer> skipUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount)
    Skip buffers from the given Publisher until the total byte count reaches the given maximum byte count, or until the publisher is complete.
static reactor.core.publisher.Flux<DataBuffer> takeUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount)
    Relay buffers from the given Publisher until the total byte count reaches the given maximum byte count, or until the publisher is complete.
static <T extends DataBuffer> T touch(T dataBuffer, Object hint)
    Associate the given hint with the data buffer if it is a pooled buffer and supports leak tracking.
static reactor.core.publisher.Flux<DataBuffer> write(Publisher<? extends DataBuffer> source, AsynchronousFileChannel channel, long position)
    Write the given stream of DataBuffers to the given AsynchronousFileChannel.
static reactor.core.publisher.Flux<DataBuffer> write(Publisher<DataBuffer> source, OutputStream outputStream)
    Write the given stream of DataBuffers to the given OutputStream.
static reactor.core.publisher.Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel)
    Write the given stream of DataBuffers to the given AsynchronousFileChannel.
static reactor.core.publisher.Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel)
    Write the given stream of DataBuffers to the given WritableByteChannel.
static reactor.core.publisher.Mono<Void> write(Publisher<DataBuffer> source, Path destination, OpenOption... options)
    Write the given stream of DataBuffers to the given file Path.
-
Constructor Details
-
DataBufferUtils
public DataBufferUtils()
-
-
Method Details
-
readInputStream
public static reactor.core.publisher.Flux<DataBuffer> readInputStream(Callable<InputStream> inputStreamSupplier, DataBufferFactory bufferFactory, int bufferSize)
Obtain an InputStream from the given supplier, and read it into a Flux of DataBuffers. Closes the input stream when the Flux is terminated.
- Parameters:
- inputStreamSupplier - the supplier for the input stream to read from
- bufferFactory - the factory to create data buffers with
- bufferSize - the maximum size of the data buffers
- Returns:
- a Flux of data buffers read from the given input stream
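The chunking and close-on-termination behaviour described above can be pictured with a plain-JDK sketch. It is not Spring's implementation: it reads eagerly into a list instead of producing a Flux, and the names `ChunkedReadSketch` and `readChunks` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical illustration; not part of Spring.
final class ChunkedReadSketch {

    // Reads the supplied stream eagerly into chunks of at most bufferSize bytes.
    static List<byte[]> readChunks(Callable<InputStream> supplier, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        // try-with-resources closes the stream whether reading completes or fails
        try (InputStream in = supplier.call()) {
            byte[] scratch = new byte[bufferSize];
            int n;
            while ((n = in.read(scratch)) != -1) {
                if (n > 0) {
                    // copy only the bytes actually read, so a chunk may be shorter than bufferSize
                    chunks.add(Arrays.copyOf(scratch, n));
                }
            }
        } catch (Exception ex) {
            throw new IllegalStateException("Reading failed", ex);
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] data = "hello world".getBytes(StandardCharsets.US_ASCII);
        for (byte[] chunk : readChunks(() -> new ByteArrayInputStream(data), 4)) {
            System.out.println(new String(chunk, StandardCharsets.US_ASCII));
        }
    }
}
```

The supplier is only invoked inside the read, which mirrors why the method takes a Callable rather than an already-open stream.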
-
readByteChannel
public static reactor.core.publisher.Flux<DataBuffer> readByteChannel(Callable<ReadableByteChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize)
Obtain a ReadableByteChannel from the given supplier, and read it into a Flux of DataBuffers. Closes the channel when the Flux is terminated.
- Parameters:
- channelSupplier - the supplier for the channel to read from
- bufferFactory - the factory to create data buffers with
- bufferSize - the maximum size of the data buffers
- Returns:
- a Flux of data buffers read from the given channel
-
readAsynchronousFileChannel
public static reactor.core.publisher.Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize)
Obtain an AsynchronousFileChannel from the given supplier, and read it into a Flux of DataBuffers. Closes the channel when the Flux is terminated.
- Parameters:
- channelSupplier - the supplier for the channel to read from
- bufferFactory - the factory to create data buffers with
- bufferSize - the maximum size of the data buffers
- Returns:
- a Flux of data buffers read from the given channel
-
readAsynchronousFileChannel
public static reactor.core.publisher.Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, long position, DataBufferFactory bufferFactory, int bufferSize)
Obtain an AsynchronousFileChannel from the given supplier, and read it into a Flux of DataBuffers, starting at the given position. Closes the channel when the Flux is terminated.
- Parameters:
- channelSupplier - the supplier for the channel to read from
- position - the position to start reading from
- bufferFactory - the factory to create data buffers with
- bufferSize - the maximum size of the data buffers
- Returns:
- a Flux of data buffers read from the given channel
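To make the role of the position parameter concrete, here is a minimal JDK-only sketch of reading an AsynchronousFileChannel from an offset in chunks. It blocks on each read for brevity, so it does not reflect how a reactive implementation would schedule reads; `PositionedReadSketch`, `readFrom`, and `demo` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not part of Spring.
final class PositionedReadSketch {

    // Reads the file from the given position into chunks of at most bufferSize bytes.
    static List<byte[]> readFrom(Path file, long position, int bufferSize) {
        if (position < 0 || bufferSize <= 0) {
            throw new IllegalArgumentException("position must be >= 0 and bufferSize > 0");
        }
        List<byte[]> chunks = new ArrayList<>();
        try (AsynchronousFileChannel channel =
                AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            long pos = position;
            while (true) {
                ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
                // Blocking on the Future keeps the sketch short
                int n = channel.read(buffer, pos).get();
                if (n < 0) {
                    break; // end of file reached
                }
                pos += n;
                buffer.flip();
                byte[] chunk = new byte[buffer.remaining()];
                buffer.get(chunk);
                chunks.add(chunk);
            }
        } catch (Exception ex) {
            throw new IllegalStateException("Reading failed", ex);
        }
        return chunks;
    }

    // Writes a small temporary file, reads it from offset 4, and returns the text that was read.
    static String demo() {
        try {
            Path file = Files.createTempFile("positioned-read", ".txt");
            try {
                Files.write(file, "0123456789".getBytes(StandardCharsets.US_ASCII));
                StringBuilder text = new StringBuilder();
                for (byte[] chunk : readFrom(file, 4, 4)) {
                    text.append(new String(chunk, StandardCharsets.US_ASCII));
                }
                return text.toString();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```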
-
read
public static reactor.core.publisher.Flux<DataBuffer> read(Path path, DataBufferFactory bufferFactory, int bufferSize, OpenOption... options)
Read bytes from the given file Path into a Flux of DataBuffers. The method ensures that the file is closed when the flux is terminated.
- Parameters:
- path - the path to read bytes from
- bufferFactory - the factory to create data buffers with
- bufferSize - the maximum size of the data buffers
- Returns:
- a Flux of data buffers read from the given path
- Since:
- 5.2
-
read
public static reactor.core.publisher.Flux<DataBuffer> read(Resource resource, DataBufferFactory bufferFactory, int bufferSize)
Read the given Resource into a Flux of DataBuffers.
If the resource is a file, it is read into an AsynchronousFileChannel and turned into a Flux via readAsynchronousFileChannel(Callable, DataBufferFactory, int); otherwise the method falls back to readByteChannel(Callable, DataBufferFactory, int). Closes the channel when the flux is terminated.
- Parameters:
- resource - the resource to read from
- bufferFactory - the factory to create data buffers with
- bufferSize - the maximum size of the data buffers
- Returns:
- a Flux of data buffers read from the given resource
-
read
public static reactor.core.publisher.Flux<DataBuffer> read(Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize)
Read the given Resource into a Flux of DataBuffers starting at the given position.
If the resource is a file, it is read into an AsynchronousFileChannel and turned into a Flux via readAsynchronousFileChannel(Callable, DataBufferFactory, int); otherwise the method falls back to readByteChannel(Callable, DataBufferFactory, int). Closes the channel when the flux is terminated.
- Parameters:
- resource - the resource to read from
- position - the position to start reading from
- bufferFactory - the factory to create data buffers with
- bufferSize - the maximum size of the data buffers
- Returns:
- a Flux of data buffers read from the given resource
-
write
public static reactor.core.publisher.Flux<DataBuffer> write(Publisher<DataBuffer> source, OutputStream outputStream)
Write the given stream of DataBuffers to the given OutputStream. Does not close the output stream when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returned Flux with a releaseConsumer().
Note that the writing process does not start until the returned Flux is subscribed to.
- Parameters:
- source - the stream of data buffers to be written
- outputStream - the output stream to write to
- Returns:
- a Flux containing the same buffers as in source, that starts the writing process when subscribed to, and that publishes any writing errors and the completion signal
-
write
public static reactor.core.publisher.Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel)
Write the given stream of DataBuffers to the given WritableByteChannel. Does not close the channel when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returned Flux with a releaseConsumer().
Note that the writing process does not start until the returned Flux is subscribed to.
- Parameters:
- source - the stream of data buffers to be written
- channel - the channel to write to
- Returns:
- a Flux containing the same buffers as in source, that starts the writing process when subscribed to, and that publishes any writing errors and the completion signal
-
write
public static reactor.core.publisher.Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel)
Write the given stream of DataBuffers to the given AsynchronousFileChannel. Does not close the channel when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returned Flux with a releaseConsumer().
Note that the writing process does not start until the returned Flux is subscribed to.
- Parameters:
- source - the stream of data buffers to be written
- channel - the channel to write to
- Returns:
- a Flux containing the same buffers as in source, that starts the writing process when subscribed to, and that publishes any writing errors and the completion signal
- Since:
- 5.0.10
-
write
public static reactor.core.publisher.Flux<DataBuffer> write(Publisher<? extends DataBuffer> source, AsynchronousFileChannel channel, long position)
Write the given stream of DataBuffers to the given AsynchronousFileChannel. Does not close the channel when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returned Flux with a releaseConsumer().
Note that the writing process does not start until the returned Flux is subscribed to.
- Parameters:
- source - the stream of data buffers to be written
- channel - the channel to write to
- position - the file position where writing is to begin; must be non-negative
- Returns:
- a Flux containing the same buffers as in source, that starts the writing process when subscribed to, and that publishes any writing errors and the completion signal
-
write
public static reactor.core.publisher.Mono<Void> write(Publisher<DataBuffer> source, Path destination, OpenOption... options)
Write the given stream of DataBuffers to the given file Path. The optional options parameter specifies how the file is created or opened (defaults to CREATE, TRUNCATE_EXISTING, and WRITE).
- Parameters:
- source - the stream of data buffers to be written
- destination - the path to the file
- options - the options specifying how the file is opened
- Returns:
- a Mono that indicates completion or error
- Since:
- 5.2
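The effect of the default options (CREATE, TRUNCATE_EXISTING, and WRITE) can be sketched with the JDK alone. This is an eager, blocking illustration, not the reactive implementation, and `PathWriteSketch`, `writeAll`, and `demo` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not part of Spring.
final class PathWriteSketch {

    // Writes all buffers to the destination, applying the documented defaults when no options are given.
    static void writeAll(List<ByteBuffer> source, Path destination, OpenOption... options) {
        OpenOption[] effective = options.length > 0 ? options : new OpenOption[] {
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE};
        try (WritableByteChannel channel = Files.newByteChannel(destination, effective)) {
            for (ByteBuffer buffer : source) {
                // duplicate() leaves the caller's position and limit untouched
                ByteBuffer view = buffer.duplicate();
                while (view.hasRemaining()) {
                    channel.write(view);
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    // Overwrites an existing temporary file using the defaults and returns its final content.
    static String demo() {
        try {
            Path file = Files.createTempFile("path-write", ".txt");
            try {
                Files.write(file, "previous, longer content".getBytes(StandardCharsets.US_ASCII));
                writeAll(Arrays.asList(
                        ByteBuffer.wrap("new ".getBytes(StandardCharsets.US_ASCII)),
                        ByteBuffer.wrap("content".getBytes(StandardCharsets.US_ASCII))), file);
                return new String(Files.readAllBytes(file), StandardCharsets.US_ASCII);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because TRUNCATE_EXISTING is among the defaults, the earlier, longer content does not survive the write.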
-
takeUntilByteCount
public static reactor.core.publisher.Flux<DataBuffer> takeUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount)
Relay buffers from the given Publisher until the total byte count reaches the given maximum byte count, or until the publisher is complete.
- Parameters:
- publisher - the publisher to filter
- maxByteCount - the maximum byte count
- Returns:
- a flux whose maximum byte count is maxByteCount
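A small JDK-only sketch may help picture the byte-count cutoff. It assumes that the buffer crossing the limit is truncated so that the total never exceeds maxByteCount, which is how the Returns clause reads; it works on a list of byte arrays instead of a Publisher, and `TakeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not part of Spring.
final class TakeSketch {

    // Relays buffers until maxByteCount bytes have been passed on, truncating the last one if needed.
    static List<byte[]> takeUntilByteCount(List<byte[]> buffers, long maxByteCount) {
        if (maxByteCount < 0) {
            throw new IllegalArgumentException("maxByteCount must be >= 0");
        }
        List<byte[]> relayed = new ArrayList<>();
        long remaining = maxByteCount;
        for (byte[] buffer : buffers) {
            if (remaining <= 0) {
                break; // limit reached; later buffers are not relayed
            }
            if (buffer.length <= remaining) {
                relayed.add(buffer);
                remaining -= buffer.length;
            } else {
                // assumption: the buffer that crosses the limit is cut down to fit
                relayed.add(Arrays.copyOf(buffer, (int) remaining));
                remaining = 0;
            }
        }
        return relayed;
    }

    public static void main(String[] args) {
        List<byte[]> input = Arrays.asList(new byte[3], new byte[3], new byte[3]);
        for (byte[] buffer : takeUntilByteCount(input, 5)) {
            System.out.println(buffer.length);
        }
    }
}
```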
-
skipUntilByteCount
public static reactor.core.publisher.Flux<DataBuffer> skipUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount)
Skip buffers from the given Publisher until the total byte count reaches the given maximum byte count, or until the publisher is complete.
- Parameters:
- publisher - the publisher to filter
- maxByteCount - the maximum byte count
- Returns:
- a flux with the remaining part of the given publisher
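The skipping counterpart can be sketched the same way. This JDK-only illustration assumes that the buffer in which the skip count is reached is relayed from that offset onward ("the remaining part"); `SkipSketch` is a hypothetical name and the input is a plain list, not a Publisher.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not part of Spring.
final class SkipSketch {

    // Drops the first maxByteCount bytes and relays whatever follows.
    static List<byte[]> skipUntilByteCount(List<byte[]> buffers, long maxByteCount) {
        if (maxByteCount < 0) {
            throw new IllegalArgumentException("maxByteCount must be >= 0");
        }
        List<byte[]> relayed = new ArrayList<>();
        long toSkip = maxByteCount;
        for (byte[] buffer : buffers) {
            if (toSkip >= buffer.length) {
                toSkip -= buffer.length; // whole buffer falls inside the skipped range
                continue;
            }
            // assumption: a partially skipped buffer is relayed from the first unskipped byte
            relayed.add(toSkip == 0 ? buffer : Arrays.copyOfRange(buffer, (int) toSkip, buffer.length));
            toSkip = 0;
        }
        return relayed;
    }

    public static void main(String[] args) {
        List<byte[]> input = Arrays.asList(new byte[3], new byte[3], new byte[3]);
        for (byte[] buffer : skipUntilByteCount(input, 4)) {
            System.out.println(buffer.length);
        }
    }
}
```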
-
retain
public static <T extends DataBuffer> T retain(T dataBuffer)
Retain the given data buffer, if it is a PooledDataBuffer.
- Parameters:
- dataBuffer - the data buffer to retain
- Returns:
- the retained buffer
-
touch
Associate the given hint with the data buffer if it is a pooled buffer and supports leak tracking.
- Parameters:
- dataBuffer - the data buffer to attach the hint to
- hint - the hint to attach
- Returns:
- the input buffer
- Since:
- 5.3.2
-
release
public static boolean release(DataBuffer dataBuffer)
Release the given data buffer, if it is a PooledDataBuffer and has been allocated.
- Parameters:
- dataBuffer - the data buffer to release
- Returns:
- true if the buffer was released; false otherwise.
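The conditional nature of retain and release ("if it is a PooledDataBuffer and has been allocated") can be illustrated with a toy reference-counting model. Everything here is hypothetical: `Buffer`, `PooledBuffer`, and `HeapBuffer` are stand-ins rather than Spring types, and reporting true only when the count reaches zero is an assumption of the sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; none of these types belong to Spring.
final class ReleaseSketch {

    interface Buffer { }

    // Stand-in for a pooled, reference-counted buffer; starts with one reference.
    static final class PooledBuffer implements Buffer {
        private final AtomicInteger refCount = new AtomicInteger(1);

        boolean isAllocated() {
            return refCount.get() > 0;
        }

        PooledBuffer retain() {
            refCount.incrementAndGet();
            return this;
        }

        // Assumption: reports true only when the last reference is dropped.
        boolean release() {
            return refCount.decrementAndGet() == 0;
        }
    }

    // Stand-in for a buffer that is not pooled and needs no release.
    static final class HeapBuffer implements Buffer { }

    // Retains the buffer if it is pooled; otherwise returns it unchanged.
    static <T extends Buffer> T retain(T buffer) {
        if (buffer instanceof PooledBuffer) {
            ((PooledBuffer) buffer).retain();
        }
        return buffer;
    }

    // Releases the buffer only if it is pooled and still allocated.
    static boolean release(Buffer buffer) {
        if (buffer instanceof PooledBuffer) {
            PooledBuffer pooled = (PooledBuffer) buffer;
            if (pooled.isAllocated()) {
                return pooled.release();
            }
        }
        return false;
    }

    public static void main(String[] args) {
        PooledBuffer pooled = retain(new PooledBuffer()); // two references held
        System.out.println(release(pooled));
        System.out.println(release(pooled));
        System.out.println(release(new HeapBuffer()));
    }
}
```

In this model every retain must be balanced by one release, and releasing a non-pooled or already freed buffer is a harmless no-op that returns false.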
-
releaseConsumer
public static Consumer<DataBuffer> releaseConsumer()
Return a consumer that calls release(DataBuffer) on all passed data buffers.
-
join
public static reactor.core.publisher.Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers)
Return a new DataBuffer composed from joining together the given dataBuffers elements. Depending on the DataBuffer type, the returned buffer may be a single buffer containing all data of the provided buffers, or it may be a zero-copy composite with references to the given buffers.
If dataBuffers produces an error or if there is a cancel signal, then all accumulated buffers will be released.
Note that the given data buffers do not have to be released. They will be released as part of the returned composite.
- Parameters:
- dataBuffers - the data buffers that are to be composed
- Returns:
- a buffer that is composed from the dataBuffers argument
- Since:
- 5.0.3
-
join
public static reactor.core.publisher.Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount)
Variant of join(Publisher) that behaves the same way up until the specified max number of bytes to buffer. Once the limit is exceeded, DataBufferLimitException is raised.
- Parameters:
- buffers - the data buffers that are to be composed
- maxByteCount - the max number of bytes to buffer, or -1 for unlimited
- Returns:
- a buffer with the aggregated content, possibly an empty Mono if the max number of bytes to buffer is exceeded.
- Throws:
- DataBufferLimitException - if maxByteCount is exceeded
- Since:
- 5.1.11
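The limit check can be illustrated with a JDK-only sketch. It copies into a single array instead of building a composite, treats -1 as unlimited as the parameter description states, and uses a hypothetical `LimitExceededException` in place of DataBufferLimitException; `JoinSketch` is likewise a made-up name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not part of Spring.
final class JoinSketch {

    // Hypothetical stand-in for DataBufferLimitException.
    static final class LimitExceededException extends IllegalStateException {
        LimitExceededException(String message) {
            super(message);
        }
    }

    // Concatenates the buffers, failing as soon as the running total exceeds maxByteCount.
    static byte[] join(List<byte[]> buffers, int maxByteCount) {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        long total = 0;
        for (byte[] buffer : buffers) {
            total += buffer.length;
            // a negative limit (-1) means unlimited; a total equal to the limit is still allowed
            if (maxByteCount >= 0 && total > maxByteCount) {
                throw new LimitExceededException(
                        "Exceeded limit on max bytes to buffer: " + maxByteCount);
            }
            joined.write(buffer, 0, buffer.length);
        }
        return joined.toByteArray();
    }

    public static void main(String[] args) {
        List<byte[]> parts = Arrays.asList(
                "ab".getBytes(StandardCharsets.US_ASCII),
                "cd".getBytes(StandardCharsets.US_ASCII));
        System.out.println(new String(join(parts, 4), StandardCharsets.US_ASCII));
    }
}
```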
-
matcher
public static DataBufferUtils.Matcher matcher(byte[] delimiter)
Return a DataBufferUtils.Matcher for the given delimiter. The matcher can be used to find the delimiter in a stream of data buffers.
- Parameters:
- delimiter - the delimiter bytes to find
- Returns:
- the matcher
- Since:
- 5.2
-
matcher
public static DataBufferUtils.Matcher matcher(byte[]... delimiters)
Return a DataBufferUtils.Matcher for the given delimiters. The matcher can be used to find the delimiters in a stream of data buffers.
- Parameters:
- delimiters - the delimiter byte sequences to find
- Returns:
- the matcher
- Since:
- 5.2
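Finding a delimiter "in a stream of data buffers" requires a matcher that remembers partial matches between buffers. The sketch below shows one way to do that for a single delimiter with a Knuth-Morris-Pratt failure table over plain byte arrays. It is not the Spring Matcher: `StreamingMatcherSketch` is a hypothetical class, and returning the index of the last delimiter byte (or -1) is an assumption of the sketch.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; not part of Spring.
final class StreamingMatcherSketch {

    private final byte[] delimiter;
    private final int[] table;   // KMP failure table for the delimiter
    private int matches = 0;     // delimiter bytes matched so far, kept across chunks

    StreamingMatcherSketch(byte[] delimiter) {
        if (delimiter.length == 0) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        this.delimiter = delimiter.clone();
        this.table = new int[delimiter.length];
        int k = 0;
        for (int i = 1; i < delimiter.length; i++) {
            while (k > 0 && this.delimiter[i] != this.delimiter[k]) {
                k = table[k - 1];
            }
            if (this.delimiter[i] == this.delimiter[k]) {
                k++;
            }
            table[i] = k;
        }
    }

    // Scans chunk from index 'from'; returns the index of the last delimiter byte, or -1 if none ends here.
    int match(byte[] chunk, int from) {
        for (int i = from; i < chunk.length; i++) {
            byte b = chunk[i];
            while (matches > 0 && b != delimiter[matches]) {
                matches = table[matches - 1]; // fall back to the longest viable partial match
            }
            if (b == delimiter[matches]) {
                matches++;
            }
            if (matches == delimiter.length) {
                matches = 0; // start fresh for the next delimiter
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        StreamingMatcherSketch crlf =
                new StreamingMatcherSketch("\r\n".getBytes(StandardCharsets.US_ASCII));
        // the delimiter is split across two chunks
        System.out.println(crlf.match("ab\r".getBytes(StandardCharsets.US_ASCII), 0));
        System.out.println(crlf.match("\ncd".getBytes(StandardCharsets.US_ASCII), 0));
    }
}
```

Keeping the partial-match count in the instance is what allows buffers to be passed one at a time; a matcher for several delimiters would need to track such state per delimiter.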
-