Class ZeroMqChannel
- All Implemented Interfaces:
Aware
,BeanFactoryAware
,BeanNameAware
,DisposableBean
,InitializingBean
,ApplicationContextAware
,ExpressionCapable
,IntegrationPattern
,NamedComponent
,IntegrationManagement
,TrackableComponent
,MessageChannel
,SubscribableChannel
,InterceptableChannel
public class ZeroMqChannel extends AbstractMessageChannel implements SubscribableChannel
SubscribableChannel
implementation over ZeroMQ sockets.
It can work in two messaging models:
- push-pull
, where sent messages are distributed to subscribers in a round-robin manner
according a respective ZeroMQ SocketType.PUSH
and SocketType.PULL
socket types logic;
- pub-sub
, where sent messages are distributed to all subscribers;
This message channel can work in local mode, when a pair of ZeroMQ sockets of SocketType.PAIR
type
are connected between publisher (send operation) and subscriber using inter-thread transport binding.
In distributed mode this channel has to be connected to an externally managed ZeroMQ proxy.
The setConnectUrl(String)
has to be as a standard ZeroMQ connect string, but with an extra port
over the colon - representing a frontend and backend sockets pair on ZeroMQ proxy.
For example: tcp://localhost:6001:6002
.
Another option is to provide a reference to the ZeroMqProxy
instance managed in the same application:
frontend and backend ports are evaluated from this proxy and the respective connection string is built from them.
This way sending and receiving operations on this channel are similar to interaction over a messaging broker.
An internal logic of this message channel implementation is based on the project Reactor using its
Mono
, Flux
and Scheduler
API for better thead model and flow control to avoid
concurrency primitives for multi-publisher(subscriber) communication within the same application.
- Since:
- 5.4
- Author:
- Artem Bilan
-
Nested Class Summary
Nested classes/interfaces inherited from class org.springframework.integration.channel.AbstractMessageChannel
AbstractMessageChannel.ChannelInterceptorList
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields Modifier and Type Field Description static Duration
DEFAULT_CONSUME_DELAY
Fields inherited from class org.springframework.integration.channel.AbstractMessageChannel
interceptors, meters
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
-
Constructor Summary
Constructors Constructor Description ZeroMqChannel(org.zeromq.ZContext context)
Create a channel instance based on the providedZContext
with push/pull communication model.ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)
Create a channel instance based on the providedZContext
and provided communication model. -
Method Summary
Modifier and Type Method Description void
destroy()
protected boolean
doSend(Message<?> message, long timeout)
Subclasses must implement this method.protected void
onInit()
Subclasses may implement this for initialization logic.void
setConnectUrl(String connectUrl)
Configure a connection to the ZeroMQ proxy with the pair of ports over colon for proxy frontend and backend sockets.void
setConsumeDelay(Duration consumeDelay)
Specify aDuration
to delay consumption when no data received.void
setMessageMapper(BytesMessageMapper messageMapper)
Provide aBytesMessageMapper
to convert to/from messages when send or receive happens on the sockets.void
setSendSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> sendSocketConfigurer)
TheConsumer
callback to configure a publishing socket.void
setSubscribeSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)
TheConsumer
callback to configure a consuming socket.void
setZeroMqProxy(ZeroMqProxy zeroMqProxy)
Specify a reference to aZeroMqProxy
instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started.boolean
subscribe(MessageHandler handler)
boolean
unsubscribe(MessageHandler handler)
Methods inherited from class org.springframework.integration.channel.AbstractMessageChannel
addInterceptor, addInterceptor, getComponentType, getFullChannelName, getIChannelInterceptorList, getIntegrationPatternType, getInterceptors, getMetricsCaptor, getOverrides, isLoggingEnabled, registerMetricsCaptor, removeInterceptor, removeInterceptor, send, send, setDatatypes, setInterceptors, setLoggingEnabled, setMessageConverter, setShouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getManagedName, getManagedType, getThisAs, setManagedName, setManagedType
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Field Details
-
Constructor Details
-
ZeroMqChannel
public ZeroMqChannel(org.zeromq.ZContext context)Create a channel instance based on the providedZContext
with push/pull communication model.- Parameters:
context
- theZContext
to use.
-
ZeroMqChannel
public ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)Create a channel instance based on the providedZContext
and provided communication model.- Parameters:
context
- theZContext
to use.pubSub
- the communication model: push/pull or pub/sub.
-
-
Method Details
-
setConnectUrl
Configure a connection to the ZeroMQ proxy with the pair of ports over colon for proxy frontend and backend sockets. Mutually exclusive with thesetZeroMqProxy(ZeroMqProxy)
.- Parameters:
connectUrl
- the connection string in formatPROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT
, e.g.tcp://localhost:6001:6002
-
setZeroMqProxy
Specify a reference to aZeroMqProxy
instance in the same application to rely on its ports configuration and make a natural lifecycle dependency without guessing when the proxy is started. Mutually exclusive with thesetConnectUrl(String)
.- Parameters:
zeroMqProxy
- theZeroMqProxy
instance to use
-
setConsumeDelay
Specify aDuration
to delay consumption when no data received.- Parameters:
consumeDelay
- theDuration
to delay consumption when empty; defaults toDEFAULT_CONSUME_DELAY
.
-
setMessageMapper
Provide aBytesMessageMapper
to convert to/from messages when send or receive happens on the sockets.- Parameters:
messageMapper
- theBytesMessageMapper
to use; defaults toEmbeddedJsonHeadersMessageMapper
.
-
setSendSocketConfigurer
TheConsumer
callback to configure a publishing socket.- Parameters:
sendSocketConfigurer
- theConsumer
to use.
-
setSubscribeSocketConfigurer
public void setSubscribeSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)TheConsumer
callback to configure a consuming socket.- Parameters:
subscribeSocketConfigurer
- theConsumer
to use.
-
onInit
protected void onInit()Description copied from class:IntegrationObjectSupport
Subclasses may implement this for initialization logic.- Overrides:
onInit
in classAbstractMessageChannel
-
doSend
Description copied from class:AbstractMessageChannel
Subclasses must implement this method. A non-negative timeout indicates how long to wait if the channel is at capacity (if the value is 0, it must return immediately with or without success). A negative timeout value indicates that the method should block until either the message is accepted or the blocking thread is interrupted.- Specified by:
doSend
in classAbstractMessageChannel
- Parameters:
message
- The message.timeout
- The timeout.- Returns:
- true if the send was successful.
-
subscribe
- Specified by:
subscribe
in interfaceSubscribableChannel
-
unsubscribe
- Specified by:
unsubscribe
in interfaceSubscribableChannel
-
destroy
public void destroy()- Specified by:
destroy
in interfaceDisposableBean
- Specified by:
destroy
in interfaceIntegrationManagement
- Overrides:
destroy
in classAbstractMessageChannel
-