public abstract class AbstractBrokerMessageHandler extends Object implements MessageHandler, ApplicationEventPublisherAware, SmartLifecycle
MessageHandler
that broker messages to
registered subscribers.Modifier and Type | Field and Description |
---|---|
protected Log |
logger |
DEFAULT_PHASE
Constructor and Description |
---|
AbstractBrokerMessageHandler(SubscribableChannel inboundChannel,
MessageChannel outboundChannel,
SubscribableChannel brokerChannel)
Constructor with no destination prefixes (matches all destinations).
|
AbstractBrokerMessageHandler(SubscribableChannel inboundChannel,
MessageChannel outboundChannel,
SubscribableChannel brokerChannel,
Collection<String> destinationPrefixes)
Constructor with destination prefixes to match to destinations of messages.
|
Modifier and Type | Method and Description |
---|---|
protected boolean |
checkDestinationPrefix(String destination)
Whether a message with the given destination should be processed.
|
ApplicationEventPublisher |
getApplicationEventPublisher() |
SubscribableChannel |
getBrokerChannel() |
SubscribableChannel |
getClientInboundChannel() |
MessageChannel |
getClientOutboundChannel() |
protected MessageChannel |
getClientOutboundChannelForSession(String sessionId)
Get the MessageChannel to use for sending messages to clients, possibly
a per-session wrapper when
preservePublishOrder=true . |
Collection<String> |
getDestinationPrefixes()
Return destination prefixes prefixes to use to filter messages to forward
to the broker.
|
void |
handleMessage(Message<?> message)
Handle the given message.
|
protected abstract void |
handleMessageInternal(Message<?> message) |
boolean |
isAutoStartup()
Returns
true if this Lifecycle component should get
started automatically by the container at the time that the containing
ApplicationContext gets refreshed. |
boolean |
isBrokerAvailable()
Whether the message broker is currently available and able to process messages.
|
boolean |
isPreservePublishOrder()
Whether to ensure messages are received in the order of publication.
|
boolean |
isRunning()
Check whether this message handler is currently running.
|
protected void |
publishBrokerAvailableEvent() |
protected void |
publishBrokerUnavailableEvent() |
void |
setApplicationEventPublisher(ApplicationEventPublisher publisher)
Set the ApplicationEventPublisher that this object runs in.
|
void |
setAutoStartup(boolean autoStartup) |
void |
setPreservePublishOrder(boolean preservePublishOrder)
Whether the client must receive messages in the order of publication.
|
void |
setUserDestinationPredicate(Predicate<String> predicate)
Configure a Predicate to identify messages with a user destination.
|
void |
start()
Start this component.
|
protected void |
startInternal() |
void |
stop()
Stop this component, typically in a synchronous fashion, such that the component is
fully stopped upon return of this method.
|
void |
stop(Runnable callback)
Indicates that a Lifecycle component must stop if it is currently running.
|
protected void |
stopInternal() |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
getPhase
protected final Log logger
public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, SubscribableChannel brokerChannel)
inboundChannel
- the channel for receiving messages from clients (e.g. WebSocket clients)outboundChannel
- the channel for sending messages to clients (e.g. WebSocket clients)brokerChannel
- the channel for the application to send messages to the brokerpublic AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, SubscribableChannel brokerChannel, @Nullable Collection<String> destinationPrefixes)
inboundChannel
- the channel for receiving messages from clients (e.g. WebSocket clients)outboundChannel
- the channel for sending messages to clients (e.g. WebSocket clients)brokerChannel
- the channel for the application to send messages to the brokerdestinationPrefixes
- prefixes to use to filter out messagespublic SubscribableChannel getClientInboundChannel()
public MessageChannel getClientOutboundChannel()
public SubscribableChannel getBrokerChannel()
public Collection<String> getDestinationPrefixes()
By default this is not set.
public void setUserDestinationPredicate(@Nullable Predicate<String> predicate)
destination prefixes
are configured,
this helps to recognize and skip user destination messages that need to
be pre-processed by the
UserDestinationMessageHandler
before they reach the broker.predicate
- the predicate to identify user messages with a non-null
destination as messages with a user destinations.public void setPreservePublishOrder(boolean preservePublishOrder)
By default messages sent to the "clientOutboundChannel"
may
not be processed in the same order because the channel is backed by a
ThreadPoolExecutor that in turn does not guarantee processing in order.
When this flag is set to true
messages within the same session
will be sent to the "clientOutboundChannel"
one at a time in
order to preserve the order of publication. Enable this only if needed
since there is some performance overhead to keep messages in order.
preservePublishOrder
- whether to publish in orderpublic boolean isPreservePublishOrder()
public void setApplicationEventPublisher(@Nullable ApplicationEventPublisher publisher)
ApplicationEventPublisherAware
Invoked after population of normal bean properties but before an init callback like InitializingBean's afterPropertiesSet or a custom init-method. Invoked before ApplicationContextAware's setApplicationContext.
setApplicationEventPublisher
in interface ApplicationEventPublisherAware
publisher
- event publisher to be used by this object@Nullable public ApplicationEventPublisher getApplicationEventPublisher()
public void setAutoStartup(boolean autoStartup)
public boolean isAutoStartup()
SmartLifecycle
true
if this Lifecycle
component should get
started automatically by the container at the time that the containing
ApplicationContext
gets refreshed.
A value of false
indicates that the component is intended to
be started through an explicit Lifecycle.start()
call instead, analogous
to a plain Lifecycle
implementation.
The default implementation returns true
.
isAutoStartup
in interface SmartLifecycle
Lifecycle.start()
,
SmartLifecycle.getPhase()
,
LifecycleProcessor.onRefresh()
,
ConfigurableApplicationContext.refresh()
public void start()
Lifecycle
Should not throw an exception if the component is already running.
In the case of a container, this will propagate the start signal to all components that apply.
start
in interface Lifecycle
SmartLifecycle.isAutoStartup()
protected void startInternal()
public void stop()
Lifecycle
SmartLifecycle
and its stop(Runnable)
variant when asynchronous stop behavior is necessary.
Note that this stop notification is not guaranteed to come before destruction:
On regular shutdown, Lifecycle
beans will first receive a stop notification
before the general destruction callbacks are being propagated; however, on hot
refresh during a context's lifetime or on aborted refresh attempts, a given bean's
destroy method will be called without any consideration of stop signals upfront.
Should not throw an exception if the component is not running (not started yet).
In the case of a container, this will propagate the stop signal to all components that apply.
stop
in interface Lifecycle
SmartLifecycle.stop(Runnable)
,
DisposableBean.destroy()
protected void stopInternal()
public final void stop(Runnable callback)
SmartLifecycle
The provided callback is used by the LifecycleProcessor
to support
an ordered, and potentially concurrent, shutdown of all components having a
common shutdown order value. The callback must be executed after
the SmartLifecycle
component does indeed stop.
The LifecycleProcessor
will call only this variant of the
stop
method; i.e. Lifecycle.stop()
will not be called for
SmartLifecycle
implementations unless explicitly delegated to within
the implementation of this method.
The default implementation delegates to Lifecycle.stop()
and immediately
triggers the given callback in the calling thread. Note that there is no
synchronization between the two, so custom implementations may at least
want to put the same steps within their common lifecycle monitor (if any).
stop
in interface SmartLifecycle
Lifecycle.stop()
,
SmartLifecycle.getPhase()
public final boolean isRunning()
Note that even when this message handler is running the
isBrokerAvailable()
flag may still independently alternate between
being on and off depending on the concrete subclass implementation.
public boolean isBrokerAvailable()
Note that this is in addition to the isRunning()
flag, which
indicates whether this message handler is running. In other words the message
handler must first be running and then the #isBrokerAvailable()
flag
may still independently alternate between being on and off depending on the
concrete subclass implementation.
Application components may implement
org.springframework.context.ApplicationListener<BrokerAvailabilityEvent>
to receive notifications when broker becomes available and unavailable.
public void handleMessage(Message<?> message)
MessageHandler
handleMessage
in interface MessageHandler
message
- the message to be handledprotected abstract void handleMessageInternal(Message<?> message)
protected boolean checkDestinationPrefix(@Nullable String destination)
destination prefixes
.
userDestinationPredicate
.
destination
- the destination to checkprotected void publishBrokerAvailableEvent()
protected void publishBrokerUnavailableEvent()
protected MessageChannel getClientOutboundChannelForSession(String sessionId)
preservePublishOrder=true
.