This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.4.0! |
About Non-blocking I/O (NIO)
Using NIO (see using-nio
in IP Configuration Attributes) avoids dedicating a thread to read from each socket.
For a small number of sockets, you are likely to find that not using NIO, together with an asynchronous hand-off (such as to a QueueChannel
), performs as well as or better than using NIO.
You should consider using NIO when handling a large number of connections. However, the use of NIO has some other ramifications. A pool of threads (in the task executor) is shared across all the sockets. Each incoming message is assembled and sent to the configured channel as a separate unit of work on a thread selected from that pool. Two sequential messages arriving at the same socket might be processed by different threads. This means that the order in which the messages are sent to the channel is indeterminate. Strict ordering of the messages arriving at the socket is not maintained.
For some applications, this is not an issue.
For others, it is a problem.
If you require strict ordering, consider setting using-nio
to false
and using an asynchronous hand-off.
Alternatively, you can insert a resequencer downstream of the inbound endpoint to return the messages to their proper sequence.
If you set apply-sequence
to true
on the connection factory, messages arriving at a TCP connection have sequenceNumber
and correlationId
headers set.
The resequencer uses these headers to return the messages to their proper sequence.
Starting with version 5.1.4, priority is given to accepting new connections over reading from existing connections.
This should, generally, have little impact unless you have a very high rate of new incoming connections.
If you wish to revert to the previous behavior of giving reads priority, set the multiAccept property on the TcpNioServerConnectionFactory to false .
|
Pool Size
The pool size attribute is no longer used.
Previously, it specified the size of the default thread pool when a task-executor was not specified.
It was also used to set the connection backlog on server sockets.
The first function is no longer needed (see the next paragraph).
The second function is replaced by the backlog
attribute.
Previously, when using a fixed thread pool task executor (which was the default) with NIO, it was possible to get a deadlock and processing would stop. The problem occurred when a buffer was full, a thread reading from the socket was trying to add more data to the buffer, and no threads were available to make space in the buffer. This only occurred with a very small pool size, but it could be possible under extreme conditions. Since 2.2, two changes have eliminated this problem. First, the default task executor is a cached thread pool executor. Second, deadlock detection logic has been added such that, if thread starvation occurs, instead of deadlocking, an exception is thrown, thus releasing the deadlocked resources.
Now that the default task executor is unbounded, it is possible that an out-of-memory condition might occur with high rates of incoming messages, if message processing takes extended time. If your application exhibits this type of behavior, you should use a pooled task executor with an appropriate pool size, but see the next section. |
Thread Pool Task Executor with CALLER_RUNS
Policy
You should keep in mind some important considerations when you use a fixed thread pool with the CallerRunsPolicy
(CALLER_RUNS
when using the <task/>
namespace) and the queue capacity is small.
The following does not apply if you do not use a fixed thread pool.
With NIO connections, there are three distinct task types. The I/O selector processing is performed on one dedicated thread (detecting events, accepting new connections, and dispatching the I/O read operations to other threads by using the task executor). When an I/O reader thread (to which the read operation is dispatched) reads data, it hands off to another thread to assemble the incoming message. Large messages can take several reads to complete. These “assembler” threads can block while waiting for data. When a new read event occurs, the reader determines if this socket already has an assembler and, if not, runs a new one. When the assembly process is complete, the assembler thread is returned to the pool.
This can cause a deadlock when the pool is exhausted, the CALLER_RUNS
rejection policy is in use, and the task queue is full.
When the pool is empty and there is no room in the queue, the IO selector thread receives an OP_READ
event and dispatches the read by using the executor.
The queue is full, so the selector thread itself starts the read process.
Now it detects that there is no assembler for this socket and, before it does the read, fires off an assembler.
Again, the queue is full, and the selector thread becomes the assembler.
The assembler is now blocked, waiting for the data to be read, which never happens.
The connection factory is now deadlocked because the selector thread cannot handle new events.
To avoid this deadlock, we must avoid the selector (or reader) threads performing the assembly task. We want to use separate pools for the IO and assembly operations.
The framework provides a CompositeExecutor
, which allows the configuration of two distinct executors: one for performing IO operations and one for message assembly.
In this environment, an IO thread can never become an assembler thread, and the deadlock cannot occur.
In addition, the task executors should be configured to use an AbortPolicy
(ABORT
when using <task>
).
When an I/O task cannot be completed, it is deferred for a short time and continually retried until it can be completed and have an assembler allocated.
By default, the delay is 100ms, but you can change it by setting the readDelay
property on the connection factory (read-delay
when configuring with the XML namespace).
The following three examples shows how to configure the composite executor:
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>