Class ZeroMqMessageHandler
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractReactiveMessageHandler
org.springframework.integration.zeromq.outbound.ZeroMqMessageHandler
- All Implemented Interfaces:
Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Lifecycle, Ordered, ComponentSourceAware, ExpressionCapable, Orderable, IntegrationPattern, NamedComponent, IntegrationManagement, ManageableLifecycle, TrackableComponent, ReactiveMessageHandler
public class ZeroMqMessageHandler
extends AbstractReactiveMessageHandler
implements ManageableLifecycle
An AbstractReactiveMessageHandler implementation for publishing messages over a ZeroMq socket.
Only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported.
This component can either bind or connect the socket.
When SocketType.PUB is used, the topicExpression is evaluated against the request message, and the result, if it is not null, is injected into the ZeroMq message as a topic frame.
The subscriber side must receive the topic frame first, before parsing the actual data.
When the payload of the request message is a ZMsg, no conversion or topic extraction happens: the ZMsg is sent to the socket as is, and it is not destroyed, so it can be reused.
- Since:
- 5.4
- Author:
- Artem Bilan, Alessio Matricardi
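The class description can be illustrated with a small configuration sketch. It is only one possible setup: the connection URL `tcp://localhost:6060`, the channel name `zeroMqPublisherChannel`, and the `topic` header are hypothetical values chosen for the example, not defaults of this class.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.zeromq.outbound.ZeroMqMessageHandler;
import org.springframework.messaging.Message;
import org.zeromq.SocketType;
import org.zeromq.ZContext;

@Configuration
@EnableIntegration
public class ZeroMqPublisherConfig {

    // Shared ZeroMq context used by the handler to create its socket.
    @Bean
    public ZContext zContext() {
        return new ZContext();
    }

    // PUB handler that connects to a (hypothetical) URL and takes the topic
    // from a (hypothetical) "topic" message header.
    @Bean
    @ServiceActivator(inputChannel = "zeroMqPublisherChannel")
    public ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
        ZeroMqMessageHandler handler =
                new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
        // A null result means no topic frame is added to the outgoing message.
        handler.setTopicExpression(
                new FunctionExpression<Message<?>>(message -> message.getHeaders().get("topic")));
        return handler;
    }
}
```

Messages sent to `zeroMqPublisherChannel` are then published over the socket. A message whose payload is already a `ZMsg` bypasses both the topic expression and the payload conversion.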
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
-
Constructor Summary
Constructor / Description
ZeroMqMessageHandler(org.zeromq.ZContext context)
Create an instance based on the provided ZContext.
ZeroMqMessageHandler(org.zeromq.ZContext context, int port)
Create an instance based on the provided ZContext and binding port.
ZeroMqMessageHandler(org.zeromq.ZContext context, int port, org.zeromq.SocketType socketType)
Create an instance based on the provided ZContext, binding port and SocketType.
ZeroMqMessageHandler(org.zeromq.ZContext context, String connectUrl)
Create an instance based on the provided ZContext and connection string.
ZeroMqMessageHandler(org.zeromq.ZContext context, String connectUrl, org.zeromq.SocketType socketType)
Create an instance based on the provided ZContext, connection string and SocketType.
ZeroMqMessageHandler(org.zeromq.ZContext context, Supplier<String> connectUrl)
Create an instance based on the provided ZContext and connection string supplier.
ZeroMqMessageHandler(org.zeromq.ZContext context, Supplier<String> connectUrl, org.zeromq.SocketType socketType)
Create an instance based on the provided ZContext, connection string supplier and SocketType.
ZeroMqMessageHandler(org.zeromq.ZContext context, org.zeromq.SocketType socketType)
Create an instance based on the provided ZContext and SocketType.
-
Method Summary
Modifier and Type / Method / Description
void destroy()
int getBoundPort()
Return the port the socket is bound to, or 0 if this message handler has not been started yet or the socket is connected rather than bound.
String getComponentType()
Subclasses may implement this method to provide component type information.
protected reactor.core.publisher.Mono<Void> handleMessageInternal(Message<?> message)
boolean isRunning()
protected void onInit()
Subclasses may implement this for initialization logic.
void setMessageConverter(MessageConverter messageConverter)
Provide a MessageConverter (as an alternative to messageMapper) for converting a request message into byte[] for sending into ZeroMq socket.
void setMessageMapper(OutboundMessageMapper<byte[]> messageMapper)
Provide an OutboundMessageMapper to convert a request message into byte[] for sending into ZeroMq socket.
void setSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> socketConfigurer)
Provide a Consumer to configure a socket with arbitrary options, like security.
void setTopic(String topic)
Specify a topic the SocketType.PUB socket is going to use for distributing messages into the subscriptions.
void setTopicExpression(Expression topicExpression)
Specify a SpEL expression to evaluate a topic a SocketType.PUB socket is going to use for distributing messages into the subscriptions. It is ignored for all other SocketTypes supported.
void start()
void stop()
void wrapTopic(boolean wrapTopic)
Specify if the topic that the SocketType.PUB socket is going to use for distributing messages into the subscriptions must be wrapped with an additional empty frame.
Methods inherited from class org.springframework.integration.handler.AbstractReactiveMessageHandler
handleMessage
Methods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, getIntegrationPatternType, getManagedName, getManagedType, getMetricsCaptor, getObservationRegistry, getOrder, getOverrides, isLoggingEnabled, isObserved, registerMetricsCaptor, registerObservationRegistry, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
ZeroMqMessageHandler
public ZeroMqMessageHandler(org.zeromq.ZContext context)
Create an instance based on the provided ZContext.
- Parameters:
context - the ZContext to use for creating sockets.
- Since:
- 6.4
-
ZeroMqMessageHandler
public ZeroMqMessageHandler(org.zeromq.ZContext context, org.zeromq.SocketType socketType)
Create an instance based on the provided ZContext and SocketType.
- Parameters:
context - the ZContext to use for creating sockets.
socketType - the SocketType to use; only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported.
-
ZeroMqMessageHandler
public ZeroMqMessageHandler(org.zeromq.ZContext context, String connectUrl)
Create an instance based on the provided ZContext and connection string.
- Parameters:
context - the ZContext to use for creating sockets.
connectUrl - the URL to connect the socket to.
-
ZeroMqMessageHandler
public ZeroMqMessageHandler(org.zeromq.ZContext context, int port)
Create an instance based on the provided ZContext and binding port.
- Parameters:
context - the ZContext to use for creating sockets.
port - the port to bind the ZeroMq socket to over TCP.
- Since:
- 6.4
-
ZeroMqMessageHandler
public ZeroMqMessageHandler(org.zeromq.ZContext context, Supplier<String> connectUrl)
Create an instance based on the provided ZContext and connection string supplier.
- Parameters:
context - the ZContext to use for creating sockets.
connectUrl - the supplier of the URL to connect the socket to.
- Since:
- 5.5.9
-
ZeroMqMessageHandler
public ZeroMqMessageHandler(org.zeromq.ZContext context, String connectUrl, org.zeromq.SocketType socketType)
Create an instance based on the provided ZContext, connection string and SocketType.
- Parameters:
context - the ZContext to use for creating sockets.
connectUrl - the URL to connect the socket to.
socketType - the SocketType to use; only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported.
-
ZeroMqMessageHandler
public ZeroMqMessageHandler(org.zeromq.ZContext context, int port, org.zeromq.SocketType socketType)
Create an instance based on the provided ZContext, binding port and SocketType.
- Parameters:
context - the ZContext to use for creating sockets.
port - the port to bind the ZeroMq socket to over TCP.
socketType - the SocketType to use; only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported.
- Since:
- 6.4
-
ZeroMqMessageHandler
public ZeroMqMessageHandler(org.zeromq.ZContext context, Supplier<String> connectUrl, org.zeromq.SocketType socketType)
Create an instance based on the provided ZContext, connection string supplier and SocketType.
- Parameters:
context - the ZContext to use for creating sockets.
connectUrl - the supplier of the URL to connect the socket to.
socketType - the SocketType to use; only SocketType.PAIR, SocketType.PUB and SocketType.PUSH are supported.
- Since:
- 5.5.9
-
-
Method Details
-
setMessageMapper
public void setMessageMapper(OutboundMessageMapper<byte[]> messageMapper)
Provide an OutboundMessageMapper to convert a request message into byte[] for sending into ZeroMq socket. Ignored when Message.getPayload() is an instance of ZMsg.
- Parameters:
messageMapper - the OutboundMessageMapper to use.
-
setMessageConverter
public void setMessageConverter(MessageConverter messageConverter)
Provide a MessageConverter (as an alternative to messageMapper) for converting a request message into byte[] for sending into ZeroMq socket. Ignored when Message.getPayload() is an instance of ZMsg.
- Parameters:
messageConverter - the MessageConverter to use.
-
setSocketConfigurer
public void setSocketConfigurer(Consumer<org.zeromq.ZMQ.Socket> socketConfigurer)
Provide a Consumer to configure a socket with arbitrary options, like security.
- Parameters:
socketConfigurer - the configurer for socket options.
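As a rough sketch of how a socket configurer might be supplied, the following configuration sets two ordinary JeroMQ socket options. The URL, the channel name `zeroMqPushChannel`, and the chosen option values are hypothetical; security-related options would be applied in the same callback.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.zeromq.outbound.ZeroMqMessageHandler;
import org.zeromq.SocketType;
import org.zeromq.ZContext;

@Configuration
public class ZeroMqPushConfig {

    @Bean
    @ServiceActivator(inputChannel = "zeroMqPushChannel")
    public ZeroMqMessageHandler zeroMqPushHandler(ZContext context) {
        ZeroMqMessageHandler handler =
                new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUSH);
        // The consumer receives the socket created by the handler
        // and may set any option on it.
        handler.setSocketConfigurer(socket -> {
            socket.setSndHWM(1000); // hypothetical send high-water mark
            socket.setLinger(0);    // hypothetical: drop pending messages on close
        });
        return handler;
    }
}
```

This sketch assumes a `ZContext` bean is defined elsewhere in the application context.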
-
setTopic
public void setTopic(String topic)
Specify a topic the SocketType.PUB socket is going to use for distributing messages into the subscriptions. It is ignored for all other SocketTypes supported.
- Parameters:
topic - the topic to use.
-
setTopicExpression
public void setTopicExpression(Expression topicExpression)
Specify a SpEL expression to evaluate a topic a SocketType.PUB socket is going to use for distributing messages into the subscriptions. It is ignored for all other SocketTypes supported.
- Parameters:
topicExpression - the expression to evaluate topic for publishing.
-
wrapTopic
public void wrapTopic(boolean wrapTopic)
Specify if the topic that the SocketType.PUB socket is going to use for distributing messages into the subscriptions must be wrapped with an additional empty frame. It is ignored for all other SocketTypes supported. This attribute is set to true by default.
- Parameters:
wrapTopic - true if the topic must be wrapped with an additional empty frame.
- Since:
- 6.2.6
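The effect of this flag on the outgoing frames can be modelled with plain Java collections. This is only an illustrative model, not the handler's code: the class `TopicFrameLayout` and its `frames` method are hypothetical, and placing the empty frame directly after the topic frame is an assumption based on how JeroMQ's `ZMsg.wrap` prepends a frame followed by an empty delimiter.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TopicFrameLayout {

    /**
     * Models the frames of a PUB message: optional topic frame, optional
     * empty delimiter frame (assumed to follow the topic), then the data frame.
     */
    public static List<byte[]> frames(String topic, boolean wrapTopic, byte[] payload) {
        List<byte[]> frames = new ArrayList<>();
        if (topic != null) {
            frames.add(topic.getBytes(StandardCharsets.UTF_8));
            if (wrapTopic) {
                frames.add(new byte[0]); // additional empty frame
            }
        }
        frames.add(payload);
        return frames;
    }

    public static void main(String[] args) {
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println("wrapped:   " + frames("news", true, data).size() + " frames");
        System.out.println("unwrapped: " + frames("news", false, data).size() + " frames");
        System.out.println("no topic:  " + frames(null, true, data).size() + " frames");
    }
}
```

Under this model a subscriber has to skip the empty frame when the topic is wrapped, and reads the data immediately after the topic frame when it is not.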
-
getBoundPort
public int getBoundPort()
Return the port the socket is bound to, or 0 if this message handler has not been started yet or the socket is connected rather than bound.
- Returns:
- the port for a socket or 0.
- Since:
- 6.4
-
getComponentType
public String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType in interface NamedComponent
- Overrides:
getComponentType in class MessageHandlerSupport
-
onInit
protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
- Overrides:
onInit in class IntegrationObjectSupport
-
start
public void start()
- Specified by:
start in interface Lifecycle
- Specified by:
start in interface ManageableLifecycle
-
stop
public void stop()
- Specified by:
stop in interface Lifecycle
- Specified by:
stop in interface ManageableLifecycle
-
isRunning
public boolean isRunning()
- Specified by:
isRunning in interface Lifecycle
- Specified by:
isRunning in interface ManageableLifecycle
-
handleMessageInternal
protected reactor.core.publisher.Mono<Void> handleMessageInternal(Message<?> message)
- Specified by:
handleMessageInternal in class AbstractReactiveMessageHandler
-
destroy
public void destroy()
- Specified by:
destroy in interface DisposableBean
- Specified by:
destroy in interface IntegrationManagement
- Overrides:
destroy in class MessageHandlerSupport
-