public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
MessageHandler
that handles messages by
forwarding them to a STOMP broker.
For each new CONNECT
message, an independent TCP
connection to the broker is opened and used exclusively for all messages from the
client that originated the CONNECT message. Messages from the same client are
identified through the session id message header. Reversely, when the STOMP broker
sends messages back on the TCP connection, those messages are enriched with the
session id of the client and sent back downstream through the MessageChannel
provided to the constructor.
This class also automatically opens a default "system" TCP connection to the message broker that is used for sending messages that originate from the server application (as opposed to from a client). Such messages are not associated with any client and therefore do not have a session id header. The "system" connection is effectively shared and cannot be used to receive messages. Several properties are provided to configure the "system" connection including:
Modifier and Type | Class and Description |
---|---|
private class |
StompBrokerRelayMessageHandler.Stats |
private class |
StompBrokerRelayMessageHandler.StompConnectionHandler |
private class |
StompBrokerRelayMessageHandler.SystemStompConnectionHandler |
private static class |
StompBrokerRelayMessageHandler.VoidCallable |
Modifier and Type | Field and Description |
---|---|
private java.lang.String |
clientLogin |
private java.lang.String |
clientPasscode |
private java.util.Map<java.lang.String,StompBrokerRelayMessageHandler.StompConnectionHandler> |
connectionHandlers |
private static byte[] |
EMPTY_PAYLOAD |
private static ListenableFutureTask<java.lang.Void> |
EMPTY_TASK |
private MessageHeaderInitializer |
headerInitializer |
private static Message<byte[]> |
HEARTBEAT_MESSAGE |
private static long |
HEARTBEAT_MULTIPLIER |
private static int |
MAX_TIME_TO_CONNECTED_FRAME
A heartbeat is setup once a CONNECTED frame is received which contains the heartbeat settings
we need.
|
private java.lang.String |
relayHost |
private int |
relayPort |
private StompBrokerRelayMessageHandler.Stats |
stats |
static java.lang.String |
SYSTEM_SESSION_ID |
private long |
systemHeartbeatReceiveInterval |
private long |
systemHeartbeatSendInterval |
private java.lang.String |
systemLogin |
private java.lang.String |
systemPasscode |
private java.util.Map<java.lang.String,MessageHandler> |
systemSubscriptions |
private TcpOperations<byte[]> |
tcpClient |
private java.lang.String |
virtualHost |
logger
Constructor and Description |
---|
StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel,
MessageChannel outboundChannel,
SubscribableChannel brokerChannel,
java.util.Collection<java.lang.String> destinationPrefixes)
Create a StompBrokerRelayMessageHandler instance with the given message channels
and destination prefixes.
|
Modifier and Type | Method and Description |
---|---|
java.lang.String |
getClientLogin()
Return the configured login to use for connections to the STOMP broker
on behalf of connected clients.
|
java.lang.String |
getClientPasscode()
Return the configured passcode to use for connections to the STOMP broker on
behalf of connected clients.
|
int |
getConnectionCount()
Return the current count of TCP connection to the broker.
|
MessageHeaderInitializer |
getHeaderInitializer()
Return the configured header initializer.
|
java.lang.String |
getRelayHost()
Return the STOMP message broker host.
|
int |
getRelayPort()
Return the STOMP message broker port.
|
java.lang.String |
getStatsInfo()
Return a String describing internal state and counters.
|
long |
getSystemHeartbeatReceiveInterval()
Return the interval, in milliseconds, at which the "system" connection expects
to receive heartbeats from the STOMP broker.
|
long |
getSystemHeartbeatSendInterval()
Return the interval, in milliseconds, at which the "system" connection will
send heartbeats to the STOMP broker.
|
java.lang.String |
getSystemLogin()
Return the login used for the shared "system" connection to the STOMP broker.
|
java.lang.String |
getSystemPasscode()
Return the passcode used for the shared "system" connection to the STOMP broker.
|
java.util.Map<java.lang.String,MessageHandler> |
getSystemSubscriptions()
Return the configured map with subscriptions on the "system" connection.
|
TcpOperations<byte[]> |
getTcpClient()
Get the configured TCP client (never
null unless not configured
invoked and this method is invoked before the handler is started and
hence a default implementation initialized). |
java.lang.String |
getVirtualHost()
Return the configured virtual host value.
|
protected void |
handleMessageInternal(Message<?> message) |
void |
setClientLogin(java.lang.String clientLogin)
Set the login to use when creating connections to the STOMP broker on
behalf of connected clients.
|
void |
setClientPasscode(java.lang.String clientPasscode)
Set the client passcode to use to create connections to the STOMP broker on
behalf of connected clients.
|
void |
setHeaderInitializer(MessageHeaderInitializer headerInitializer)
Configure a
MessageHeaderInitializer to apply to the headers of all
messages created through the StompBrokerRelayMessageHandler that
are sent to the client outbound message channel. |
void |
setRelayHost(java.lang.String relayHost)
Set the STOMP message broker host.
|
void |
setRelayPort(int relayPort)
Set the STOMP message broker port.
|
void |
setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval)
Set the maximum interval, in milliseconds, at which the "system" connection
expects, in the absence of any other data, to receive a heartbeat from the STOMP
broker.
|
void |
setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval)
Set the interval, in milliseconds, at which the "system" connection will, in the
absence of any other data being sent, send a heartbeat to the STOMP broker.
|
void |
setSystemLogin(java.lang.String systemLogin)
Set the login for the shared "system" connection used to send messages to
the STOMP broker from within the application, i.e.
|
void |
setSystemPasscode(java.lang.String systemPasscode)
Set the passcode for the shared "system" connection used to send messages to
the STOMP broker from within the application, i.e.
|
void |
setSystemSubscriptions(java.util.Map<java.lang.String,MessageHandler> subscriptions)
Configure one more destinations to subscribe to on the shared "system"
connection along with MessageHandler's to handle received messages.
|
void |
setTcpClient(TcpOperations<byte[]> tcpClient)
Configure a TCP client for managing TCP connections to the STOMP broker.
|
void |
setVirtualHost(java.lang.String virtualHost)
Set the value of the "host" header to use in STOMP CONNECT frames.
|
protected void |
startInternal() |
protected void |
stopInternal() |
java.lang.String |
toString() |
checkDestinationPrefix, getApplicationEventPublisher, getBrokerChannel, getClientInboundChannel, getClientOutboundChannel, getDestinationPrefixes, getPhase, handleMessage, isAutoStartup, isBrokerAvailable, isRunning, publishBrokerAvailableEvent, publishBrokerUnavailableEvent, setApplicationEventPublisher, setAutoStartup, start, stop, stop
public static final java.lang.String SYSTEM_SESSION_ID
private static final long HEARTBEAT_MULTIPLIER
private static final int MAX_TIME_TO_CONNECTED_FRAME
private static final byte[] EMPTY_PAYLOAD
private static final ListenableFutureTask<java.lang.Void> EMPTY_TASK
private static final Message<byte[]> HEARTBEAT_MESSAGE
private java.lang.String relayHost
private int relayPort
private java.lang.String clientLogin
private java.lang.String clientPasscode
private java.lang.String systemLogin
private java.lang.String systemPasscode
private long systemHeartbeatSendInterval
private long systemHeartbeatReceiveInterval
private final java.util.Map<java.lang.String,MessageHandler> systemSubscriptions
@Nullable private java.lang.String virtualHost
@Nullable private TcpOperations<byte[]> tcpClient
@Nullable private MessageHeaderInitializer headerInitializer
private final StompBrokerRelayMessageHandler.Stats stats
private final java.util.Map<java.lang.String,StompBrokerRelayMessageHandler.StompConnectionHandler> connectionHandlers
public StompBrokerRelayMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, SubscribableChannel brokerChannel, java.util.Collection<java.lang.String> destinationPrefixes)
inboundChannel
- the channel for receiving messages from clients (e.g. WebSocket clients)outboundChannel
- the channel for sending messages to clients (e.g. WebSocket clients)brokerChannel
- the channel for the application to send messages to the brokerdestinationPrefixes
- the broker supported destination prefixes; destinations
that do not match the given prefix are ignored.public void setRelayHost(java.lang.String relayHost)
public java.lang.String getRelayHost()
public void setRelayPort(int relayPort)
public int getRelayPort()
public void setClientLogin(java.lang.String clientLogin)
By default this is set to "guest".
setSystemLogin(String)
public java.lang.String getClientLogin()
getSystemLogin()
public void setClientPasscode(java.lang.String clientPasscode)
By default this is set to "guest".
setSystemPasscode(java.lang.String)
public java.lang.String getClientPasscode()
getSystemPasscode()
public void setSystemLogin(java.lang.String systemLogin)
By default this is set to "guest".
public java.lang.String getSystemLogin()
public void setSystemPasscode(java.lang.String systemPasscode)
By default this is set to "guest".
public java.lang.String getSystemPasscode()
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval)
The default value is 10000.
See class-level documentation for more information on the "system" connection.
public long getSystemHeartbeatSendInterval()
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval)
The default value is 10000.
See class-level documentation for more information on the "system" connection.
public long getSystemHeartbeatReceiveInterval()
public void setSystemSubscriptions(@Nullable java.util.Map<java.lang.String,MessageHandler> subscriptions)
This is for internal use in a multi-application server scenario where servers forward messages to each other (e.g. unresolved user destinations).
subscriptions
- the destinations to subscribe to.public java.util.Map<java.lang.String,MessageHandler> getSystemSubscriptions()
public void setVirtualHost(@Nullable java.lang.String virtualHost)
By default this property is not set.
@Nullable public java.lang.String getVirtualHost()
public void setTcpClient(@Nullable TcpOperations<byte[]> tcpClient)
By default ReactorNettyTcpClient
is used.
@Nullable public TcpOperations<byte[]> getTcpClient()
null
unless not configured
invoked and this method is invoked before the handler is started and
hence a default implementation initialized).public void setHeaderInitializer(@Nullable MessageHeaderInitializer headerInitializer)
MessageHeaderInitializer
to apply to the headers of all
messages created through the StompBrokerRelayMessageHandler
that
are sent to the client outbound message channel.
By default this property is not set.
@Nullable public MessageHeaderInitializer getHeaderInitializer()
public java.lang.String getStatsInfo()
public int getConnectionCount()
protected void startInternal()
startInternal
in class AbstractBrokerMessageHandler
protected void stopInternal()
stopInternal
in class AbstractBrokerMessageHandler
protected void handleMessageInternal(Message<?> message)
handleMessageInternal
in class AbstractBrokerMessageHandler
public java.lang.String toString()
toString
in class java.lang.Object