@ManagedResource @IntegrationManagedResource public class ZeroMqMessageProducer extends MessageProducerSupport
MessageProducerSupport implementation for consuming messages from ZeroMq socket.
Only SocketType.PAIR, SocketType.SUB and SocketType.PULL are supported.
This component can bind or connect the socket.
When the SocketType.SUB is used, the received topic is stored in the ZeroMqHeaders.TOPIC.
| Modifier and Type | Field and Description |
|---|---|
static java.time.Duration |
DEFAULT_CONSUME_DELAY |
lifecycleCondition, lifecycleLockEXPRESSION_PARSER, loggerDEFAULT_PHASE| Constructor and Description |
|---|
ZeroMqMessageProducer(org.zeromq.ZContext context) |
ZeroMqMessageProducer(org.zeromq.ZContext context,
org.zeromq.SocketType socketType) |
| Modifier and Type | Method and Description |
|---|---|
void |
destroy() |
protected void |
doStart()
Take no action by default.
|
protected void |
doStop()
Take no action by default.
|
int |
getBoundPort()
Return the port a socket is bound or 0 if this message producer has not been started yet
or the socket is connected - not bound.
|
String |
getComponentType()
Subclasses may implement this method to provide component type information.
|
protected void |
onInit()
Subclasses may implement this for initialization logic.
|
void |
setBindPort(int port)
Configure a port for TCP protocol binding via
ZMQ.Socket.bind(String). |
void |
setConnectUrl(String connectUrl)
Configure an URL for
ZMQ.Socket.connect(String). |
void |
setConsumeDelay(java.time.Duration consumeDelay)
Specify a
Duration to delay consumption when no data received. |
void |
setMessageConverter(MessageConverter messageConverter)
Provide a
MessageConverter (as an alternative to messageMapper)
for converting a consumed data into a message to produce. |
void |
setMessageMapper(InboundMessageMapper<byte[]> messageMapper)
Provide an
InboundMessageMapper to convert a consumed data into a message to produce. |
void |
setReceiveRaw(boolean receiveRaw)
Whether raw
ZMsg is present as a payload of message to produce or
it is fully converted to a Message including ZeroMqHeaders.TOPIC header (if any). |
void |
setSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> socketConfigurer)
Provide a
Consumer to configure a socket with arbitrary options, like security. |
void |
setTopics(String... topics)
Specify topics the
SocketType.SUB socket is going to use for subscription. |
void |
subscribeToTopics(String... topics) |
void |
unsubscribeFromTopics(String... topics) |
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getErrorMessageAttributes, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisherdoStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stopafterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringclone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitgetBeanName, getComponentNamepublic ZeroMqMessageProducer(org.zeromq.ZContext context)
public ZeroMqMessageProducer(org.zeromq.ZContext context,
org.zeromq.SocketType socketType)
public void setConsumeDelay(java.time.Duration consumeDelay)
Duration to delay consumption when no data received.consumeDelay - the Duration to delay consumption when empty;
defaults to DEFAULT_CONSUME_DELAY.public void setMessageMapper(InboundMessageMapper<byte[]> messageMapper)
InboundMessageMapper to convert a consumed data into a message to produce.
Ignored when setReceiveRaw(boolean) is true.messageMapper - the InboundMessageMapper to use.public void setMessageConverter(MessageConverter messageConverter)
MessageConverter (as an alternative to messageMapper)
for converting a consumed data into a message to produce.
Ignored when setReceiveRaw(boolean) is true.messageConverter - the MessageConverter to use.public void setReceiveRaw(boolean receiveRaw)
ZMsg is present as a payload of message to produce or
it is fully converted to a Message including ZeroMqHeaders.TOPIC header (if any).receiveRaw - to convert from ZMsg or not; defaults to convert.public void setSocketConfigurer(java.util.function.Consumer<org.zeromq.ZMQ.Socket> socketConfigurer)
Consumer to configure a socket with arbitrary options, like security.socketConfigurer - the configurer for socket options.public void setTopics(String... topics)
SocketType.SUB socket is going to use for subscription.
It is ignored for all other SocketTypes supported.topics - the topics to use.public void setConnectUrl(@Nullable String connectUrl)
ZMQ.Socket.connect(String).
Mutually exclusive with the setBindPort(int).connectUrl - the URL to connect ZeroMq socket to.public void setBindPort(int port)
ZMQ.Socket.bind(String).
Mutually exclusive with the setConnectUrl(String).port - the port to bind ZeroMq socket to over TCP.public int getBoundPort()
public String getComponentType()
IntegrationObjectSupportgetComponentType in interface NamedComponentgetComponentType in class IntegrationObjectSupportprotected void onInit()
IntegrationObjectSupportonInit in class MessageProducerSupport@ManagedOperation public void subscribeToTopics(String... topics)
@ManagedOperation public void unsubscribeFromTopics(String... topics)
protected void doStart()
MessageProducerSupportdoStart in class MessageProducerSupportprotected void doStop()
MessageProducerSupportdoStop in class MessageProducerSupportpublic void destroy()
destroy in interface DisposableBeandestroy in class AbstractEndpoint