In many cases application data is obtained from a stream. It is not recommended to send a reference to a Stream as a message payload to a consumer. Instead messages are created from data that is read from an input stream and message payloads are written to an output stream one by one.
Spring Integration provides two adapters for streams.
Both ByteStreamReadingMessageSource
and CharacterStreamReadingMessageSource
implement MessageSource
.
By configuring one of these within a channel-adapter element, the polling period can be configured, and the Message Bus can automatically detect and schedule them.
The byte stream version requires an InputStream
, and the character stream version requires a Reader
as the single constructor argument.
The ByteStreamReadingMessageSource
also accepts the bytesPerMessage property to determine how many bytes it will attempt to read into each Message
.
The default value is 1024.
<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource"> <constructor-arg ref="someInputStream"/> <property name="bytesPerMessage" value="2048"/> </bean> <bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource"> <constructor-arg ref="someReader"/> </bean>
The CharacterStreamReadingMessageSource
wraps the reader in a BufferedReader
(if it’s not one already).
You can set the buffer size used by the buffered reader in the second constructor argument.
Starting with version 5.0, a third constructor argument (blockToDetectEOF
) controls the behavior of the CharacterStreamReadingMessageSource
.
When false
(default), the receive()
method checks if the reader is ready()
and returns null if not.
EOF is not detected in this case.
When true
, the receive()
method blocks until data is available, or EOF is detected on the underlying stream.
When EOF is detected, a StreamClosedEvent
(application event) is published; you can consume this event with a bean implementing ApplicationListener<StreamClosedEvent>
.
Note | |
---|---|
To facilitate EOF detection, the poller thread will block in the |
Important | |
---|---|
The poller will continue to publish an event on each poll once EOF has been detected; the application listener can stop the adapter to prevent this.
The event is published on the poller thread and stopping the adapter will cause the thread to be interrupted.
If you intend to perform some interruptible task after stopping the adapter, you must either perform the |
This facilitates "piping" or redirecting data to stdin
, such as…
cat foo.txt | java -jar my.jar
or
java -jar my.jar < foo.txt
allowing the application to terminate when the pipe is closed.
Four convenient factory methods are available:
public static final CharacterStreamReadingMessageSource stdin() { ... } public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... } public static final CharacterStreamReadingMessageSource stdinPipe() { ... } public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }
For target streams, there are also two implementations: ByteStreamWritingMessageHandler
and CharacterStreamWritingMessageHandler
.
Each requires a single constructor argument - OutputStream
for byte streams or Writer
for character streams, and each provides a second constructor that adds the optional bufferSize.
Since both of these ultimately implement the MessageHandler
interface, they can be referenced from a channel-adapter configuration as described in more detail in Section 4.3, “Channel Adapter”.
<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler"> <constructor-arg ref="someOutputStream"/> <constructor-arg value="1024"/> </bean> <bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler"> <constructor-arg ref="someWriter"/> </bean>
To reduce the configuration needed for stream related channel adapters there is a namespace defined. The following schema locations are needed to use it.
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
To configure the inbound channel adapter the following code snippet shows the different configuration options that are supported.
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/> <int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>
Starting with version 5.0 you can set the detect-eof
attribute which sets the blockToDetectEOF
property - see Section 30.2, “Reading from streams” for more information.
To configure the outbound channel adapter you can use the namespace support as well. The following code snippet shows the different configuration for an outbound channel adapters.
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset" channel="testChannel"/> <int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8" channel="testChannel"/> <int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/> <int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true" channel="testChannel"/>