public abstract class MessageBusSupport extends java.lang.Object implements MessageBus, org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.InitializingBean
Modifier and Type | Class and Description |
---|---|
static class |
MessageBusSupport.DirectHandler |
protected static class |
MessageBusSupport.PartitioningMetadata |
static class |
MessageBusSupport.SetBuilder |
protected class |
MessageBusSupport.SharedChannelProvider<T extends org.springframework.messaging.MessageChannel>
Looks up or optionally creates a new channel to use.
|
MessageBus.Capability
Modifier and Type | Field and Description |
---|---|
protected static java.util.Set<java.lang.Object> |
CONSUMER_RETRY_PROPERTIES |
protected static java.util.Set<java.lang.Object> |
CONSUMER_STANDARD_PROPERTIES
The set of properties every bus implementation must support (or at least tolerate).
|
protected long |
defaultBackOffInitialInterval |
protected long |
defaultBackOffMaxInterval |
protected double |
defaultBackOffMultiplier |
protected int |
defaultBatchBufferLimit |
protected boolean |
defaultBatchingEnabled |
protected int |
defaultBatchSize |
protected long |
defaultBatchTimeout |
protected boolean |
defaultCompress |
protected int |
defaultConcurrency |
protected boolean |
defaultDurableSubscription |
protected int |
defaultMaxAttempts |
protected MessageBusSupport.SharedChannelProvider<org.springframework.integration.channel.DirectChannel> |
directChannelProvider
Used in the canonical case, when the binding does not involve an alias name.
|
protected org.springframework.expression.EvaluationContext |
evaluationContext |
protected static java.lang.String |
JOB_CHANNEL_TYPE_PREFIX |
protected org.slf4j.Logger |
logger |
protected static java.util.List<org.springframework.util.MimeType> |
MEDIATYPES_MEDIATYPE_ALL |
protected static java.lang.String |
P2P_NAMED_CHANNEL_TYPE_PREFIX |
protected static java.lang.String |
PARTITION_HEADER |
protected static java.util.Set<java.lang.Object> |
PRODUCER_BATCHING_ADVANCED_PROPERTIES |
protected static java.util.Set<java.lang.Object> |
PRODUCER_BATCHING_BASIC_PROPERTIES |
protected static java.util.Set<java.lang.Object> |
PRODUCER_PARTITIONING_PROPERTIES |
protected static java.util.Set<java.lang.Object> |
PRODUCER_STANDARD_PROPERTIES |
protected static java.lang.String |
PUBSUB_NAMED_CHANNEL_TYPE_PREFIX |
protected static java.lang.String |
TAP_TYPE_PREFIX |
Constructor and Description |
---|
MessageBusSupport() |
Modifier and Type | Method and Description |
---|---|
protected void |
addBinding(Binding binding) |
void |
afterPropertiesSet() |
static java.lang.String |
applyPrefix(java.lang.String prefix,
java.lang.String name)
For bus implementations that support a prefix, apply the prefix to the name.
|
static java.lang.String |
applyPubSub(java.lang.String name)
For bus implementations that include a pub/sub component in identifiers, construct the name.
|
static java.lang.String |
applyRequests(java.lang.String name)
Build the requests entity name.
|
org.springframework.messaging.MessageChannel |
bindDynamicProducer(java.lang.String name,
java.util.Properties properties)
Dynamically create a producer for the named channel.
|
org.springframework.messaging.MessageChannel |
bindDynamicPubSubProducer(java.lang.String name,
java.util.Properties properties)
Dynamically create a producer for the named channel.
|
protected void |
bindExistingProducerDirectlyIfPossible(java.lang.String name,
org.springframework.messaging.MessageChannel consumerChannel)
Attempt to bind a producer directly (avoiding the bus) if there is already a local producer.
|
protected boolean |
bindNewProducerDirectlyIfPossible(java.lang.String name,
org.springframework.messaging.SubscribableChannel moduleOutputChannel,
AbstractBusPropertiesAccessor properties)
Attempt to create a direct binding (avoiding the bus) if the consumer is local.
|
protected java.lang.String |
buildPartitionRoutingExpression(java.lang.String expressionRoot) |
protected org.springframework.retry.support.RetryTemplate |
buildRetryTemplateIfRetryEnabled(AbstractBusPropertiesAccessor properties)
Create and configure a retry template if the consumer 'maxAttempts' property is set.
|
static java.lang.String |
constructDLQName(java.lang.String name)
For bus implementations that support dead lettering, construct the name of the dead letter entity for the
underlying pipe name.
|
protected void |
deleteBinding(java.lang.String name,
org.springframework.messaging.MessageChannel channel) |
protected void |
deleteBindings(java.lang.String name) |
protected MessageValues |
deserializePayloadIfNecessary(org.springframework.messaging.Message<?> message) |
protected MessageValues |
deserializePayloadIfNecessary(MessageValues message) |
protected int |
determinePartition(org.springframework.messaging.Message<?> message,
MessageBusSupport.PartitioningMetadata meta)
Determine the partition to which to send this message.
|
protected org.springframework.messaging.MessageChannel |
doBindDynamicProducer(java.lang.String name,
java.lang.String channelName,
java.util.Properties properties)
Create a producer for the named channel and bind it to the bus.
|
protected org.springframework.messaging.MessageChannel |
doBindDynamicPubSubProducer(java.lang.String name,
java.lang.String channelName,
java.util.Properties properties)
Create a producer for the named channel and bind it to the bus.
|
void |
doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeaders)
Perform manual acknowledgement based on the metadata stored in message bus.
|
protected org.springframework.context.support.AbstractApplicationContext |
getApplicationContext() |
protected org.springframework.beans.factory.config.ConfigurableListableBeanFactory |
getBeanFactory() |
protected org.springframework.util.IdGenerator |
getIdGenerator() |
boolean |
isCapable(MessageBus.Capability capability)
Return true if the bus supports the capability.
|
protected boolean |
isNamedChannel(java.lang.String name) |
protected void |
onInit() |
protected MessageValues |
serializePayloadIfNecessary(org.springframework.messaging.Message<?> message) |
void |
setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void |
setCodec(org.springframework.integration.codec.Codec codec) |
void |
setDefaultBackOffInitialInterval(long defaultBackOffInitialInterval)
Set the default retry back off initial interval for this bus; can be overridden with consumer
'backOffInitialInterval' property.
|
void |
setDefaultBackOffMaxInterval(long defaultBackOffMaxInterval)
Set the default retry back off max interval for this bus; can be overridden with consumer 'backOffMaxInterval'
property.
|
void |
setDefaultBackOffMultiplier(double defaultBackOffMultiplier)
Set the default retry back off multiplier for this bus; can be overridden with consumer 'backOffMultiplier'
property.
|
void |
setDefaultBatchBufferLimit(int defaultBatchBufferLimit)
Set the default batch buffer limit - used to send a batch early if its size exceeds this.
|
void |
setDefaultBatchingEnabled(boolean defaultBatchingEnabled)
Set whether this bus batches message sends by default.
|
void |
setDefaultBatchSize(int defaultBatchSize)
Set the default batch size; only applies when batching is enabled and the bus supports batching.
|
void |
setDefaultBatchTimeout(long defaultBatchTimeout)
Set the default batch timeout - used to send a batch if no messages arrive during this time.
|
void |
setDefaultCompress(boolean defaultCompress)
Set whether compression will be used by producers, by default.
|
void |
setDefaultConcurrency(int defaultConcurrency)
Set the default concurrency for this bus; can be overridden with consumer 'concurrency' property.
|
void |
setDefaultDurableSubscription(boolean defaultDurableSubscription)
Set whether subscriptions to taps/topics are durable.
|
void |
setDefaultMaxAttempts(int defaultMaxAttempts)
The default maximum delivery attempts for this bus.
|
void |
setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext) |
void |
setPartitionSelector(PartitionSelectorStrategy partitionSelector)
Set the partition strategy to be used by this bus if no partitionExpression is provided for a module.
|
protected void |
stopBindings() |
void |
unbindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel channel)
Unbind a specific p2p or pub/sub message consumer.
|
void |
unbindConsumers(java.lang.String name)
Unbind an inbound inter-module channel and stop any active components that use the channel.
|
void |
unbindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel channel)
Unbind a specific p2p or pub/sub message producer.
|
void |
unbindProducers(java.lang.String name)
Unbind an outbound inter-module channel and stop any active components that use the channel.
|
protected void |
validateConsumerProperties(java.lang.String name,
java.util.Properties properties,
java.util.Set<java.lang.Object> supported)
Validate the provided deployment properties for the consumer against those supported by this bus implementation.
|
protected void |
validateProducerProperties(java.lang.String name,
java.util.Properties properties,
java.util.Set<java.lang.Object> supported)
Validate the provided deployment properties for the producer against those supported by this bus implementation.
|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
bindConsumer, bindProducer, bindPubSubConsumer, bindPubSubProducer, bindReplier, bindRequestor
protected static final java.lang.String P2P_NAMED_CHANNEL_TYPE_PREFIX
protected static final java.lang.String TAP_TYPE_PREFIX
protected static final java.lang.String PUBSUB_NAMED_CHANNEL_TYPE_PREFIX
protected static final java.lang.String JOB_CHANNEL_TYPE_PREFIX
protected static final java.lang.String PARTITION_HEADER
protected final org.slf4j.Logger logger
protected static final java.util.List<org.springframework.util.MimeType> MEDIATYPES_MEDIATYPE_ALL
protected static final java.util.Set<java.lang.Object> CONSUMER_STANDARD_PROPERTIES
protected static final java.util.Set<java.lang.Object> PRODUCER_STANDARD_PROPERTIES
protected static final java.util.Set<java.lang.Object> CONSUMER_RETRY_PROPERTIES
protected static final java.util.Set<java.lang.Object> PRODUCER_PARTITIONING_PROPERTIES
protected static final java.util.Set<java.lang.Object> PRODUCER_BATCHING_BASIC_PROPERTIES
protected static final java.util.Set<java.lang.Object> PRODUCER_BATCHING_ADVANCED_PROPERTIES
protected volatile org.springframework.expression.EvaluationContext evaluationContext
protected final MessageBusSupport.SharedChannelProvider<org.springframework.integration.channel.DirectChannel> directChannelProvider
protected volatile long defaultBackOffInitialInterval
protected volatile long defaultBackOffMaxInterval
protected volatile double defaultBackOffMultiplier
protected volatile int defaultConcurrency
protected volatile int defaultMaxAttempts
protected volatile boolean defaultBatchingEnabled
protected volatile int defaultBatchSize
protected volatile int defaultBatchBufferLimit
protected volatile long defaultBatchTimeout
protected volatile boolean defaultCompress
protected volatile boolean defaultDurableSubscription
public static java.lang.String applyPrefix(java.lang.String prefix, java.lang.String name)
prefix
- the prefix.
name
- the name.
public static java.lang.String applyPubSub(java.lang.String name)
name
- the name.
public static java.lang.String applyRequests(java.lang.String name)
name
- the name.
public static java.lang.String constructDLQName(java.lang.String name)
name
- the name.
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
setApplicationContext
in interface org.springframework.context.ApplicationContextAware
org.springframework.beans.BeansException
protected org.springframework.context.support.AbstractApplicationContext getApplicationContext()
protected org.springframework.beans.factory.config.ConfigurableListableBeanFactory getBeanFactory()
public void setCodec(org.springframework.integration.codec.Codec codec)
protected org.springframework.util.IdGenerator getIdGenerator()
public void setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext)
public void setPartitionSelector(PartitionSelectorStrategy partitionSelector)
partitionSelector
- The selector.
public void setDefaultBackOffInitialInterval(long defaultBackOffInitialInterval)
defaultBackOffInitialInterval
- The default retry back off initial interval.
public void setDefaultBackOffMultiplier(double defaultBackOffMultiplier)
defaultBackOffMultiplier
- The default retry back off multiplier.
public void setDefaultBackOffMaxInterval(long defaultBackOffMaxInterval)
defaultBackOffMaxInterval
- The default retry back off max interval.
public void setDefaultConcurrency(int defaultConcurrency)
defaultConcurrency
- The default concurrency.
public void setDefaultMaxAttempts(int defaultMaxAttempts)
defaultMaxAttempts
- The default maximum attempts.
public void setDefaultBatchingEnabled(boolean defaultBatchingEnabled)
defaultBatchingEnabled
- the defaultBatchingEnabled to set.
public void setDefaultBatchSize(int defaultBatchSize)
defaultBatchSize
- the defaultBatchSize to set.
public void setDefaultBatchBufferLimit(int defaultBatchBufferLimit)
defaultBatchBufferLimit
- the defaultBatchBufferLimit to set.
public void setDefaultBatchTimeout(long defaultBatchTimeout)
defaultBatchTimeout
- the defaultBatchTimeout to set.
public void setDefaultCompress(boolean defaultCompress)
defaultCompress
- 'true' to use compression.
public void setDefaultDurableSubscription(boolean defaultDurableSubscription)
defaultDurableSubscription
- true for durable (default false).
public void afterPropertiesSet() throws java.lang.Exception
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
java.lang.Exception
protected void onInit()
public org.springframework.messaging.MessageChannel bindDynamicProducer(java.lang.String name, java.util.Properties properties)
bindDynamicProducer
in interface MessageBus
name
- The name.
properties
- The properties.
protected org.springframework.messaging.MessageChannel doBindDynamicProducer(java.lang.String name, java.lang.String channelName, java.util.Properties properties)
name
- The name.
channelName
- The name of the channel to be created, and registered as a bean.
properties
- The properties.
public org.springframework.messaging.MessageChannel bindDynamicPubSubProducer(java.lang.String name, java.util.Properties properties)
bindDynamicPubSubProducer
in interface MessageBus
name
- The name.
properties
- The properties.
protected org.springframework.messaging.MessageChannel doBindDynamicPubSubProducer(java.lang.String name, java.lang.String channelName, java.util.Properties properties)
name
- The name.
channelName
- The name of the channel to be created, and registered as a bean.
properties
- The properties.
public void unbindConsumers(java.lang.String name)
MessageBus
unbindConsumers
in interface MessageBus
name
- the channel name
public void unbindProducers(java.lang.String name)
MessageBus
unbindProducers
in interface MessageBus
name
- the channel name
public void unbindConsumer(java.lang.String name, org.springframework.messaging.MessageChannel channel)
MessageBus
unbindConsumer
in interface MessageBus
name
- The logical identity of a message source
channel
- The channel bound as a consumer
public void unbindProducer(java.lang.String name, org.springframework.messaging.MessageChannel channel)
MessageBus
unbindProducer
in interface MessageBus
name
- the logical identity of the message target
channel
- the channel bound as a producer
public boolean isCapable(MessageBus.Capability capability)
MessageBus
isCapable
in interface MessageBus
capability
- the capability.
protected void addBinding(Binding binding)
protected void deleteBindings(java.lang.String name)
protected void deleteBinding(java.lang.String name, org.springframework.messaging.MessageChannel channel)
protected void stopBindings()
protected final MessageValues serializePayloadIfNecessary(org.springframework.messaging.Message<?> message)
protected final MessageValues deserializePayloadIfNecessary(org.springframework.messaging.Message<?> message)
protected final MessageValues deserializePayloadIfNecessary(MessageValues message)
protected int determinePartition(org.springframework.messaging.Message<?> message, MessageBusSupport.PartitioningMetadata meta)
key.hashCode(), and the result will be the mod of that value.
message
- the message.
meta
- the partitioning metadata.
protected void validateConsumerProperties(java.lang.String name, java.util.Properties properties, java.util.Set<java.lang.Object> supported)
name
- The name.
properties
- The properties.
supported
- The supported properties.
protected void validateProducerProperties(java.lang.String name, java.util.Properties properties, java.util.Set<java.lang.Object> supported)
name
- The name.
properties
- The properties.
supported
- The supported properties.
protected java.lang.String buildPartitionRoutingExpression(java.lang.String expressionRoot)
protected org.springframework.retry.support.RetryTemplate buildRetryTemplateIfRetryEnabled(AbstractBusPropertiesAccessor properties)
properties
- The properties.
protected boolean isNamedChannel(java.lang.String name)
protected boolean bindNewProducerDirectlyIfPossible(java.lang.String name, org.springframework.messaging.SubscribableChannel moduleOutputChannel, AbstractBusPropertiesAccessor properties)
name
- The name.
moduleOutputChannel
- The channel to bind.
properties
- The producer properties.
protected void bindExistingProducerDirectlyIfPossible(java.lang.String name, org.springframework.messaging.MessageChannel consumerChannel)
name
- The name.
consumerChannel
- The channel to bind the producer to.
public void doManualAck(java.util.LinkedList<org.springframework.messaging.MessageHeaders> messageHeaders)