public abstract class MessageBusSupport extends java.lang.Object implements MessageBus, org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.InitializingBean, org.springframework.integration.expression.IntegrationEvaluationContextAware
Modifier and Type | Class and Description |
---|---|
static class |
MessageBusSupport.DirectHandler |
protected static class |
MessageBusSupport.PartitioningMetadata |
static class |
MessageBusSupport.SetBuilder |
protected class |
MessageBusSupport.SharedChannelProvider<T extends org.springframework.messaging.MessageChannel>
Looks up or optionally creates a new channel to use.
|
Modifier and Type | Field and Description |
---|---|
protected static java.util.Set<java.lang.Object> |
CONSUMER_RETRY_PROPERTIES |
protected static java.util.Set<java.lang.Object> |
CONSUMER_STANDARD_PROPERTIES
The set of properties every bus implementation must support (or at least tolerate).
|
protected long |
defaultBackOffInitialInterval |
protected long |
defaultBackOffMaxInterval |
protected double |
defaultBackOffMultiplier |
protected int |
defaultBatchBufferLimit |
protected boolean |
defaultBatchingEnabled |
protected int |
defaultBatchSize |
protected long |
defaultBatchTimeout |
protected boolean |
defaultCompress |
protected int |
defaultConcurrency |
protected int |
defaultMaxAttempts |
protected MessageBusSupport.SharedChannelProvider<org.springframework.integration.channel.DirectChannel> |
directChannelProvider
Used in the canonical case, when the binding does not involve an alias name.
|
protected org.springframework.expression.EvaluationContext |
evaluationContext |
protected static java.lang.String |
JOB_CHANNEL_TYPE_PREFIX |
protected org.apache.commons.logging.Log |
logger |
protected static java.util.List<org.springframework.util.MimeType> |
MEDIATYPES_MEDIATYPE_ALL |
protected static java.lang.String |
P2P_NAMED_CHANNEL_TYPE_PREFIX |
protected static java.lang.String |
PARTITION_HEADER |
protected static java.util.Set<java.lang.Object> |
PRODUCER_BATCHING_ADVANCED_PROPERTIES |
protected static java.util.Set<java.lang.Object> |
PRODUCER_BATCHING_BASIC_PROPERTIES |
protected static java.util.Set<java.lang.Object> |
PRODUCER_PARTITIONING_PROPERTIES |
protected static java.util.Set<java.lang.Object> |
PRODUCER_STANDARD_PROPERTIES |
protected static java.lang.String |
PUBSUB_NAMED_CHANNEL_TYPE_PREFIX |
Constructor and Description |
---|
MessageBusSupport() |
Modifier and Type | Method and Description |
---|---|
protected void |
addBinding(Binding binding) |
void |
afterPropertiesSet() |
org.springframework.messaging.MessageChannel |
bindDynamicProducer(java.lang.String name,
java.util.Properties properties)
Dynamically create a producer for the named channel.
|
org.springframework.messaging.MessageChannel |
bindDynamicPubSubProducer(java.lang.String name,
java.util.Properties properties)
Dynamically create a producer for the named channel.
|
protected void |
bindExistingProducerDirectlyIfPossible(java.lang.String name,
org.springframework.messaging.MessageChannel consumerChannel)
Attempt to bind a producer directly (avoiding the bus) if there is already a local producer.
|
protected boolean |
bindNewProducerDirectlyIfPossible(java.lang.String name,
org.springframework.messaging.SubscribableChannel moduleOutputChannel,
AbstractBusPropertiesAccessor properties)
Attempt to create a direct binding (avoiding the bus) if the consumer is local.
|
protected java.lang.String |
buildPartitionRoutingExpression(java.lang.String expressionRoot) |
protected org.springframework.retry.support.RetryTemplate |
buildRetryTemplateIfRetryEnabled(AbstractBusPropertiesAccessor properties)
Create and configure a retry template if the consumer 'maxAttempts' property is set.
|
protected void |
deleteBinding(java.lang.String name,
org.springframework.messaging.MessageChannel channel) |
protected void |
deleteBindings(java.lang.String name) |
protected org.springframework.messaging.Message<?> |
deserializePayloadIfNecessary(org.springframework.messaging.Message<?> message) |
protected int |
determinePartition(org.springframework.messaging.Message<?> message,
MessageBusSupport.PartitioningMetadata meta)
Determine the partition to which to send this message.
|
protected org.springframework.messaging.MessageChannel |
doBindDynamicProducer(java.lang.String name,
java.lang.String channelName,
java.util.Properties properties)
Create a producer for the named channel and bind it to the bus.
|
protected org.springframework.messaging.MessageChannel |
doBindDynamicPubSubProducer(java.lang.String name,
java.lang.String channelName,
java.util.Properties properties)
Create a producer for the named channel and bind it to the bus.
|
protected org.springframework.context.support.AbstractApplicationContext |
getApplicationContext() |
protected org.springframework.beans.factory.config.ConfigurableListableBeanFactory |
getBeanFactory() |
protected org.springframework.util.IdGenerator |
getIdGenerator() |
abstract java.lang.String[] |
getMessageBusSpecificProperties() |
protected boolean |
isNamedChannel(java.lang.String name) |
protected void |
onInit() |
protected org.springframework.messaging.Message<?> |
serializePayloadIfNecessary(org.springframework.messaging.Message<?> message,
org.springframework.util.MimeType to) |
void |
setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void |
setCodec(MultiTypeCodec<java.lang.Object> codec) |
void |
setDefaultBackOffInitialInterval(long defaultBackOffInitialInterval)
Set the default retry back off initial interval for this bus; can be overridden
with consumer 'backOffInitialInterval' property.
|
void |
setDefaultBackOffMaxInterval(long defaultBackOffMaxInterval)
Set the default retry back off max interval for this bus; can be overridden
with consumer 'backOffMaxInterval' property.
|
void |
setDefaultBackOffMultiplier(double defaultBackOffMultiplier)
Set the default retry back off multiplier for this bus; can be overridden
with consumer 'backOffMultiplier' property.
|
void |
setDefaultBatchBufferLimit(int defaultBatchBufferLimit)
Set the default batch buffer limit - used to send a batch early if
its size exceeds this.
|
void |
setDefaultBatchingEnabled(boolean defaultBatchingEnabled)
Set whether this bus batches message sends by default.
|
void |
setDefaultBatchSize(int defaultBatchSize)
Set the default batch size; only applies when batching is enabled and
the bus supports batching.
|
void |
setDefaultBatchTimeout(long defaultBatchTimeout)
Set the default batch timeout - used to send a batch if no messages
arrive during this time.
|
void |
setDefaultCompress(boolean defaultCompress)
Set whether compression will be used by producers, by default.
|
void |
setDefaultConcurrency(int defaultConcurrency)
Set the default concurrency for this bus; can be overridden
with consumer 'concurrency' property.
|
void |
setDefaultMaxAttempts(int defaultMaxAttempts)
The default maximum delivery attempts for this bus.
|
void |
setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext) |
void |
setPartitionSelector(PartitionSelectorStrategy partitionSelector)
Set the partition strategy to be used by this bus if no partitionExpression
is provided for a module.
|
protected void |
stopBindings() |
void |
unbindConsumer(java.lang.String name,
org.springframework.messaging.MessageChannel channel)
Unbind a specific p2p or pub/sub message consumer.
|
void |
unbindConsumers(java.lang.String name)
Unbind an inbound inter-module channel and stop any active components that use the channel.
|
void |
unbindProducer(java.lang.String name,
org.springframework.messaging.MessageChannel channel)
Unbind a specific p2p or pub/sub message producer.
|
void |
unbindProducers(java.lang.String name)
Unbind an outbound inter-module channel and stop any active components that use the channel.
|
protected void |
validateConsumerProperties(java.lang.String name,
java.util.Properties properties,
java.util.Set<java.lang.Object> supported)
Validate the provided deployment properties for the consumer against those supported by
this bus implementation.
|
protected void |
validateProducerProperties(java.lang.String name,
java.util.Properties properties,
java.util.Set<java.lang.Object> supported)
Validate the provided deployment properties for the producer against those supported by
this bus implementation.
|
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface MessageBus: bindConsumer, bindProducer, bindPubSubConsumer, bindPubSubProducer, bindReplier, bindRequestor
protected static final java.lang.String P2P_NAMED_CHANNEL_TYPE_PREFIX
protected static final java.lang.String PUBSUB_NAMED_CHANNEL_TYPE_PREFIX
protected static final java.lang.String JOB_CHANNEL_TYPE_PREFIX
protected static final java.lang.String PARTITION_HEADER
protected final org.apache.commons.logging.Log logger
protected static final java.util.List<org.springframework.util.MimeType> MEDIATYPES_MEDIATYPE_ALL
protected static final java.util.Set<java.lang.Object> CONSUMER_STANDARD_PROPERTIES
protected static final java.util.Set<java.lang.Object> PRODUCER_STANDARD_PROPERTIES
protected static final java.util.Set<java.lang.Object> CONSUMER_RETRY_PROPERTIES
protected static final java.util.Set<java.lang.Object> PRODUCER_PARTITIONING_PROPERTIES
protected static final java.util.Set<java.lang.Object> PRODUCER_BATCHING_BASIC_PROPERTIES
protected static final java.util.Set<java.lang.Object> PRODUCER_BATCHING_ADVANCED_PROPERTIES
protected volatile org.springframework.expression.EvaluationContext evaluationContext
protected final MessageBusSupport.SharedChannelProvider<org.springframework.integration.channel.DirectChannel> directChannelProvider
protected volatile long defaultBackOffInitialInterval
protected volatile long defaultBackOffMaxInterval
protected volatile double defaultBackOffMultiplier
protected volatile int defaultConcurrency
protected volatile int defaultMaxAttempts
protected volatile boolean defaultBatchingEnabled
protected volatile int defaultBatchSize
protected volatile int defaultBatchBufferLimit
protected volatile long defaultBatchTimeout
protected volatile boolean defaultCompress
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
setApplicationContext
in interface org.springframework.context.ApplicationContextAware
org.springframework.beans.BeansException
protected org.springframework.context.support.AbstractApplicationContext getApplicationContext()
protected org.springframework.beans.factory.config.ConfigurableListableBeanFactory getBeanFactory()
public void setCodec(MultiTypeCodec<java.lang.Object> codec)
protected org.springframework.util.IdGenerator getIdGenerator()
public void setIntegrationEvaluationContext(org.springframework.expression.EvaluationContext evaluationContext)
setIntegrationEvaluationContext
in interface org.springframework.integration.expression.IntegrationEvaluationContextAware
public void setPartitionSelector(PartitionSelectorStrategy partitionSelector)
partitionSelector
- The selector.
public void setDefaultBackOffInitialInterval(long defaultBackOffInitialInterval)
defaultBackOffInitialInterval
- The default retry back off initial interval for this bus.
public void setDefaultBackOffMultiplier(double defaultBackOffMultiplier)
defaultBackOffMultiplier
- The default retry back off multiplier for this bus.
public void setDefaultBackOffMaxInterval(long defaultBackOffMaxInterval)
defaultBackOffMaxInterval
- The default retry back off max interval for this bus.
public void setDefaultConcurrency(int defaultConcurrency)
defaultConcurrency
- The default concurrency for this bus.
public void setDefaultMaxAttempts(int defaultMaxAttempts)
defaultMaxAttempts
- The default maximum attempts.
public void setDefaultBatchingEnabled(boolean defaultBatchingEnabled)
defaultBatchingEnabled
- the defaultBatchingEnabled to set.
public void setDefaultBatchSize(int defaultBatchSize)
defaultBatchSize
- the defaultBatchSize to set.
public void setDefaultBatchBufferLimit(int defaultBatchBufferLimit)
defaultBatchBufferLimit
- the defaultBatchBufferLimit to set.
public void setDefaultBatchTimeout(long defaultBatchTimeout)
defaultBatchTimeout
- the defaultBatchTimeout to set.
public void setDefaultCompress(boolean defaultCompress)
defaultCompress
- 'true' to use compression.
public void afterPropertiesSet() throws java.lang.Exception
afterPropertiesSet
in interface org.springframework.beans.factory.InitializingBean
java.lang.Exception
protected void onInit()
public abstract java.lang.String[] getMessageBusSpecificProperties()
public org.springframework.messaging.MessageChannel bindDynamicProducer(java.lang.String name, java.util.Properties properties)
bindDynamicProducer
in interface MessageBus
name
- The name.
properties
- The properties.
protected org.springframework.messaging.MessageChannel doBindDynamicProducer(java.lang.String name, java.lang.String channelName, java.util.Properties properties)
name
- The name.
channelName
- The name of the channel to be created, and registered as a bean.
properties
- The properties.
public org.springframework.messaging.MessageChannel bindDynamicPubSubProducer(java.lang.String name, java.util.Properties properties)
bindDynamicPubSubProducer
in interface MessageBus
name
- The name.
properties
- The properties.
protected org.springframework.messaging.MessageChannel doBindDynamicPubSubProducer(java.lang.String name, java.lang.String channelName, java.util.Properties properties)
name
- The name.
channelName
- The name of the channel to be created, and registered as a bean.
properties
- The properties.
public void unbindConsumers(java.lang.String name)
MessageBus
unbindConsumers
in interface MessageBus
name
- the channel name
public void unbindProducers(java.lang.String name)
MessageBus
unbindProducers
in interface MessageBus
name
- the channel name
public void unbindConsumer(java.lang.String name, org.springframework.messaging.MessageChannel channel)
MessageBus
unbindConsumer
in interface MessageBus
name
- The logical identity of a message source
channel
- The channel bound as a consumer
public void unbindProducer(java.lang.String name, org.springframework.messaging.MessageChannel channel)
MessageBus
unbindProducer
in interface MessageBus
name
- the logical identity of the message target
channel
- the channel bound as a producer
protected void addBinding(Binding binding)
protected void deleteBindings(java.lang.String name)
protected void deleteBinding(java.lang.String name, org.springframework.messaging.MessageChannel channel)
protected void stopBindings()
protected final org.springframework.messaging.Message<?> serializePayloadIfNecessary(org.springframework.messaging.Message<?> message, org.springframework.util.MimeType to)
protected final org.springframework.messaging.Message<?> deserializePayloadIfNecessary(org.springframework.messaging.Message<?> message)
protected int determinePartition(org.springframework.messaging.Message<?> message, MessageBusSupport.PartitioningMetadata meta)
If no partition expression is provided, the key will be passed to the bus partition
strategy along with the partitionCount.
The default partition strategy uses key.hashCode(), and the result will
be the mod of that value.
message
- the message.
meta
- the partitioning metadata.
protected void validateConsumerProperties(java.lang.String name, java.util.Properties properties, java.util.Set<java.lang.Object> supported)
name
- The name.
properties
- The properties.
supported
- The supported properties.
protected void validateProducerProperties(java.lang.String name, java.util.Properties properties, java.util.Set<java.lang.Object> supported)
name
- The name.
properties
- The properties.
supported
- The supported properties.
protected java.lang.String buildPartitionRoutingExpression(java.lang.String expressionRoot)
protected org.springframework.retry.support.RetryTemplate buildRetryTemplateIfRetryEnabled(AbstractBusPropertiesAccessor properties)
properties
- The properties.
protected boolean isNamedChannel(java.lang.String name)
protected boolean bindNewProducerDirectlyIfPossible(java.lang.String name, org.springframework.messaging.SubscribableChannel moduleOutputChannel, AbstractBusPropertiesAccessor properties)
name
- The name.
moduleOutputChannel
- The channel to bind.
properties
- The producer properties.
protected void bindExistingProducerDirectlyIfPossible(java.lang.String name, org.springframework.messaging.MessageChannel consumerChannel)
name
- The name.
consumerChannel
- The channel to bind the producer to.