@ManagedResource @IntegrationManagedResource public class ZeroMqMessageProducer extends MessageProducerSupport

A MessageProducerSupport implementation for consuming messages from a ZeroMq socket. Only SocketType.PAIR, SocketType.SUB and SocketType.PULL are supported. This component can bind or connect the socket. When SocketType.SUB is used, the received topic is stored in the ZeroMqHeaders.TOPIC header.
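The description above can be illustrated with a minimal configuration sketch for a SUB socket that connects to a publisher. It uses only the constructor and setters listed on this page plus the inherited setOutputChannel; the class name ZeroMqInboundConfig, the bean names, the "orders" topic, the URL tcp://localhost:6001 and the 100 ms delay are all assumptions chosen for illustration, and it presumes Spring Integration and JeroMQ are on the classpath.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.zeromq.inbound.ZeroMqMessageProducer;
import org.zeromq.SocketType;
import org.zeromq.ZContext;

@Configuration
@EnableIntegration
public class ZeroMqInboundConfig {

    // Shared ZeroMQ context for the application.
    @Bean
    public ZContext zContext() {
        return new ZContext();
    }

    // Channel that receives the messages produced from the socket (hypothetical name).
    @Bean
    public QueueChannel zeroMqInput() {
        return new QueueChannel();
    }

    @Bean
    public ZeroMqMessageProducer zeroMqMessageProducer() {
        ZeroMqMessageProducer producer = new ZeroMqMessageProducer(zContext(), SocketType.SUB);
        producer.setOutputChannel(zeroMqInput());
        // Topics only apply to SocketType.SUB.
        producer.setTopics("orders");
        // Connect rather than bind; the two options are mutually exclusive.
        producer.setConnectUrl("tcp://localhost:6001");
        // Assumed value: how long to wait before polling again when nothing was received.
        producer.setConsumeDelay(Duration.ofMillis(100));
        return producer;
    }
}
```

To bind instead of connect, the sketch would call setBindPort(int) in place of setConnectUrl(String) and read the effective port from getBoundPort() once the producer has started.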
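Modifier and Type | Field and Description |
---|---|
static java.time.Duration | DEFAULT_CONSUME_DELAY |

Fields inherited from class AbstractEndpoint: lifecycleCondition, lifecycleLock
Fields inherited from class IntegrationObjectSupport: EXPRESSION_PARSER, logger
Fields inherited from interface SmartLifecycle: DEFAULT_PHASE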
Constructor and Description |
---|
ZeroMqMessageProducer(org.zeromq.ZContext context) |
ZeroMqMessageProducer(org.zeromq.ZContext context, org.zeromq.SocketType socketType) |
Modifier and Type | Method and Description |
---|---|
void | destroy() |
protected void | doStart() Take no action by default. |
protected void | doStop() Take no action by default. |
int | getBoundPort() Return the port the socket is bound to, or 0 if this message producer has not been started yet or the socket is connected rather than bound. |
String | getComponentType() Subclasses may implement this method to provide component type information. |
protected void | onInit() Subclasses may implement this for initialization logic. |
void | setBindPort(int port) Configure a port for TCP protocol binding via ZMQ.Socket.bind(String). |
void | setConnectUrl(String connectUrl) Configure a URL for ZMQ.Socket.connect(String). |
void | setConsumeDelay(java.time.Duration consumeDelay) Specify a Duration to delay consumption when no data is received. |
void | setMessageConverter(MessageConverter messageConverter) Provide a MessageConverter (as an alternative to messageMapper) for converting consumed data into a message to produce. |
void | setMessageMapper(InboundMessageMapper<byte[]> messageMapper) Provide an InboundMessageMapper to convert consumed data into a message to produce. |
void | setReceiveRaw(boolean receiveRaw) Whether the raw ZMsg is used as the payload of the message to produce, or whether it is fully converted to a Message, including the ZeroMqHeaders.TOPIC header (if any). |
void | setSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> socketConfigurer) Provide a Consumer to configure a socket with arbitrary options, like security. |
void | setTopics(String... topics) Specify topics the SocketType.SUB socket is going to use for subscription. |
void | subscribeToTopics(String... topics) |
void | unsubscribeFromTopics(String... topics) |
Methods inherited from class MessageProducerSupport: afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getErrorMessageAttributes, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
Methods inherited from class AbstractEndpoint: doStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
Methods inherited from class IntegrationObjectSupport: afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface NamedComponent: getBeanName, getComponentName
public ZeroMqMessageProducer(org.zeromq.ZContext context)
public ZeroMqMessageProducer(org.zeromq.ZContext context, org.zeromq.SocketType socketType)
public void setConsumeDelay(java.time.Duration consumeDelay)
Specify a Duration to delay consumption when no data is received.
consumeDelay - the Duration to delay consumption when empty; defaults to DEFAULT_CONSUME_DELAY.

public void setMessageMapper(InboundMessageMapper<byte[]> messageMapper)
Provide an InboundMessageMapper to convert consumed data into a message to produce. Ignored when setReceiveRaw(boolean) is true.
messageMapper - the InboundMessageMapper to use.

public void setMessageConverter(MessageConverter messageConverter)
Provide a MessageConverter (as an alternative to messageMapper) for converting consumed data into a message to produce. Ignored when setReceiveRaw(boolean) is true.
messageConverter - the MessageConverter to use.

public void setReceiveRaw(boolean receiveRaw)
Whether the raw ZMsg is used as the payload of the message to produce, or whether it is fully converted to a Message, including the ZeroMqHeaders.TOPIC header (if any).
receiveRaw - whether to pass the ZMsg through unconverted; by default the ZMsg is converted.

public void setSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> socketConfigurer)
Provide a Consumer to configure a socket with arbitrary options, like security.
socketConfigurer - the configurer for socket options.

public void setTopics(String... topics)
Specify topics the SocketType.SUB socket is going to use for subscription. It is ignored for all other supported SocketTypes.
topics - the topics to use.

public void setConnectUrl(@Nullable String connectUrl)
Configure a URL for ZMQ.Socket.connect(String). Mutually exclusive with setBindPort(int).
connectUrl - the URL to connect the ZeroMq socket to.

public void setBindPort(int port)
Configure a port for TCP protocol binding via ZMQ.Socket.bind(String). Mutually exclusive with setConnectUrl(String).
port - the port to bind the ZeroMq socket to over TCP.

public int getBoundPort()
Return the port the socket is bound to, or 0 if this message producer has not been started yet or the socket is connected rather than bound.
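The consumeDelay setting documented above can be pictured as a polling loop that pauses whenever a receive attempt yields nothing. The following is a minimal, standard-library-only model of that idea and not the component's actual implementation: the class ConsumeDelaySketch, the method drain, the ASSUMED_DELAY constant and the in-memory queue standing in for a socket are all hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ConsumeDelaySketch {

    // Assumed value for the demo; the real DEFAULT_CONSUME_DELAY is not stated on this page.
    static final Duration ASSUMED_DELAY = Duration.ofMillis(10);

    /**
     * Drains the source, sleeping for consumeDelay after each empty poll.
     * Gives up after maxEmptyPolls consecutive empty polls so the demo terminates.
     */
    static List<String> drain(Queue<String> source, Duration consumeDelay, int maxEmptyPolls) {
        List<String> received = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            String data = source.poll(); // null models "no data available right now"
            if (data != null) {
                received.add(data);
                emptyPolls = 0; // data arrived, so reset the idle counter
                continue;
            }
            emptyPolls++;
            try {
                // Back off instead of spinning while the source is empty.
                Thread.sleep(consumeDelay.toMillis());
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        Queue<String> source = new ConcurrentLinkedQueue<>(List.of("a", "b"));
        System.out.println(drain(source, ASSUMED_DELAY, 3)); // prints [a, b]
    }
}
```

In this model a shorter delay lowers latency for newly arrived data, while a longer one reduces idle polling.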
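public String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
Specified by: getComponentType in interface NamedComponent
Overrides: getComponentType in class IntegrationObjectSupport

protected void onInit()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this for initialization logic.
Overrides: onInit in class MessageProducerSupport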
@ManagedOperation public void subscribeToTopics(String... topics)
@ManagedOperation public void unsubscribeFromTopics(String... topics)
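The two managed operations above change the topic set at runtime. ZeroMQ SUB sockets filter incoming messages by prefix, so a subscription to "orders" also matches "orders.created". The sketch below models only that filtering rule with the standard library; TopicSubscriptionSketch and accepts are hypothetical names, String.startsWith stands in for ZeroMQ's byte-prefix comparison, and nothing here reflects how the component itself is implemented.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class TopicSubscriptionSketch {

    // Thread-safe set, since subscriptions may change while messages are being filtered.
    private final Set<String> topics = ConcurrentHashMap.newKeySet();

    void subscribeToTopics(String... newTopics) {
        for (String topic : newTopics) {
            topics.add(topic);
        }
    }

    void unsubscribeFromTopics(String... oldTopics) {
        for (String topic : oldTopics) {
            topics.remove(topic);
        }
    }

    // A message passes the filter when its topic starts with any subscribed prefix.
    boolean accepts(String messageTopic) {
        for (String prefix : topics) {
            if (messageTopic.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        TopicSubscriptionSketch sub = new TopicSubscriptionSketch();
        sub.subscribeToTopics("orders");
        System.out.println(sub.accepts("orders.created")); // prints true
        sub.unsubscribeFromTopics("orders");
        System.out.println(sub.accepts("orders.created")); // prints false
    }
}
```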
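protected void doStart()
Description copied from class: MessageProducerSupport
Take no action by default.
Overrides: doStart in class MessageProducerSupport

protected void doStop()
Description copied from class: MessageProducerSupport
Take no action by default.
Overrides: doStop in class MessageProducerSupport

public void destroy()
Specified by: destroy in interface DisposableBean
Overrides: destroy in class AbstractEndpoint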