Class ZeroMqMessageProducer
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.zeromq.inbound.ZeroMqMessageProducer
- All Implemented Interfaces:
Aware
,BeanFactoryAware
,BeanNameAware
,DisposableBean
,InitializingBean
,SmartInitializingSingleton
,ApplicationContextAware
,Lifecycle
,Phased
,SmartLifecycle
,ComponentSourceAware
,ExpressionCapable
,MessageProducer
,IntegrationPattern
,NamedComponent
,IntegrationInboundManagement
,IntegrationManagement
,ManageableLifecycle
,ManageableSmartLifecycle
,TrackableComponent
@ManagedResource
@IntegrationManagedResource
public class ZeroMqMessageProducer
extends MessageProducerSupport
A
MessageProducerSupport
implementation for consuming messages from ZeroMq socket.
Only SocketType.PAIR
, SocketType.SUB
and SocketType.PULL
are supported.
This component can bind or connect the socket.
When the SocketType.SUB
is used, the received topic is stored in the ZeroMqHeaders.TOPIC
.
- Since:
- 5.4
- Author:
- Artem Bilan, Alessio Matricardi
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields inherited from class org.springframework.integration.endpoint.AbstractEndpoint
lifecycleCondition, lifecycleLock
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.context.SmartLifecycle
DEFAULT_PHASE
-
Constructor Summary
ConstructorDescriptionZeroMqMessageProducer
(org.zeromq.ZContext context) ZeroMqMessageProducer
(org.zeromq.ZContext context, org.zeromq.SocketType socketType) -
Method Summary
Modifier and TypeMethodDescriptionvoid
destroy()
protected void
doStart()
Take no action by default.protected void
doStop()
Take no action by default.int
Return the port a socket is bound or 0 if this message producer has not been started yet or the socket is connected - not bound.Subclasses may implement this method to provide component type information.protected void
onInit()
Subclasses may implement this for initialization logic.void
setBindPort
(int port) Configure a port for TCP protocol binding viaZMQ.Socket.bind(String)
.void
setConnectUrl
(String connectUrl) Configure an URL forZMQ.Socket.connect(String)
.void
setConsumeDelay
(Duration consumeDelay) Specify aDuration
to delay consumption when no data received.void
setMessageConverter
(MessageConverter messageConverter) Provide aMessageConverter
(as an alternative tomessageMapper
) for converting a consumed data into a message to produce.void
setMessageMapper
(InboundMessageMapper<byte[]> messageMapper) Provide anInboundMessageMapper
to convert a consumed data into a message to produce.void
setReceiveRaw
(boolean receiveRaw) Whether rawZMsg
is present as a payload of message to produce or it is fully converted to aMessage
includingZeroMqHeaders.TOPIC
header (if any).void
setSocketConfigurer
(Consumer<org.zeromq.ZMQ.Socket> socketConfigurer) Provide aConsumer
to configure a socket with arbitrary options, like security.void
Specify topics theSocketType.SUB
socket is going to use for subscription.void
subscribeToTopics
(String... topics) void
unsubscribeFromTopics
(String... topics) void
unwrapTopic
(boolean unwrapTopic) Specify if the topic thatSocketType.SUB
socket is going to receive is wrapped with an additional empty frame.Methods inherited from class org.springframework.integration.endpoint.MessageProducerSupport
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getErrorMessageAttributes, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
Methods inherited from class org.springframework.integration.endpoint.AbstractEndpoint
doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedType
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Field Details
-
DEFAULT_CONSUME_DELAY
-
-
Constructor Details
-
ZeroMqMessageProducer
public ZeroMqMessageProducer(org.zeromq.ZContext context) -
ZeroMqMessageProducer
public ZeroMqMessageProducer(org.zeromq.ZContext context, org.zeromq.SocketType socketType)
-
-
Method Details
-
setConsumeDelay
Specify aDuration
to delay consumption when no data received.- Parameters:
consumeDelay
- theDuration
to delay consumption when empty; defaults toDEFAULT_CONSUME_DELAY
.
-
setMessageMapper
Provide anInboundMessageMapper
to convert a consumed data into a message to produce. Ignored whensetReceiveRaw(boolean)
istrue
.- Parameters:
messageMapper
- theInboundMessageMapper
to use.
-
setMessageConverter
Provide aMessageConverter
(as an alternative tomessageMapper
) for converting a consumed data into a message to produce. Ignored whensetReceiveRaw(boolean)
istrue
.- Parameters:
messageConverter
- theMessageConverter
to use.
-
setReceiveRaw
public void setReceiveRaw(boolean receiveRaw) Whether rawZMsg
is present as a payload of message to produce or it is fully converted to aMessage
includingZeroMqHeaders.TOPIC
header (if any).- Parameters:
receiveRaw
- to convert fromZMsg
or not; defaults to convert.
-
setSocketConfigurer
Provide aConsumer
to configure a socket with arbitrary options, like security.- Parameters:
socketConfigurer
- the configurer for socket options.
-
setTopics
Specify topics theSocketType.SUB
socket is going to use for subscription. It is ignored for all otherSocketType
s supported.- Parameters:
topics
- the topics to use.
-
setConnectUrl
Configure an URL forZMQ.Socket.connect(String)
. Mutually exclusive with thesetBindPort(int)
.- Parameters:
connectUrl
- the URL to connect ZeroMq socket to.
-
setBindPort
public void setBindPort(int port) Configure a port for TCP protocol binding viaZMQ.Socket.bind(String)
. Mutually exclusive with thesetConnectUrl(String)
.- Parameters:
port
- the port to bind ZeroMq socket to over TCP.
-
getBoundPort
public int getBoundPort()Return the port a socket is bound or 0 if this message producer has not been started yet or the socket is connected - not bound.- Returns:
- the port for a socket or 0.
-
unwrapTopic
public void unwrapTopic(boolean unwrapTopic) Specify if the topic thatSocketType.SUB
socket is going to receive is wrapped with an additional empty frame. It is ignored for all otherSocketType
s supported. This attribute is set totrue
by default.- Parameters:
unwrapTopic
- true if the received topic is wrapped with an additional empty frame.- Since:
- 6.2.6
-
getComponentType
Description copied from class:IntegrationObjectSupport
Subclasses may implement this method to provide component type information.- Specified by:
getComponentType
in interfaceNamedComponent
- Overrides:
getComponentType
in classIntegrationObjectSupport
-
onInit
protected void onInit()Description copied from class:IntegrationObjectSupport
Subclasses may implement this for initialization logic.- Overrides:
onInit
in classMessageProducerSupport
-
subscribeToTopics
-
unsubscribeFromTopics
-
doStart
protected void doStart()Description copied from class:MessageProducerSupport
Take no action by default. Subclasses may override this if they need lifecycle-managed behavior. Protected by 'lifecycleLock'.- Overrides:
doStart
in classMessageProducerSupport
-
doStop
protected void doStop()Description copied from class:MessageProducerSupport
Take no action by default. Subclasses may override this if they need lifecycle-managed behavior.- Overrides:
doStop
in classMessageProducerSupport
-
destroy
public void destroy()- Specified by:
destroy
in interfaceDisposableBean
- Specified by:
destroy
in interfaceIntegrationManagement
- Overrides:
destroy
in classAbstractEndpoint
-