public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
MessageHandler
that handles messages by
forwarding them to a STOMP broker.
For each new CONNECT
message, an independent TCP
connection to the broker is opened and used exclusively for all messages from the
client that originated the CONNECT message. Messages from the same client are
identified through the session id message header. Reversely, when the STOMP broker
sends messages back on the TCP connection, those messages are enriched with the
session id of the client and sent back downstream through the MessageChannel
provided to the constructor.
This class also automatically opens a default "system" TCP connection to the message broker that is used for sending messages that originate from the server application (as opposed to from a client). Such messages are not associated with any client and therefore do not have a session id header. The "system" connection is effectively shared and cannot be used to receive messages. Several properties are provided to configure the "system" connection including:
Modifier and Type | Class and Description |
---|---|
static interface |
StompBrokerRelayMessageHandler.Stats
Contract for access to session counters.
|
Modifier and Type | Field and Description |
---|---|
static String |
SYSTEM_SESSION_ID
The system session ID.
|
logger
DEFAULT_PHASE
Constructor and Description |
---|
StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel,
MessageChannel outboundChannel,
SubscribableChannel brokerChannel,
Collection<String> destinationPrefixes)
Create a StompBrokerRelayMessageHandler instance with the given message channels
and destination prefixes.
|
Modifier and Type | Method and Description |
---|---|
String |
getClientLogin()
Return the configured login to use for connections to the STOMP broker
on behalf of connected clients.
|
String |
getClientPasscode()
Return the configured passcode to use for connections to the STOMP broker on
behalf of connected clients.
|
int |
getConnectionCount()
Return the current count of TCP connection to the broker.
|
MessageHeaderInitializer |
getHeaderInitializer()
Return the configured header initializer.
|
String |
getRelayHost()
Return the STOMP message broker host.
|
int |
getRelayPort()
Return the STOMP message broker port.
|
StompBrokerRelayMessageHandler.Stats |
getStats()
Return a structured object with internal state and counters.
|
String |
getStatsInfo()
Return a String describing internal state and counters.
|
long |
getSystemHeartbeatReceiveInterval()
Return the interval, in milliseconds, at which the "system" connection expects
to receive heartbeats from the STOMP broker.
|
long |
getSystemHeartbeatSendInterval()
Return the interval, in milliseconds, at which the "system" connection will
send heartbeats to the STOMP broker.
|
String |
getSystemLogin()
Return the login used for the shared "system" connection to the STOMP broker.
|
String |
getSystemPasscode()
Return the passcode used for the shared "system" connection to the STOMP broker.
|
Map<String,MessageHandler> |
getSystemSubscriptions()
Return the configured map with subscriptions on the "system" connection.
|
TaskScheduler |
getTaskScheduler() |
TcpOperations<byte[]> |
getTcpClient()
Get the configured TCP client (never
null unless not configured
invoked and this method is invoked before the handler is started and
hence a default implementation initialized). |
String |
getVirtualHost()
Return the configured virtual host value.
|
protected void |
handleMessageInternal(Message<?> message) |
void |
setClientLogin(String clientLogin)
Set the login to use when creating connections to the STOMP broker on
behalf of connected clients.
|
void |
setClientPasscode(String clientPasscode)
Set the client passcode to use to create connections to the STOMP broker on
behalf of connected clients.
|
void |
setHeaderInitializer(MessageHeaderInitializer headerInitializer)
Configure a
MessageHeaderInitializer to apply to the headers of all
messages created through the StompBrokerRelayMessageHandler that
are sent to the client outbound message channel. |
void |
setRelayHost(String relayHost)
Set the STOMP message broker host.
|
void |
setRelayPort(int relayPort)
Set the STOMP message broker port.
|
void |
setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval)
Set the maximum interval, in milliseconds, at which the "system" connection
expects, in the absence of any other data, to receive a heartbeat from the STOMP
broker.
|
void |
setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval)
Set the interval, in milliseconds, at which the "system" connection will, in the
absence of any other data being sent, send a heartbeat to the STOMP broker.
|
void |
setSystemLogin(String systemLogin)
Set the login for the shared "system" connection used to send messages to
the STOMP broker from within the application, i.e.
|
void |
setSystemPasscode(String systemPasscode)
Set the passcode for the shared "system" connection used to send messages to
the STOMP broker from within the application, i.e.
|
void |
setSystemSubscriptions(Map<String,MessageHandler> subscriptions)
Configure one more destinations to subscribe to on the shared "system"
connection along with MessageHandler's to handle received messages.
|
void |
setTaskScheduler(TaskScheduler taskScheduler)
Configure the
TaskScheduler to use to reset client-to-broker
message count in the current heartbeat period. |
void |
setTcpClient(TcpOperations<byte[]> tcpClient)
Configure a TCP client for managing TCP connections to the STOMP broker.
|
void |
setVirtualHost(String virtualHost)
Set the value of the "host" header to use in STOMP CONNECT frames.
|
protected void |
startInternal() |
protected void |
stopInternal() |
String |
toString() |
checkDestinationPrefix, getApplicationEventPublisher, getBrokerChannel, getClientInboundChannel, getClientOutboundChannel, getClientOutboundChannelForSession, getDestinationPrefixes, handleMessage, isAutoStartup, isBrokerAvailable, isPreservePublishOrder, isRunning, publishBrokerAvailableEvent, publishBrokerUnavailableEvent, setApplicationEventPublisher, setAutoStartup, setPreservePublishOrder, start, stop, stop
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getPhase
public static final String SYSTEM_SESSION_ID
public StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, SubscribableChannel brokerChannel, Collection<String> destinationPrefixes)
inboundChannel
- the channel for receiving messages from clients (e.g. WebSocket clients)outboundChannel
- the channel for sending messages to clients (e.g. WebSocket clients)brokerChannel
- the channel for the application to send messages to the brokerdestinationPrefixes
- the broker supported destination prefixes; destinations
that do not match the given prefix are ignored.public void setRelayHost(String relayHost)
public String getRelayHost()
public void setRelayPort(int relayPort)
public int getRelayPort()
public void setClientLogin(String clientLogin)
By default this is set to "guest".
setSystemLogin(String)
public String getClientLogin()
getSystemLogin()
public void setClientPasscode(String clientPasscode)
By default this is set to "guest".
setSystemPasscode(java.lang.String)
public String getClientPasscode()
getSystemPasscode()
public void setSystemLogin(String systemLogin)
By default this is set to "guest".
public String getSystemLogin()
public void setSystemPasscode(String systemPasscode)
By default this is set to "guest".
public String getSystemPasscode()
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval)
The default value is 10000.
See class-level documentation for more information on the "system" connection.
public long getSystemHeartbeatSendInterval()
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval)
The default value is 10000.
See class-level documentation for more information on the "system" connection.
public long getSystemHeartbeatReceiveInterval()
public void setSystemSubscriptions(@Nullable Map<String,MessageHandler> subscriptions)
This is for internal use in a multi-application server scenario where servers forward messages to each other (e.g. unresolved user destinations).
subscriptions
- the destinations to subscribe to.public Map<String,MessageHandler> getSystemSubscriptions()
public void setVirtualHost(@Nullable String virtualHost)
By default this property is not set.
public void setTcpClient(@Nullable TcpOperations<byte[]> tcpClient)
By default ReactorNettyTcpClient
is used.
Note: when this property is used, any
host
or port
specified are effectively ignored.
@Nullable public TcpOperations<byte[]> getTcpClient()
null
unless not configured
invoked and this method is invoked before the handler is started and
hence a default implementation initialized).public void setHeaderInitializer(@Nullable MessageHeaderInitializer headerInitializer)
MessageHeaderInitializer
to apply to the headers of all
messages created through the StompBrokerRelayMessageHandler
that
are sent to the client outbound message channel.
By default this property is not set.
@Nullable public MessageHeaderInitializer getHeaderInitializer()
public String getStatsInfo()
toString()
on getStats()
.public StompBrokerRelayMessageHandler.Stats getStats()
public int getConnectionCount()
public void setTaskScheduler(@Nullable TaskScheduler taskScheduler)
TaskScheduler
to use to reset client-to-broker
message count in the current heartbeat period. For more details, see
StompBrokerRelayRegistration.setTaskScheduler(TaskScheduler)
.taskScheduler
- the scheduler to use@Nullable public TaskScheduler getTaskScheduler()
protected void startInternal()
startInternal
in class AbstractBrokerMessageHandler
protected void stopInternal()
stopInternal
in class AbstractBrokerMessageHandler
protected void handleMessageInternal(Message<?> message)
handleMessageInternal
in class AbstractBrokerMessageHandler