public class ZeroMqChannel extends AbstractMessageChannel implements SubscribableChannel
A SubscribableChannel implementation over ZeroMQ sockets.
It can work in two messaging models:
- push-pull, where sent messages are distributed to subscribers in a round-robin manner, according to the logic of the respective ZeroMQ SocketType.PUSH and SocketType.PULL socket types;
- pub-sub, where sent messages are distributed to all subscribers.
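The difference between the two models can be illustrated without any ZeroMQ sockets at all. The following is a minimal sketch in plain Java; the class `DistributionSketch` and its methods are hypothetical names used only for illustration and are not part of this channel's API. It only models which subscriber sees which message; real ZeroMQ delivery also depends on connection timing and socket options.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: models message distribution, not real sockets.
final class DistributionSketch {

    /** Push-pull style: each message goes to exactly one subscriber, chosen in turn. */
    static List<List<String>> roundRobin(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        int next = 0;
        for (String message : messages) {
            inboxes.get(next).add(message);
            next = (next + 1) % subscribers;
        }
        return inboxes;
    }

    /** Pub-sub style: every message goes to every subscriber. */
    static List<List<String>> broadcast(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        if (count < 1) {
            throw new IllegalArgumentException("At least one subscriber is required");
        }
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }
}
```

With messages a, b, c, d and two subscribers, `roundRobin` hands a and c to the first subscriber and b and d to the second, while `broadcast` hands all four messages to both.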
This message channel can work in local mode, where a pair of ZeroMQ sockets of the SocketType.PAIR type is connected between the publisher (send operation) and the subscriber using an inter-thread transport binding.
In distributed mode this channel has to be connected to an externally managed ZeroMQ proxy.
The value passed to setConnectUrl(String) has to be a standard ZeroMQ connect string with an extra port appended after a second colon; the two ports represent the frontend and backend sockets pair on the ZeroMQ proxy.
For example: tcp://localhost:6001:6002.
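To make the two-port format concrete, here is a minimal sketch of how such a string could be split into separate frontend and backend connect strings. The class `ConnectUrlSketch` and its `split` method are hypothetical and written only for illustration; they are not the channel's actual parsing code.

```java
// Hypothetical helper: splits PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT
// into two standard ZeroMQ connect strings.
final class ConnectUrlSketch {

    /** Returns a two-element array: {frontendUrl, backendUrl}. */
    static String[] split(String connectUrl) {
        int backendColon = connectUrl.lastIndexOf(':');
        if (backendColon < 0) {
            throw new IllegalArgumentException("No port found in: " + connectUrl);
        }
        // Everything before the last colon is the frontend connect string.
        String frontendUrl = connectUrl.substring(0, backendColon);

        int schemeColon = frontendUrl.indexOf("://");
        int frontendColon = frontendUrl.lastIndexOf(':');
        // The frontend port colon must come after the "://" scheme separator.
        if (schemeColon < 0 || frontendColon <= schemeColon + 2) {
            throw new IllegalArgumentException(
                    "Expected PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT but got: " + connectUrl);
        }
        // Reuse protocol and host, then append the backend port.
        String backendUrl = frontendUrl.substring(0, frontendColon)
                + connectUrl.substring(backendColon);
        return new String[] {frontendUrl, backendUrl};
    }
}
```

For the example string above, `split("tcp://localhost:6001:6002")` yields `tcp://localhost:6001` for the frontend and `tcp://localhost:6002` for the backend. A string with only one port is rejected, since it does not carry both proxy sockets.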
Another option is to provide a reference to a ZeroMqProxy instance managed in the same application: the frontend and backend ports are obtained from this proxy and the respective connection string is built from them.
This way, sending and receiving operations on this channel are similar to interaction over a messaging broker.
The internal logic of this message channel implementation is based on Project Reactor, using its Mono, Flux and Scheduler APIs for a better threading model and flow control, and to avoid concurrency primitives for multi-publisher (multi-subscriber) communication within the same application.
AbstractMessageChannel.ChannelInterceptorList
IntegrationManagement.ManagementOverrides
Modifier and Type | Field and Description |
---|---|
static java.time.Duration | DEFAULT_CONSUME_DELAY |
interceptors, meters
EXPRESSION_PARSER, logger
INDEFINITE_TIMEOUT
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Constructor and Description |
---|
ZeroMqChannel(org.zeromq.ZContext context) Create a channel instance based on the provided ZContext with push/pull communication model. |
ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub) Create a channel instance based on the provided ZContext and provided communication model. |
Modifier and Type | Method and Description |
---|---|
void | destroy() |
protected boolean | doSend(Message<?> message, long timeout) Subclasses must implement this method. |
protected void | onInit() Subclasses may implement this for initialization logic. |
void | setConnectUrl(String connectUrl) Configure a connection to the ZeroMQ proxy with a pair of colon-separated ports for the proxy frontend and backend sockets. |
void | setConsumeDelay(java.time.Duration consumeDelay) Specify a Duration to delay consumption when no data is received. |
void | setMessageMapper(BytesMessageMapper messageMapper) Provide a BytesMessageMapper to convert to/from messages when send or receive happens on the sockets. |
void | setSendSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer) The Consumer callback to configure a publishing socket. |
void | setSubscribeSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer) The Consumer callback to configure a consuming socket. |
void | setZeroMqProxy(ZeroMqProxy zeroMqProxy) Specify a reference to a ZeroMqProxy instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started. |
boolean | subscribe(MessageHandler handler) |
boolean | unsubscribe(MessageHandler handler) |
addInterceptor, addInterceptor, getComponentType, getFullChannelName, getIChannelInterceptorList, getIntegrationPatternType, getInterceptors, getMetricsCaptor, getOverrides, isLoggingEnabled, registerMetricsCaptor, removeInterceptor, removeInterceptor, send, send, setDatatypes, setInterceptors, setLoggingEnabled, setMessageConverter, setShouldTrack
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
send, send
getManagedName, getManagedType, getThisAs, setManagedName, setManagedType
getBeanName, getComponentName
public ZeroMqChannel(org.zeromq.ZContext context)
Create a channel instance based on the provided ZContext with push/pull communication model.
context - the ZContext to use.
public ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)
Create a channel instance based on the provided ZContext and provided communication model.
context - the ZContext to use.
pubSub - the communication model: push/pull or pub/sub.
public void setConnectUrl(@Nullable String connectUrl)
Configure a connection to the ZeroMQ proxy with a pair of colon-separated ports for the proxy frontend and backend sockets. Mutually exclusive with setZeroMqProxy(ZeroMqProxy).
connectUrl - the connection string in format PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT, e.g. tcp://localhost:6001:6002
public void setZeroMqProxy(@Nullable ZeroMqProxy zeroMqProxy)
Specify a reference to a ZeroMqProxy instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started. Mutually exclusive with setConnectUrl(String).
zeroMqProxy - the ZeroMqProxy instance to use.
public void setConsumeDelay(java.time.Duration consumeDelay)
Specify a Duration to delay consumption when no data is received.
consumeDelay - the Duration to delay consumption when empty; defaults to DEFAULT_CONSUME_DELAY.
public void setMessageMapper(BytesMessageMapper messageMapper)
Provide a BytesMessageMapper to convert to/from messages when send or receive happens on the sockets.
messageMapper - the BytesMessageMapper to use; defaults to EmbeddedJsonHeadersMessageMapper.
public void setSendSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer)
The Consumer callback to configure a publishing socket.
sendSocketConfigurer - the Consumer to use.
public void setSubscribeSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)
The Consumer callback to configure a consuming socket.
subscribeSocketConfigurer - the Consumer to use.
protected void onInit()
Description copied from class IntegrationObjectSupport: Subclasses may implement this for initialization logic.
Overrides: onInit in class AbstractMessageChannel
protected boolean doSend(Message<?> message, long timeout)
Description copied from class AbstractMessageChannel: Subclasses must implement this method.
Specified by: doSend in class AbstractMessageChannel
message - The message.
timeout - The timeout.
public boolean subscribe(MessageHandler handler)
Specified by: subscribe in interface SubscribableChannel
public boolean unsubscribe(MessageHandler handler)
Specified by: unsubscribe in interface SubscribableChannel
public void destroy()
Specified by: destroy in interface DisposableBean
Specified by: destroy in interface IntegrationManagement
Overrides: destroy in class AbstractMessageChannel