Class BaseIntegrationFlowDefinition<B extends BaseIntegrationFlowDefinition<B>>
java.lang.Object
org.springframework.integration.dsl.BaseIntegrationFlowDefinition<B>
- Type Parameters:
B - the BaseIntegrationFlowDefinition implementation type.
- Direct Known Subclasses:
IntegrationFlowDefinition
@IntegrationDsl
public abstract class BaseIntegrationFlowDefinition<B extends BaseIntegrationFlowDefinition<B>>
extends Object
The Builder pattern implementation for the EIP-method chain. Provides a variety of methods to populate Spring Integration components to an IntegrationFlow for future registration in the application context.
- Since:
- 5.2.1
- Author:
- Artem Bilan, Gary Russell, Gabriele Del Prete, Tim Feuerbach, Ngoc Nhan
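The self-referential type parameter `B extends BaseIntegrationFlowDefinition<B>` together with `_this()` is what lets every chained method return the concrete subclass. The following is a minimal stand-alone sketch of that idea in plain Java. It does not use Spring Integration; the names `BaseFlowSketch`, `FlowSketch`, `audit()` and the string-based components are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a base definition; B is the concrete builder type.
abstract class BaseFlowSketch<B extends BaseFlowSketch<B>> {

    private final List<String> components = new ArrayList<>();

    // Each EIP-style method records a component and returns the concrete type.
    public B filter(String expression) {
        return addComponent("filter:" + expression);
    }

    public B transform(String expression) {
        return addComponent("transform:" + expression);
    }

    protected B addComponent(String component) {
        this.components.add(component);
        return _this();
    }

    // The single unchecked cast that makes the fluent chain type-safe for callers.
    @SuppressWarnings("unchecked")
    protected final B _this() {
        return (B) this;
    }

    public List<String> components() {
        return List.copyOf(this.components);
    }
}

// A concrete subclass closes the type parameter and adds its own method.
final class FlowSketch extends BaseFlowSketch<FlowSketch> {

    public FlowSketch audit() {
        return addComponent("audit");
    }
}

class SelfTypedBuilderDemo {
    public static void main(String[] args) {
        // audit() is callable after filter() because filter() returns FlowSketch.
        FlowSketch flow = new FlowSketch().filter("payload != null").audit().transform("payload");
        System.out.println(flow.components());
    }
}
```

Without the self type, `filter()` would return the base class and the subclass-specific `audit()` call would not compile in the middle of the chain.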
-
Nested Class Summary
Modifier and Type, Class, Description
static final class
-
Field Summary
-
Constructor Summary
-
Method Summary
Modifier and Type, Method, Description
- protected final B _this()
- protected B addComponent(Object component)
- protected B addComponent(Object component, String beanName)
- protected B addComponents(Map<Object, String> components)
- aggregate(): Populate the AggregatingMessageHandler with default options.
- aggregate(Object aggregatorProcessor): A short-cut for the aggregate((aggregator) -> aggregator.processor(aggregatorProcessor)).
- aggregate(Consumer<AggregatorSpec> aggregator): Populate the AggregatingMessageHandler with provided options from AggregatorSpec.
- barrier(long timeout): Populate a BarrierMessageHandler instance for provided timeout.
- barrier(long timeout, Consumer<BarrierSpec> barrierConfigurer): Populate a BarrierMessageHandler instance for provided timeout and options from BarrierSpec and endpoint options from GenericEndpointSpec.
- bridge(): Populate a BridgeHandler to the current integration flow position.
- bridge(Consumer<GenericEndpointSpec<BridgeHandler>> endpointConfigurer): Populate a BridgeHandler to the current integration flow position.
- channel(String messageChannelName): Populate a MessageChannelReference instance at the current IntegrationFlow chain position.
- channel(Function<Channels, MessageChannelSpec<?, ?>> channels): Populate a MessageChannel instance at the current IntegrationFlow chain position using the Channels factory fluent API.
- channel(MessageChannelSpec<?, ?> messageChannelSpec): Populate a MessageChannel instance at the current IntegrationFlow chain position using the MessageChannelSpec fluent API.
- channel(MessageChannel messageChannel): Populate the provided MessageChannel instance at the current IntegrationFlow chain position.
- protected void checkReuse(MessageProducer replyHandler)
- claimCheckIn(MessageStore messageStore)
- claimCheckIn(MessageStore messageStore, Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer)
- claimCheckOut(MessageStore messageStore): Populate the MessageTransformingHandler for the ClaimCheckOutTransformer with provided MessageStore.
- claimCheckOut(MessageStore messageStore, boolean removeMessage): Populate the MessageTransformingHandler for the ClaimCheckOutTransformer with provided MessageStore and removeMessage flag.
- claimCheckOut(MessageStore messageStore, boolean removeMessage, Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer): Populate the MessageTransformingHandler for the ClaimCheckOutTransformer with provided MessageStore and removeMessage flag.
- controlBus(): Deprecated.
- controlBus(Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer): Deprecated. In favor of controlBusOnRegistry(Consumer) - will be restored in next version.
- controlBusOnRegistry(): Populate the Control Bus EI Pattern specific MessageHandler implementation at the current IntegrationFlow chain position.
- controlBusOnRegistry(Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer): Populate the Control Bus EI Pattern specific MessageHandler implementation at the current IntegrationFlow chain position.
- <P> B convert(Class<P> payloadType): Populate the MessageTransformingHandler instance for the provided payloadType to convert at runtime.
- <P> B convert(Class<P> payloadType, Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer): Populate the MessageTransformingHandler instance for the provided payloadType to convert at runtime.
- protected B currentComponent(Object component)
- protected InterceptableChannel currentInterceptableChannel(): Return the current channel if it is an InterceptableChannel, otherwise register a new implicit DirectChannel in the flow and return that one.
- protected B currentMessageChannel(MessageChannel currentMessageChannel)
- delay(): Populate a DelayHandler to the current integration flow position with default options.
- delay(Consumer<DelayerEndpointSpec> endpointConfigurer): Populate a DelayHandler to the current integration flow position.
- enrich(Consumer<EnricherSpec> enricherConfigurer): Populate a ContentEnricher to the current integration flow position with provided options.
- enrichHeaders(Consumer<HeaderEnricherSpec> headerEnricherConfigurer)
- enrichHeaders(Map<String, Object> headers)
- enrichHeaders(Map<String, Object> headers, Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer)
- enrichHeaders(MapBuilder<?, String, Object> headers): Populate a MessageTransformingHandler for a HeaderEnricher using header values from provided MapBuilder.
- enrichHeaders(MapBuilder<?, String, Object> headers, Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer): Populate a MessageTransformingHandler for a HeaderEnricher using header values from provided MapBuilder.
- protected static Object extractProxyTarget(Object target)
- <P> B filter(Class<P> expectedType, GenericSelector<P> genericSelector)
- <P> B filter(Class<P> expectedType, GenericSelector<P> genericSelector, Consumer<FilterEndpointSpec> endpointConfigurer)
- filter(Object service): Populate a MessageFilter with MethodInvokingSelector for the discovered method of the provided service.
- filter(Object service, String methodName): Populate a MessageFilter with MethodInvokingSelector for the method of the provided service.
- filter(Object service, String methodName, Consumer<FilterEndpointSpec> endpointConfigurer): Populate a MessageFilter with MethodInvokingSelector for the method of the provided service.
- filter(String expression): Populate a MessageFilter with MessageSelector for the provided SpEL expression.
- filter(String expression, Consumer<FilterEndpointSpec> endpointConfigurer): Populate a MessageFilter with MessageSelector for the provided SpEL expression.
- filter(MessageProcessorSpec<?> messageProcessorSpec): Populate a MessageFilter with MethodInvokingSelector for the MessageProcessor from the provided MessageProcessorSpec.
- filter(MessageProcessorSpec<?> messageProcessorSpec, Consumer<FilterEndpointSpec> endpointConfigurer): Populate a MessageFilter with MethodInvokingSelector for the MessageProcessor from the provided MessageProcessorSpec.
- fixedSubscriberChannel(): Populate a FixedSubscriberChannel instance at the current IntegrationFlow chain position.
- fixedSubscriberChannel(String messageChannelName): Populate a FixedSubscriberChannel instance at the current IntegrationFlow chain position.
- <I, O> B fluxTransform(Function<? super reactor.core.publisher.Flux<Message<I>>, ? extends org.reactivestreams.Publisher<O>> fluxFunction): Populate a FluxMessageChannel to start a reactive processing for upstream data, wrap it to a Flux, apply provided Function via Flux.transform(Function) and emit the result to one more FluxMessageChannel, subscribed in the downstream flow.
- gateway(String requestChannel): Populate the "artificial" GatewayMessageHandler for the provided requestChannel to send a request with default options.
- gateway(String requestChannel, Consumer<GatewayEndpointSpec> endpointConfigurer): Populate the "artificial" GatewayMessageHandler for the provided requestChannel to send a request with options from GatewayEndpointSpec.
- gateway(IntegrationFlow flow): Populate the "artificial" GatewayMessageHandler for the provided subflow.
- gateway(IntegrationFlow flow, Consumer<GatewayEndpointSpec> endpointConfigurer): Populate the "artificial" GatewayMessageHandler for the provided subflow with options from GatewayEndpointSpec.
- gateway(MessageChannel requestChannel): Populate the "artificial" GatewayMessageHandler for the provided requestChannel to send a request with default options.
- gateway(MessageChannel requestChannel, Consumer<GatewayEndpointSpec> endpointConfigurer): Populate the "artificial" GatewayMessageHandler for the provided requestChannel to send a request with options from GatewayEndpointSpec.
- protected StandardIntegrationFlow get()
- protected Object getCurrentComponent()
- protected MessageChannel getCurrentMessageChannel()
- <H extends MessageHandler> B handle(H messageHandler, Consumer<GenericEndpointSpec<H>> endpointConfigurer): Populate a ServiceActivatingHandler for the provided MessageHandler implementation.
- <P> B handle(Class<P> expectedType, GenericHandler<P> handler): Populate a ServiceActivatingHandler for the MethodInvokingMessageProcessor to invoke the provided GenericHandler at runtime.
- <P> B handle(Class<P> expectedType, GenericHandler<P> handler, Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer): Populate a ServiceActivatingHandler for the MethodInvokingMessageProcessor to invoke the provided GenericHandler at runtime.
- handle(Object service): Populate a ServiceActivatingHandler for the MethodInvokingMessageProcessor to invoke the discovered method for provided service at runtime.
- handle(Object service, String methodName): Populate a ServiceActivatingHandler for the MethodInvokingMessageProcessor to invoke the method for provided bean at runtime.
- handle(Object service, String methodName, Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer): Populate a ServiceActivatingHandler for the MethodInvokingMessageProcessor to invoke the method for provided bean at runtime.
- handle(String beanName, String methodName): Populate a ServiceActivatingHandler for the MethodInvokingMessageProcessor to invoke the method for provided bean at runtime.
- handle(String beanName, String methodName, Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer): Populate a ServiceActivatingHandler for the MethodInvokingMessageProcessor to invoke the method for provided bean at runtime.
- <H extends MessageHandler> B handle(MessageHandlerSpec<?, H> messageHandlerSpec): Populate a ServiceActivatingHandler for the selected protocol specific MessageHandler implementation from the respective namespace factory (e.g.
- <H extends MessageHandler> B handle(MessageHandlerSpec<?, H> messageHandlerSpec, Consumer<GenericEndpointSpec<H>> endpointConfigurer): Populate a ServiceActivatingHandler for the selected protocol specific MessageHandler implementation from the respective namespace factory (e.g.
- handle(MessageProcessorSpec<?> messageProcessorSpec): Populate a ServiceActivatingHandler for the MessageProcessor from the provided MessageProcessorSpec.
- handle(MessageProcessorSpec<?> messageProcessorSpec, Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer): Populate a ServiceActivatingHandler for the MessageProcessor from the provided MessageProcessorSpec.
- handle(MessageHandler messageHandler): Populate a ServiceActivatingHandler for the provided MessageHandler implementation.
- <H extends ReactiveMessageHandler> IntegrationFlow handleReactive(ReactiveMessageHandlerSpec<?, H> messageHandlerSpec): Populate a terminal consumer endpoint for the selected protocol specific MessageHandler implementation from the respective namespace factory (e.g.
- <H extends ReactiveMessageHandler> IntegrationFlow handleReactive(ReactiveMessageHandlerSpec<?, H> messageHandlerSpec, Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer): Populate a terminal consumer endpoint for the selected protocol specific MessageHandler implementation from the respective namespace factory (e.g.
- handleReactive(ReactiveMessageHandler reactiveMessageHandler): Add a ReactiveMessageHandler as a terminal IntegrationFlow operator.
- handleReactive(ReactiveMessageHandler reactiveMessageHandler, Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer): Add a ReactiveMessageHandler as a terminal IntegrationFlow operator.
- headerFilter(String... headersToRemove): Provide the HeaderFilter to the current StandardIntegrationFlow.
- headerFilter(Consumer<HeaderFilterSpec> headerFilter): Provide the HeaderFilter options via fluent API of the HeaderFilterSpec.
- headerFilter(HeaderFilter headerFilter, Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer): Populate the provided MessageTransformingHandler for the provided HeaderFilter.
- intercept(ChannelInterceptor... interceptorArray): Add one or more ChannelInterceptor implementations to the currentMessageChannel, in the given order, after any interceptors already registered.
- protected boolean isImplicitChannel()
- protected boolean isOutputChannelRequired()
- log(): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the INFO logging level and org.springframework.integration.handler.LoggingHandler as a default logging category.
- log(String category): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the provided logging category and INFO logging level.
- <P> B log(String category, Function<Message<P>, Object> function): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the provided LoggingHandler.Level logging level, the provided logging category and Function for the log message.
- log(String category, Expression logExpression): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the INFO LoggingHandler.Level logging level, the provided logging category and SpEL expression to evaluate logger message at runtime against the request Message.
- <P> B log(Function<Message<P>, Object> function): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the INFO logging level, the org.springframework.integration.handler.LoggingHandler as a default logging category and Function for the log message.
- log(Expression logExpression): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the INFO logging level, the org.springframework.integration.handler.LoggingHandler as a default logging category and SpEL expression to evaluate logger message at runtime against the request Message.
- log(LoggingHandler.Level level): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for provided LoggingHandler.Level logging level and org.springframework.integration.handler.LoggingHandler as a default logging category.
- log(LoggingHandler.Level level, String category): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the provided LoggingHandler.Level logging level and logging category.
- log(LoggingHandler.Level level, String category, String logExpression): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the provided LoggingHandler.Level logging level, logging category and SpEL expression for the log message.
- <P> B log(LoggingHandler.Level level, String category, Function<Message<P>, Object> function): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the provided LoggingHandler.Level logging level, logging category and Function for the log message.
- log(LoggingHandler.Level level, String category, Expression logExpression): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the provided LoggingHandler.Level logging level, logging category and SpEL expression for the log message.
- <P> B log(LoggingHandler.Level level, Function<Message<P>, Object> function): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the provided LoggingHandler.Level logging level, the org.springframework.integration.handler.LoggingHandler as a default logging category and Function for the log message.
- log(LoggingHandler.Level level, Expression logExpression): Populate a WireTap for the currentMessageChannel with the LoggingHandler subscriber for the provided LoggingHandler.Level logging level, the org.springframework.integration.handler.LoggingHandler as a default logging category and SpEL expression to evaluate logger message at runtime against the request Message.
- nullChannel(): Add a "nullChannel" bean into this flow definition as a terminal operator.
- protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow)
- publishSubscribeChannel(Executor executor, Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer): The PublishSubscribeChannel channel(java.lang.String) method specific implementation to allow the use of the 'subflow' subscriber capability.
- publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer): The PublishSubscribeChannel channel(java.lang.String) method specific implementation to allow the use of the 'subflow' subscriber capability.
- publishSubscribeChannel(BroadcastCapableChannel broadcastCapableChannel, Consumer<BroadcastPublishSubscribeSpec> publishSubscribeChannelConfigurer): The BroadcastCapableChannel channel(java.lang.String) method specific implementation to allow the use of the 'subflow' subscriber capability.
- protected <S extends ConsumerEndpointSpec<? super S, ? extends MessageHandler>> B register(S endpointSpec, Consumer<? super S> endpointConfigurer)
- protected B registerOutputChannelIfCan(MessageChannel outputChannel)
- resequence(): Populate the ResequencingMessageHandler with default options.
- resequence(Consumer<ResequencerSpec> resequencer): Populate the ResequencingMessageHandler with provided options from ResequencerSpec.
- <P, T> B route(Class<P> expectedType, Function<P, T> router, Consumer<RouterSpec<T, MethodInvokingRouter>> routerConfigurer): Populate the MethodInvokingRouter for provided Function and payload type and options from RouterSpec.
- <S, T> B route(Class<S> expectedType, Function<S, T> router): Populate the MethodInvokingRouter for provided Function and payload type with default options.
- route(Object service): Populate the MethodInvokingRouter for the discovered method of the provided service and its method with default options.
- route(Object service, String methodName): Populate the MethodInvokingRouter for the method of the provided service and its method with default options.
- route(Object service, String methodName, Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer): Populate the MethodInvokingRouter for the method of the provided service and its method with provided options from RouterSpec.
- route(String expression): Populate the ExpressionEvaluatingRouter for provided SpEL expression with default options.
- route(String beanName, String method): Populate the MethodInvokingRouter for provided bean and its method with default options.
- route(String beanName, String method, Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer): Populate the MethodInvokingRouter for provided bean and its method with provided options from RouterSpec.
- <T> B route(String expression, Consumer<RouterSpec<T, ExpressionEvaluatingRouter>> routerConfigurer): Populate the ExpressionEvaluatingRouter for provided SpEL expression with provided options from RouterSpec.
- route(MessageProcessorSpec<?> messageProcessorSpec): Populate the MethodInvokingRouter for the MessageProcessor from the provided MessageProcessorSpec with default options.
- route(MessageProcessorSpec<?> messageProcessorSpec, Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer): Populate the MethodInvokingRouter for the MessageProcessor from the provided MessageProcessorSpec with default options.
- route(AbstractMessageRouter router): Populate the provided AbstractMessageRouter implementation to the current integration flow position.
- <R extends AbstractMessageRouter> B route(R router, Consumer<GenericEndpointSpec<R>> endpointConfigurer): Populate the provided AbstractMessageRouter implementation to the current integration flow position.
- protected <R extends AbstractMessageRouter, S extends AbstractRouterSpec<? super S, R>> B route(S routerSpec, Consumer<S> routerConfigurer)
- routeByException(Consumer<RouterSpec<Class<? extends Throwable>, ErrorMessageExceptionTypeRouter>> routerConfigurer): Populate the ErrorMessageExceptionTypeRouter with options from the RouterSpec.
- routeToRecipients(Consumer<RecipientListRouterSpec> routerConfigurer): Populate the RecipientListRouter with options from the RecipientListRouterSpec.
- scatterGather(Consumer<RecipientListRouterSpec> scatterer): Populate a ScatterGatherHandler to the current integration flow position based on the provided RecipientListRouterSpec for scattering function and default AggregatorSpec for gathering function.
- scatterGather(Consumer<RecipientListRouterSpec> scatterer, Consumer<AggregatorSpec> gatherer): Populate a ScatterGatherHandler to the current integration flow position based on the provided RecipientListRouterSpec for scattering function and AggregatorSpec for gathering function.
- scatterGather(Consumer<RecipientListRouterSpec> scatterer, Consumer<AggregatorSpec> gatherer, Consumer<ScatterGatherSpec> scatterGather): Populate a ScatterGatherHandler to the current integration flow position based on the provided RecipientListRouterSpec for scattering function and AggregatorSpec for gathering function.
- scatterGather(MessageChannel scatterChannel): Populate a ScatterGatherHandler to the current integration flow position based on the provided MessageChannel for scattering function and default AggregatorSpec for gathering function.
- scatterGather(MessageChannel scatterChannel, Consumer<AggregatorSpec> gatherer): Populate a ScatterGatherHandler to the current integration flow position based on the provided MessageChannel for scattering function and AggregatorSpec for gathering function.
- scatterGather(MessageChannel scatterChannel, Consumer<AggregatorSpec> gatherer, Consumer<ScatterGatherSpec> scatterGather): Populate a ScatterGatherHandler to the current integration flow position based on the provided MessageChannel for scattering function and AggregatorSpec for gathering function.
- protected void setImplicitChannel(boolean implicitChannel)
- split(): Populate the DefaultMessageSplitter with default options to the current integration flow position.
- <P> B split(Class<P> expectedType, Function<P, ?> splitter): Populate the MethodInvokingSplitter to evaluate the provided Function at runtime.
- split(String expression): Populate the ExpressionEvaluatingSplitter with provided SpEL expression.
- <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> splitterMessageHandlerSpec): Populate the provided AbstractMessageSplitter to the current integration flow position.
- split(MessageProcessorSpec<?> messageProcessorSpec): Populate the MethodInvokingSplitter to evaluate the MessageProcessor at runtime from provided MessageProcessorSpec.
- split(AbstractMessageSplitter splitter): Populate the provided AbstractMessageSplitter to the current integration flow position.
- splitWith(Consumer<SplitterSpec> splitterConfigurer): Populate the splitter with provided options to the current integration flow position.
- to(IntegrationFlow other): Finish this flow with delegation to other IntegrationFlow instance.
- protected <T> org.reactivestreams.Publisher<Message<T>> toReactivePublisher(): Represent an Integration Flow as a Reactive Streams Publisher bean.
- protected <T> org.reactivestreams.Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe): Represent an Integration Flow as a Reactive Streams Publisher bean.
- <P, T> B transform(Class<P> expectedType, GenericTransformer<P, T> genericTransformer): Populate the MessageTransformingHandler instance for the provided GenericTransformer for the specific expectedType to convert at runtime.
- transform(Object service): Populate the MessageTransformingHandler for the MethodInvokingTransformer to invoke the discovered service method at runtime.
- transform(Object service, String methodName): Populate the MessageTransformingHandler for the MethodInvokingTransformer to invoke the service method at runtime.
- transform(String beanName, String methodName): Populate the MessageTransformingHandler for the MethodInvokingTransformer to invoke the bean method at runtime.
- transform(MessageProcessorSpec<?> messageProcessorSpec): Populate the MessageTransformingHandler instance for the MessageProcessor from provided MessageProcessorSpec.
- transformWith(Consumer<TransformerEndpointSpec> transformerConfigurer): Populate a MessageTransformingHandler into the endpoint with provided TransformerEndpointSpec options.
- trigger(String triggerActionId): Populate a ServiceActivatingHandler instance to perform MessageTriggerAction.
- trigger(String triggerActionId, Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer): Populate a ServiceActivatingHandler instance to perform MessageTriggerAction and endpoint options from GenericEndpointSpec.
- trigger(MessageTriggerAction triggerAction): Populate a ServiceActivatingHandler instance to perform MessageTriggerAction.
- trigger(MessageTriggerAction triggerAction, Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer): Populate a ServiceActivatingHandler instance to perform MessageTriggerAction and endpoint options from GenericEndpointSpec.
- wireTap(String wireTapChannel): Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel.
- wireTap(String wireTapChannel, Consumer<WireTapSpec> wireTapConfigurer): Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel.
- wireTap(IntegrationFlow flow): Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel.
- wireTap(IntegrationFlow flow, Consumer<WireTapSpec> wireTapConfigurer): Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel.
- wireTap(WireTapSpec wireTapSpec): Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel.
- wireTap(MessageChannel wireTapChannel): Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel.
- wireTap(MessageChannel wireTapChannel, Consumer<WireTapSpec> wireTapConfigurer): Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel.
-
Field Details
-
PARSER
-
integrationComponents
-
-
Constructor Details
-
BaseIntegrationFlowDefinition
protected BaseIntegrationFlowDefinition()
-
-
Method Details
-
addComponent
-
addComponent
-
addComponents
-
getIntegrationComponents
-
currentComponent
-
getCurrentComponent
-
currentMessageChannel
-
getCurrentMessageChannel
-
currentInterceptableChannel
Return the current channel if it is an InterceptableChannel, otherwise register a new implicit DirectChannel in the flow and return that one.
- Returns:
- the current channel after the operation
-
setImplicitChannel
protected void setImplicitChannel(boolean implicitChannel)
-
isImplicitChannel
protected boolean isImplicitChannel()
-
fixedSubscriberChannel
Populate a FixedSubscriberChannel instance at the current IntegrationFlow chain position. The 'bean name' will be generated during the bean registration phase.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
fixedSubscriberChannel
Populate a FixedSubscriberChannel instance at the current IntegrationFlow chain position. The provided messageChannelName is used for the bean registration.
- Parameters:
messageChannelName - the bean name to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
channel
Populate a MessageChannelReference instance at the current IntegrationFlow chain position. The provided messageChannelName is used for the bean registration (DirectChannel), if there is no such bean in the application context. Otherwise, the existing MessageChannel bean is used to wire integration endpoints.
- Parameters:
messageChannelName - the bean name to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
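The "reuse the existing bean, otherwise register a DirectChannel under that name" rule described above can be pictured as a get-or-create lookup. The sketch below is a plain-Java model only. `ChannelSketch` and `ChannelRegistrySketch` are hypothetical stand-ins for channel beans and the application context, not Spring types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a channel bean; "kind" only labels how it was created.
final class ChannelSketch {
    final String name;
    final String kind;

    ChannelSketch(String name, String kind) {
        this.name = name;
        this.kind = kind;
    }
}

// Hypothetical stand-in for the channel beans of an application context.
final class ChannelRegistrySketch {

    private final Map<String, ChannelSketch> beans = new LinkedHashMap<>();

    void register(ChannelSketch channel) {
        this.beans.put(channel.name, channel);
    }

    // Reuse the bean registered under this name, or create a "direct" one.
    ChannelSketch resolve(String channelName) {
        return this.beans.computeIfAbsent(channelName, name -> new ChannelSketch(name, "direct"));
    }
}

class ChannelLookupDemo {
    public static void main(String[] args) {
        ChannelRegistrySketch registry = new ChannelRegistrySketch();
        registry.register(new ChannelSketch("orders", "queue"));
        System.out.println(registry.resolve("orders").kind); // existing bean is reused
        System.out.println(registry.resolve("audit").kind);  // no such bean, so one is created
    }
}
```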
-
channel
Populate a MessageChannel instance at the current IntegrationFlow chain position using the MessageChannelSpec fluent API.
- Parameters:
messageChannelSpec - the MessageChannelSpec to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
channel
Populate the provided MessageChannel instance at the current IntegrationFlow chain position. The messageChannel can be an existing bean or a fresh instance, in which case the IntegrationFlowBeanPostProcessor will populate it as a bean with a generated name.
- Parameters:
messageChannel - the MessageChannel to populate.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
channel
Populate a MessageChannel instance at the current IntegrationFlow chain position using the Channels factory fluent API.
- Parameters:
channels - the Function to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
publishSubscribeChannel
The PublishSubscribeChannel channel(java.lang.String) method specific implementation to allow the use of the 'subflow' subscriber capability.
- Parameters:
publishSubscribeChannelConfigurer - the Consumer to specify PublishSubscribeSpec options including 'subflow' definition.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
publishSubscribeChannel
public B publishSubscribeChannel(@Nullable Executor executor, Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer)
The PublishSubscribeChannel channel(java.lang.String) method specific implementation to allow the use of the 'subflow' subscriber capability. Use the provided Executor for the target subscribers.
- Parameters:
executor - the Executor to use.
publishSubscribeChannelConfigurer - the Consumer to specify PublishSubscribeSpec options including 'subflow' definition.
- Returns:
- the current BaseIntegrationFlowDefinition.
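As a rough picture of the 'subflow' subscriber capability with an optional Executor, the plain-Java sketch below hands every message to each subscribed subflow, using the caller's thread when no executor is given. `PubSubSketch` and its methods are hypothetical names and do not reflect how PublishSubscribeChannel or PublishSubscribeSpec are implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Simplified model: one channel, many subflow subscribers, optional executor.
final class PubSubSketch {

    private final List<Consumer<String>> subflows = new ArrayList<>();
    private final Executor executor;

    PubSubSketch(Executor executor) {
        // A null executor means "deliver on the caller's thread".
        this.executor = (executor != null) ? executor : Runnable::run;
    }

    PubSubSketch subscribe(Consumer<String> subflow) {
        this.subflows.add(subflow);
        return this;
    }

    void send(String message) {
        // Every subscriber receives the same message.
        for (Consumer<String> subflow : this.subflows) {
            this.executor.execute(() -> subflow.accept(message));
        }
    }
}

class PubSubSketchDemo {
    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        new PubSubSketch(null)
                .subscribe(m -> received.add("first:" + m))
                .subscribe(m -> received.add("second:" + m))
                .send("order-1");
        System.out.println(received);
    }
}
```

Passing a thread-pool executor instead of `null` would make the subscribers run concurrently, so ordering across subflows would no longer be guaranteed in this model.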
-
publishSubscribeChannel
public B publishSubscribeChannel(BroadcastCapableChannel broadcastCapableChannel, Consumer<BroadcastPublishSubscribeSpec> publishSubscribeChannelConfigurer)
The BroadcastCapableChannel channel(java.lang.String) method specific implementation to allow the use of the 'subflow' subscriber capability.
- Parameters:
broadcastCapableChannel - the BroadcastCapableChannel to subscribe sub-flows to.
publishSubscribeChannelConfigurer - the Consumer to specify BroadcastPublishSubscribeSpec 'subflow' definitions.
- Returns:
- the current BaseIntegrationFlowDefinition.
- Since:
- 5.3
-
wireTap
Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel. It is useful when an implicit MessageChannel is used between endpoints:
    .filter("World"::equals)
    .wireTap(sf -> sf.<String, String>transform(String::toUpperCase))
    .handle(p -> process(p))
This method can also be used after any channel(java.lang.String) for an explicit MessageChannel, but with the caution not to impact existing ChannelInterceptors.
- Parameters:
flow - the IntegrationFlow for wire-tap subflow as an alternative to the wireTapChannel.
- Returns:
- the current BaseIntegrationFlowDefinition.
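Conceptually, a wire tap is a channel interceptor that hands a message to a secondary target without changing delivery on the main path. The sketch below models that in plain Java, with an optional selector as in the `wt.selector(...)` variants. `TappedChannelSketch` and its methods are hypothetical names, and this is not how WireTap or ChannelInterceptor are implemented in Spring Integration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Simplified model of a channel with one subscriber and a list of interceptors.
final class TappedChannelSketch {

    private final List<Consumer<String>> interceptors = new ArrayList<>();
    private final Consumer<String> subscriber;

    TappedChannelSketch(Consumer<String> subscriber) {
        this.subscriber = subscriber;
    }

    // Register a tap: messages accepted by the selector are also handed to tapTarget.
    TappedChannelSketch wireTap(Consumer<String> tapTarget, Predicate<String> selector) {
        this.interceptors.add(message -> {
            if (selector.test(message)) {
                tapTarget.accept(message);
            }
        });
        return this;
    }

    void send(String message) {
        // Interceptors run first, in registration order; main delivery is unaffected.
        this.interceptors.forEach(interceptor -> interceptor.accept(message));
        this.subscriber.accept(message);
    }
}

class WireTapSketchDemo {
    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        List<String> tapped = new ArrayList<>();
        TappedChannelSketch channel = new TappedChannelSketch(delivered::add)
                .wireTap(tapped::add, "foo"::equals);
        channel.send("foo");
        channel.send("bar");
        System.out.println("delivered=" + delivered + " tapped=" + tapped);
    }
}
```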
-
wireTap
Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel. It is useful when an implicit MessageChannel is used between endpoints:
    f -> f.wireTap("tapChannel")
          .handle(p -> process(p))
This method can also be used after any channel(java.lang.String) for an explicit MessageChannel, but with the caution not to impact existing ChannelInterceptors.
- Parameters:
wireTapChannel - the MessageChannel bean name to wire-tap.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
wireTap
Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel. It is useful when an implicit MessageChannel is used between endpoints:
    .transform("payload")
    .wireTap(tapChannel())
    .channel("foo")
This method can also be used after any channel(java.lang.String) for an explicit MessageChannel, but with the caution not to impact existing ChannelInterceptors.
- Parameters:
wireTapChannel - the MessageChannel to wire-tap.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
wireTap
Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel. It is useful when an implicit MessageChannel is used between endpoints:
    .transform("payload")
    .wireTap(sf -> sf.<String, String>transform(String::toUpperCase), wt -> wt.selector("payload == 'foo'"))
    .channel("foo")
This method can also be used after any channel(java.lang.String) for an explicit MessageChannel, but with the caution not to impact existing ChannelInterceptors.
- Parameters:
flow - the IntegrationFlow for wire-tap subflow as an alternative to the wireTapChannel.
wireTapConfigurer - the Consumer to accept options for the WireTap.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
obtainInputChannelFromFlow
-
wireTap
Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel. It is useful when an implicit MessageChannel is used between endpoints:
    .transform("payload")
    .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
    .channel("foo")
This method can also be used after any channel(java.lang.String) for an explicit MessageChannel, but with the caution not to impact existing ChannelInterceptors.
- Parameters:
wireTapChannel - the MessageChannel bean name to wire-tap.
wireTapConfigurer - the Consumer to accept options for the WireTap.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
wireTap
Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the currentMessageChannel. It is useful when an implicit MessageChannel is used between endpoints:
    .transform("payload")
    .wireTap(tapChannel(), wt -> wt.selector(m -> m.getPayload().equals("foo")))
    .channel("foo")
This method can also be used after any channel(java.lang.String) for an explicit MessageChannel, but with the caution not to impact existing ChannelInterceptors.
- Parameters:
wireTapChannel - the MessageChannel to wire-tap.
wireTapConfigurer - the Consumer to accept options for the WireTap.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
wireTap
Populate the Wire Tap EI Pattern specific ChannelInterceptor implementation to the current message channel (currentMessageChannel).
It is useful when an implicit MessageChannel is used between endpoints:
    .transform("payload")
    .wireTap(new WireTap(tapChannel()).selector(m -> m.getPayload().equals("foo")))
    .channel("foo")
This method can be used after any channel(java.lang.String) for an explicit MessageChannel, but take care not to affect existing ChannelInterceptors.
- Parameters:
wireTapSpec - the WireTapSpec to use.
When this EIP-method is used at the end of a flow, it appends a nullChannel to terminate the flow properly; otherwise a "Dispatcher has no subscribers" exception is thrown for the implicit DirectChannel.
- Returns:
- the current BaseIntegrationFlowDefinition.
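As an illustration only, the wire-tap behaviour described by the wireTap variants can be modelled in plain Java without Spring Integration. The names WireTapModel and wireTap below are hypothetical and are not part of the framework API; the sketch assumes a "channel" is simply a Consumer and a selector is a Predicate. It shows that selected messages are copied to the tap while every message still reaches the main consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Plain-Java model of a wire tap; hypothetical, not the Spring Integration API. */
final class WireTapModel {

    /**
     * Wraps a downstream consumer so that each message accepted by the selector
     * is first copied to the tap; every message is then passed on unchanged.
     */
    static <T> Consumer<T> wireTap(Consumer<T> downstream, Consumer<T> tap, Predicate<T> selector) {
        return message -> {
            if (selector.test(message)) {
                tap.accept(message);    // side copy for the tap channel
            }
            downstream.accept(message); // the main flow always continues
        };
    }

    public static void main(String[] args) {
        List<String> tapped = new ArrayList<>();
        List<String> delivered = new ArrayList<>();
        Consumer<String> channel =
                WireTapModel.<String>wireTap(delivered::add, tapped::add, "foo"::equals);
        channel.accept("foo");
        channel.accept("bar");
        System.out.println("tapped=" + tapped + ", delivered=" + delivered);
    }
}
```

In this model only "foo" is copied to the tap, which mirrors the selector examples above; the framework's interceptor-based implementation differs in detail.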
-
controlBusOnRegistry
Populate the Control Bus EI Pattern specific MessageHandler implementation at the current IntegrationFlow chain position.
- Returns:
- the current BaseIntegrationFlowDefinition.
- Since:
- 6.4
-
controlBusOnRegistry
public B controlBusOnRegistry(@Nullable Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer)
Populate the Control Bus EI Pattern specific MessageHandler implementation at the current IntegrationFlow chain position.
- Parameters:
endpointConfigurer - the Consumer to accept integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
- Since:
- 6.4
-
controlBus
Deprecated. In favor of controlBusOnRegistry(); will be restored in the next version.
Populate the Control Bus EI Pattern specific MessageHandler implementation at the current IntegrationFlow chain position.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
controlBus
@Deprecated(since="6.4") public B controlBus(@Nullable Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer)
Deprecated. In favor of controlBusOnRegistry(Consumer); will be restored in the next version.
Populate the Control Bus EI Pattern specific MessageHandler implementation at the current IntegrationFlow chain position.
- Parameters:
endpointConfigurer - the Consumer to accept integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
transform
Populate the Transformer EI Pattern specific MessageHandler implementation for the SpEL Expression. Shortcut for:
    .transformWith((transformerSpec) -> transformerSpec.expression(expression))
- Parameters:
expression - the Transformer Expression.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
transform
Populate the MessageTransformingHandler for the MethodInvokingTransformer to invoke the discovered service method at runtime. Shortcut for:
    .transformWith((transformerSpec) -> transformerSpec.ref(service))
- Parameters:
service - the service to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
transform
Populate the MessageTransformingHandler for the MethodInvokingTransformer to invoke the service method at runtime. Shortcut for:
    .transformWith((transformerSpec) -> transformerSpec.ref(service).method(methodName))
- Parameters:
service - the service to use.
methodName - the method to invoke.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
transform
Populate the MessageTransformingHandler for the MethodInvokingTransformer to invoke the bean method at runtime. Shortcut for:
    .transformWith((transformerSpec) -> transformerSpec.refName(beanName).method(methodName))
- Parameters:
beanName - the name of the bean to resolve lazily.
methodName - the method to invoke.
- Returns:
- the current BaseIntegrationFlowDefinition.
- Since:
- 6.2
-
transform
Populate the MessageTransformingHandler instance for the MessageProcessor from the provided MessageProcessorSpec:
    .transform(Scripts.script("classpath:myScript.py").variable("foo", bar()))
Shortcut for:
    .transformWith((transformerSpec) -> transformerSpec.processor(messageProcessorSpec))
- Parameters:
messageProcessorSpec - the MessageProcessorSpec to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
convert
Populate theMessageTransformingHandler
instance for the providedpayloadType
to convert at runtime.- Type Parameters:
P
- the payload type - 'convert to'.- Parameters:
payloadType
- theClass
for expected payload type.- Returns:
- the current
BaseIntegrationFlowDefinition
. - Since:
- 5.1
- See Also:
-
transform
public <P,T> B transform(@Nullable Class<P> expectedType, GenericTransformer<P, T> genericTransformer)
Populate the MessageTransformingHandler instance for the provided GenericTransformer for the specific expectedType to convert at runtime.
- Type Parameters:
P - the payload type ('transform from') or Message.class.
T - the target type ('transform to').
- Parameters:
expectedType - the Class for the expected payload type. It can also be Message.class if you wish to access the entire message in the transformer. Conversion to this type will be attempted, if necessary.
genericTransformer - the GenericTransformer to populate.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
convert
public <P> B convert(Class<P> payloadType, Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer)
Populate the MessageTransformingHandler instance for the provided payloadType to convert at runtime. In addition, accept options for the integration endpoint using GenericEndpointSpec.
- Type Parameters:
P - the payload type ('convert to').
- Parameters:
payloadType - the Class for the expected payload type.
endpointConfigurer - the Consumer to provide integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
- Since:
- 5.1
-
transformWith
Populate aMessageTransformingHandler
into the endpoint with providedTransformerEndpointSpec
options. One of the 'expression', 'ref', 'refName', 'processor' or 'function' must be provided.- Returns:
- the current
BaseIntegrationFlowDefinition
. - Since:
- 6.2
-
filter
Populate aMessageFilter
withMessageSelector
for the provided SpEL expression.- Parameters:
expression
- the SpEL expression.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
filter
Populate a MessageFilter with MessageSelector for the provided SpEL expression. In addition, accept options for the integration endpoint using FilterEndpointSpec:
    .filter("payload.hot", e -> e.autoStartup(false))
- Parameters:
expression - the SpEL expression.
endpointConfigurer - the Consumer to provide integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
filter
Populate aMessageFilter
withMethodInvokingSelector
for the discovered method of the provided service.- Parameters:
service
- the service to use.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
filter
Populate aMessageFilter
withMethodInvokingSelector
for the method of the provided service.- Parameters:
service
- the service to use.methodName
- the method to invoke- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
filter
public B filter(Object service, @Nullable String methodName, @Nullable Consumer<FilterEndpointSpec> endpointConfigurer) Populate aMessageFilter
withMethodInvokingSelector
for the method of the provided service.- Parameters:
service
- the service to use.methodName
- the method to invokeendpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
filter
Populate aMessageFilter
withMethodInvokingSelector
for theMessageProcessor
from the providedMessageProcessorSpec
..filter(Scripts.script(scriptResource).lang("ruby"))
- Parameters:
messageProcessorSpec
- theMessageProcessorSpec
to use.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
filter
public B filter(MessageProcessorSpec<?> messageProcessorSpec, @Nullable Consumer<FilterEndpointSpec> endpointConfigurer) Populate aMessageFilter
withMethodInvokingSelector
for theMessageProcessor
from the providedMessageProcessorSpec
. In addition, accept options for the integration endpoint usingFilterEndpointSpec
..filter(Scripts.script(scriptResource).lang("ruby"), e -> e.autoStartup(false))
- Parameters:
messageProcessorSpec
- theMessageProcessorSpec
to use.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
filter
Populate a MessageFilter with MethodInvokingSelector for the provided GenericSelector. Typically used with a lambda expression:
    .filter(Date.class, p -> p.after(new Date()))
- Type Parameters:
P - the source payload type or Message.class.
- Parameters:
expectedType - the Class for the expected payload type. It can also be Message.class if you wish to access the entire message in the selector. Conversion to this type will be attempted, if necessary.
genericSelector - the GenericSelector to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
filter
public <P> B filter(@Nullable Class<P> expectedType, GenericSelector<P> genericSelector, @Nullable Consumer<FilterEndpointSpec> endpointConfigurer)
Populate a MessageFilter with MethodInvokingSelector for the provided GenericSelector. In addition, accept options for the integration endpoint using FilterEndpointSpec. Typically used with a lambda expression:
    .filter(Date.class, p -> p.after(new Date()), e -> e.autoStartup(false))
- Type Parameters:
P - the source payload type or Message.class.
- Parameters:
expectedType - the Class for the expected payload type. It can also be Message.class if you wish to access the entire message in the selector. Conversion to this type will be attempted, if necessary.
genericSelector - the GenericSelector to use.
endpointConfigurer - the Consumer to provide integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
handle
Populate aServiceActivatingHandler
for the selected protocol specificMessageHandler
implementation from the respective namespace factory (e.g.Http, Kafka, Files
):.handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey"))
- Type Parameters:
H
- the targetMessageHandler
type.- Parameters:
messageHandlerSpec
- theMessageHandlerSpec
to configure protocol specificMessageHandler
.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
handle
Populate a ServiceActivatingHandler for the provided MessageHandler implementation. Can be used as a lambda expression:
    .handle(m -> logger.info(m.getPayload()))
- Parameters:
messageHandler - the MessageHandler to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
handle
Populate aServiceActivatingHandler
for theMethodInvokingMessageProcessor
to invoke themethod
for providedbean
at runtime.- Parameters:
beanName
- the bean name to use.methodName
- the method to invoke.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
handle
public B handle(String beanName, @Nullable String methodName, @Nullable Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) Populate aServiceActivatingHandler
for theMethodInvokingMessageProcessor
to invoke themethod
for providedbean
at runtime. In addition, accept options for the integration endpoint usingGenericEndpointSpec
.- Parameters:
beanName
- the bean name to use.methodName
- the method to invoke.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
handle
Populate aServiceActivatingHandler
for theMethodInvokingMessageProcessor
to invoke the discoveredmethod
for providedservice
at runtime.- Parameters:
service
- the service object to use.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
handle
Populate a ServiceActivatingHandler for the MethodInvokingMessageProcessor to invoke the method for the provided service at runtime.
- Parameters:
service - the service object to use.
methodName - the method to invoke.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
handle
public B handle(Object service, @Nullable String methodName, @Nullable Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer)
Populate a ServiceActivatingHandler for the MethodInvokingMessageProcessor to invoke the method for the provided service at runtime. In addition, accept options for the integration endpoint using GenericEndpointSpec.
- Parameters:
service - the service object to use.
methodName - the method to invoke.
endpointConfigurer - the Consumer to provide integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
handle
Populate aServiceActivatingHandler
for theMethodInvokingMessageProcessor
to invoke the providedGenericHandler
at runtime. Typically, used with a Lambda expression:.handle(Integer.class, (p, h) -> p / 2)
- Type Parameters:
P
- the payload type to expect, orMessage.class
.- Parameters:
expectedType
- theClass
for expected payload type. It can also beMessage.class
if you wish to access the entire message in the handler. Conversion to this type will be attempted, if necessary.handler
- the handler to invoke.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
handle
public <P> B handle(@Nullable Class<P> expectedType, GenericHandler<P> handler, @Nullable Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) Populate aServiceActivatingHandler
for theMethodInvokingMessageProcessor
to invoke the providedGenericHandler
at runtime. In addition, accept options for the integration endpoint usingGenericEndpointSpec
. Typically, used with a Lambda expression:.handle(Integer.class, (p, h) -> p / 2, e -> e.autoStartup(false))
- Type Parameters:
P
- the payload type to expect orMessage.class
.- Parameters:
expectedType
- theClass
for expected payload type. It can also beMessage.class
if you wish to access the entire message in the handler. Conversion to this type will be attempted, if necessary.handler
- the handler to invoke.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
handle
Populate aServiceActivatingHandler
for theMessageProcessor
from the providedMessageProcessorSpec
..handle(Scripts.script("classpath:myScript.ruby"))
- Parameters:
messageProcessorSpec
- theMessageProcessorSpec
to use.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
handle
public B handle(MessageProcessorSpec<?> messageProcessorSpec, @Nullable Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) Populate aServiceActivatingHandler
for theMessageProcessor
from the providedMessageProcessorSpec
. In addition, accept options for the integration endpoint usingGenericEndpointSpec
..handle(Scripts.script("classpath:myScript.ruby"), e -> e.autoStartup(false))
- Parameters:
messageProcessorSpec
- theMessageProcessorSpec
to use.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
handle
public <H extends MessageHandler> B handle(MessageHandlerSpec<?, H> messageHandlerSpec, @Nullable Consumer<GenericEndpointSpec<H>> endpointConfigurer) Populate aServiceActivatingHandler
for the selected protocol specificMessageHandler
implementation from the respective namespace factory (e.g.Http, Kafka, Files
). In addition, accept options for the integration endpoint usingGenericEndpointSpec
. Typically, used with a Lambda expression:.handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey"), e -> e.autoStartup(false))
- Type Parameters:
H
- theMessageHandler
type.- Parameters:
messageHandlerSpec
- theMessageHandlerSpec
to configure protocol specificMessageHandler
.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
handle
public <H extends MessageHandler> B handle(H messageHandler, @Nullable Consumer<GenericEndpointSpec<H>> endpointConfigurer) Populate aServiceActivatingHandler
for the providedMessageHandler
implementation. In addition, accept options for the integration endpoint usingGenericEndpointSpec
. Can be used as Lambda expression:.handle(m -> logger.info(m.getPayload()), e -> e.autoStartup(false))
- Type Parameters:
H
- theMessageHandler
type.- Parameters:
messageHandler
- theMessageHandler
to use.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
bridge
Populate aBridgeHandler
to the current integration flow position.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
bridge
Populate aBridgeHandler
to the current integration flow position. Typically, used with a Lambda expression:.bridge(s -> s.poller(Pollers.fixedDelay(100)) .autoStartup(false) .id("priorityChannelBridge"))
- Parameters:
endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
delay
Populate aDelayHandler
to the current integration flow position with default options. Shortcut for:.delay(delayer -> delayer.messageGroupId(groupId))
- Parameters:
groupId
- thegroupId
for delayed messages in theMessageGroupStore
.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
delay
Populate a DelayHandler to the current integration flow position. The DelayerEndpointSpec.messageGroupId(String) is a required option.
- Parameters:
endpointConfigurer - the Consumer to provide integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
- Since:
- 6.2
-
enrich
Populate aContentEnricher
to the current integration flow position with provided options. Typically, used with a Lambda expression:.enrich(e -> e.requestChannel("enrichChannel") .requestPayload(Message::getPayload) .shouldClonePayload(false) .autoStartup(false) .<Map<String, String>>headerFunction("foo", m -> m.getPayload().get("name")))
- Parameters:
enricherConfigurer
- theConsumer
to provideContentEnricher
options.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
enrichHeaders
Populate aMessageTransformingHandler
for aHeaderEnricher
using header values from providedMapBuilder
. Can be used together with a namespace factory:.enrichHeaders(Mail.headers() .subjectFunction(m -> "foo") .from("foo@bar") .toFunction(m -> new String[] {"bar@baz"}))
- Parameters:
headers
- theMapBuilder
to use.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
enrichHeaders
public B enrichHeaders(MapBuilder<?, String, Object> headers, @Nullable Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) Populate aMessageTransformingHandler
for aHeaderEnricher
using header values from providedMapBuilder
. In addition, accept options for the integration endpoint usingGenericEndpointSpec
. Can be used together with a namespace factory:.enrichHeaders(Mail.headers() .subjectFunction(m -> "foo") .from("foo@bar") .toFunction(m -> new String[] {"bar@baz"}), e -> e.autoStartup(false))
- Parameters:
headers
- theMapBuilder
to use.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
enrichHeaders
Accept a Map of values to be used for the Message header enrichment. The values can be an Expression to be evaluated against a request Message.
- Parameters:
headers - the Map of headers to enrich.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
enrichHeaders
public B enrichHeaders(Map<String, Object> headers, @Nullable Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer)
Accept a Map of values to be used for the Message header enrichment. The values can be an Expression to be evaluated against a request Message.
- Parameters:
headers - the Map of headers to enrich.
endpointConfigurer - the Consumer to provide integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
enrichHeaders
Populate aMessageTransformingHandler
for aHeaderEnricher
as the result of providedConsumer
. Typically, used with a Lambda expression:.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.sitest") .header("directory", new File(tmpDir, "fileWritingFlow")))
- Parameters:
headerEnricherConfigurer
- theConsumer
to use.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
split
Populate theDefaultMessageSplitter
with default options to the current integration flow position.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
splitWith
Populate the splitter with provided options to the current integration flow position:
    .splitWith(s -> s.applySequence(false).delimiters(","))
    .splitWith(s -> s.ref("someService").method("someMethod"))
- Parameters:
splitterConfigurer - the Consumer to provide splitter endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
- Since:
- 6.2
-
split
Populate the ExpressionEvaluatingSplitter with the provided SpEL expression.
- Parameters:
expression - the splitter SpEL expression for the ExpressionEvaluatingSplitter.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
split
- Parameters:
service
- the service to use.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
split
- Parameters:
service
- the service to use.methodName
- the method to invoke.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
split
- Parameters:
beanName
- the bean name to use.methodName
- the method to invoke at runtime.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
split
Populate theMethodInvokingSplitter
to evaluate theMessageProcessor
at runtime from providedMessageProcessorSpec
..split(Scripts.script("classpath:myScript.ruby"))
- Parameters:
messageProcessorSpec
- the splitterMessageProcessorSpec
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
split
Populate the MethodInvokingSplitter to evaluate the provided Function at runtime. Typically used with a lambda expression:
    .split(String.class, p ->
        jdbcTemplate.execute("SELECT * from FOO",
            (PreparedStatement ps) ->
                new ResultSetIterator<Foo>(ps.executeQuery(),
                    (rs, rowNum) ->
                        new Foo(rs.getInt(1), rs.getString(2)))))
- Type Parameters:
P - the payload type or Message.class.
- Parameters:
expectedType - the Class for the expected payload type. It can also be Message.class if you wish to access the entire message in the splitter. Conversion to this type will be attempted, if necessary.
splitter - the splitter Function.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
split
public <S extends AbstractMessageSplitter> B split(MessageHandlerSpec<?, S> splitterMessageHandlerSpec) Populate the providedAbstractMessageSplitter
to the current integration flow position.- Type Parameters:
S
- theAbstractMessageSplitter
- Parameters:
splitterMessageHandlerSpec
- theMessageHandlerSpec
to populate.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
split
Populate the providedAbstractMessageSplitter
to the current integration flow position.- Parameters:
splitter
- theAbstractMessageSplitter
to populate.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
headerFilter
Provide the HeaderFilter to the current StandardIntegrationFlow.
- Parameters:
headersToRemove - the array of headers (or patterns) to remove from MessageHeaders.
- Returns:
- this BaseIntegrationFlowDefinition.
-
headerFilter
Provide theHeaderFilter
options via fluent API of theHeaderFilterSpec
.- Parameters:
headerFilter
- theConsumer
to provide header filter and its endpoint options.- Returns:
- this
BaseIntegrationFlowDefinition
. - Since:
- 6.2
-
headerFilter
public B headerFilter(HeaderFilter headerFilter, @Nullable Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer)
Populate a MessageTransformingHandler for the provided HeaderFilter.
- Parameters:
headerFilter - the HeaderFilter to use.
endpointConfigurer - the Consumer to provide integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
claimCheckIn
- Parameters:
messageStore
- theMessageStore
to use.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
claimCheckIn
public B claimCheckIn(MessageStore messageStore, @Nullable Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) Populate theMessageTransformingHandler
for theClaimCheckInTransformer
with providedMessageStore
. In addition, accept options for the integration endpoint usingGenericEndpointSpec
.- Parameters:
messageStore
- theMessageStore
to use.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
claimCheckOut
Populate the MessageTransformingHandler for the ClaimCheckOutTransformer with the provided MessageStore. The removeMessage option of ClaimCheckOutTransformer is set to false.
- Parameters:
messageStore - the MessageStore to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
claimCheckOut
Populate theMessageTransformingHandler
for theClaimCheckOutTransformer
with providedMessageStore
andremoveMessage
flag.- Parameters:
messageStore
- theMessageStore
to use.removeMessage
- the removeMessage boolean flag.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
claimCheckOut
public B claimCheckOut(MessageStore messageStore, boolean removeMessage, @Nullable Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer)
Populate the MessageTransformingHandler for the ClaimCheckOutTransformer with the provided MessageStore and removeMessage flag. In addition, accept options for the integration endpoint using GenericEndpointSpec.
- Parameters:
messageStore - the MessageStore to use.
removeMessage - the removeMessage boolean flag.
endpointConfigurer - the Consumer to provide integration endpoint options.
- Returns:
- the current BaseIntegrationFlowDefinition.
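To make the role of the removeMessage flag concrete, the claim check idea can be sketched in plain Java. This is a simplified model under stated assumptions, not the framework implementation: ClaimCheckModel, checkIn, checkOut and storedCount are hypothetical names, and a HashMap stands in for the MessageStore.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Plain-Java model of claim check in/out; hypothetical, not the Spring Integration API. */
final class ClaimCheckModel {

    // Stands in for the MessageStore used by the claim check transformers.
    private final Map<UUID, Object> store = new HashMap<>();

    /** Stores the payload and returns a ticket that travels in its place. */
    UUID checkIn(Object payload) {
        UUID ticket = UUID.randomUUID();
        store.put(ticket, payload);
        return ticket;
    }

    /** Restores the payload; removes it from the store only when removeMessage is true. */
    Object checkOut(UUID ticket, boolean removeMessage) {
        if (!store.containsKey(ticket)) {
            throw new IllegalArgumentException("No stored payload for ticket " + ticket);
        }
        return removeMessage ? store.remove(ticket) : store.get(ticket);
    }

    /** Number of payloads currently held in the store. */
    int storedCount() {
        return store.size();
    }
}
```

With removeMessage set to false the stored entry survives repeated check-outs, which corresponds to the default described for the two-argument-free claimCheckOut(MessageStore) variant.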
-
resequence
Populate theResequencingMessageHandler
with default options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
resequence
Populate theResequencingMessageHandler
with provided options fromResequencerSpec
. In addition, accept options for the integration endpoint usingGenericEndpointSpec
. Typically, used with a Lambda expression:.resequence(r -> r.releasePartialSequences(true) .correlationExpression("'foo'") .phase(100))
- Parameters:
resequencer
- theConsumer
to provideResequencingMessageHandler
options.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
aggregate
Populate theAggregatingMessageHandler
with default options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
aggregate
A shortcut for aggregate((aggregator) -> aggregator.processor(aggregatorProcessor)).
- Parameters:
aggregatorProcessor - the POJO representing aggregation strategies.
- Returns:
- the current BaseIntegrationFlowDefinition.
- Since:
- 5.5
-
aggregate
Populate the AggregatingMessageHandler with provided options from AggregatorSpec. In addition, accept options for the integration endpoint using GenericEndpointSpec. Typically used with a lambda expression:
    .aggregate(a -> a.correlationExpression("1")
        .releaseStrategy(g -> g.size() == 25)
        .phase(100))
- Parameters:
aggregator - the Consumer to provide AggregatingMessageHandler options.
- Returns:
- the current BaseIntegrationFlowDefinition.
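The interplay of correlation and release strategy in the example above can be sketched in plain Java. This is only a model under stated assumptions: AggregatorModel and its add method are hypothetical names, a Function stands in for the correlation expression, and a Predicate over the current group stands in for the release strategy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Plain-Java model of an aggregator; hypothetical, not the Spring Integration API. */
final class AggregatorModel<T, K> {

    private final Function<T, K> correlation;         // plays the role of the correlation strategy
    private final Predicate<List<T>> releaseStrategy; // decides when a group is complete
    private final Map<K, List<T>> groups = new HashMap<>();

    AggregatorModel(Function<T, K> correlation, Predicate<List<T>> releaseStrategy) {
        this.correlation = correlation;
        this.releaseStrategy = releaseStrategy;
    }

    /** Adds a message to its group and returns the group once the release strategy accepts it. */
    Optional<List<T>> add(T message) {
        K key = correlation.apply(message);
        List<T> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (releaseStrategy.test(group)) {
            groups.remove(key); // a released group no longer collects messages
            return Optional.of(List.copyOf(group));
        }
        return Optional.empty();
    }
}
```

For instance, correlating strings by length and releasing at a group size of two yields a released group only when the second string of the same length arrives.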
-
route
Populate theMethodInvokingRouter
for provided bean and its method with default options.- Parameters:
beanName
- the bean to use.method
- the method to invoke at runtime.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
route
public B route(String beanName, @Nullable String method, @Nullable Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer) Populate theMethodInvokingRouter
for provided bean and its method with provided options fromRouterSpec
.- Parameters:
beanName
- the bean to use.method
- the method to invoke at runtime.routerConfigurer
- theConsumer
to provideMethodInvokingRouter
options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
route
Populate the MethodInvokingRouter for the discovered method of the provided service with default options.
- Parameters:
service - the service to use.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
route
Populate the MethodInvokingRouter for the method of the provided service with default options.
- Parameters:
service - the service to use.
methodName - the method to invoke.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
route
public B route(Object service, @Nullable String methodName, @Nullable Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer)
Populate the MethodInvokingRouter for the method of the provided service with provided options from RouterSpec.
- Parameters:
service - the service to use.
methodName - the method to invoke.
routerConfigurer - the Consumer to provide MethodInvokingRouter options.
- Returns:
- the current BaseIntegrationFlowDefinition.
-
route
Populate theExpressionEvaluatingRouter
for provided SpEL expression with default options.- Parameters:
expression
- the expression to use.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
route
public <T> B route(String expression, @Nullable Consumer<RouterSpec<T, ExpressionEvaluatingRouter>> routerConfigurer) Populate theExpressionEvaluatingRouter
for provided SpEL expression with provided options fromRouterSpec
.- Type Parameters:
T
- the target result type.- Parameters:
expression
- the expression to use.routerConfigurer
- theConsumer
to provideExpressionEvaluatingRouter
options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
route
Populate theMethodInvokingRouter
for providedFunction
and payload type with default options. Typically, used with a Lambda expression:.route(Integer.class, p -> p % 2 == 0)
- Type Parameters:
S
- the source payload type orMessage.class
.T
- the target result type.- Parameters:
expectedType
- theClass
for expected payload type. It can also beMessage.class
if you wish to access the entire message in the router. Conversion to this type will be attempted, if necessary.router
- theFunction
to use.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
route
public <P,T> B route(@Nullable Class<P> expectedType, Function<P, T> router, @Nullable Consumer<RouterSpec<T, MethodInvokingRouter>> routerConfigurer)
Populate the MethodInvokingRouter for the provided Function and payload type, with options from RouterSpec. In addition, accept options for the integration endpoint using GenericEndpointSpec. Typically used with a lambda expression:
    .route(Integer.class, p -> p % 2 == 0,
        m -> m.channelMapping("true", "evenChannel")
            .subFlowMapping("false", f -> f.<Integer>handle((p, h) -> p * 3))
            .applySequence(false))
- Type Parameters:
P - the source payload type or Message.class.
T - the target result type.
- Parameters:
expectedType - the Class for the expected payload type. It can also be Message.class if you wish to access the entire message in the router. Conversion to this type will be attempted, if necessary.
router - the Function to use.
routerConfigurer - the Consumer to provide MethodInvokingRouter options.
- Returns:
- the current BaseIntegrationFlowDefinition.
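The relationship between the router Function and channel mappings can be sketched in plain Java. This is a model only: RouterModel, channelMapping, route and received are hypothetical names here, channels are represented by lists keyed by name, and the sketch assumes that a routing key without a mapping is used directly as the channel name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of a function-based router; hypothetical, not the Spring Integration API. */
final class RouterModel<P, T> {

    private final Function<P, T> router;
    private final Map<String, String> channelMappings = new HashMap<>();
    private final Map<String, List<P>> channels = new HashMap<>();

    RouterModel(Function<P, T> router) {
        this.router = router;
    }

    /** Maps the string form of a routing key to a channel name. */
    RouterModel<P, T> channelMapping(String key, String channelName) {
        channelMappings.put(key, channelName);
        return this;
    }

    /** Routes the payload and returns the name of the channel that received it. */
    String route(P payload) {
        String key = String.valueOf(router.apply(payload));
        // Assumption of this model: an unmapped key doubles as the channel name.
        String channel = channelMappings.getOrDefault(key, key);
        channels.computeIfAbsent(channel, c -> new ArrayList<>()).add(payload);
        return channel;
    }

    /** Payloads delivered to the named channel so far. */
    List<P> received(String channel) {
        return channels.getOrDefault(channel, List.of());
    }
}
```

Using the even/odd function from the example, a payload of 4 produces the key "true" and lands on "evenChannel" once that mapping is registered.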
-
route
Populate theMethodInvokingRouter
for theMessageProcessor
from the providedMessageProcessorSpec
with default options..route(Scripts.script(myScriptResource).lang("groovy").refreshCheckDelay(1000))
- Parameters:
messageProcessorSpec
- theMessageProcessorSpec
to use.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
route
public B route(MessageProcessorSpec<?> messageProcessorSpec, @Nullable Consumer<RouterSpec<Object, MethodInvokingRouter>> routerConfigurer) Populate theMethodInvokingRouter
for theMessageProcessor
from the providedMessageProcessorSpec
with options from the RouterSpec. For example: .route(Scripts.script(myScriptResource).lang("groovy").refreshCheckDelay(1000), m -> m.channelMapping("true", "evenChannel") .subFlowMapping("false", f -> f.<Integer>handle((p, h) -> p * 3)))
- Parameters:
messageProcessorSpec
- theMessageProcessorSpec
to use.routerConfigurer
- theConsumer
to provideMethodInvokingRouter
options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
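The channelMapping calls in the examples above can be pictured as a lookup from a routing key to a channel name. The following plain-Java sketch models only that lookup. It does not use Spring Integration, and the names RouterMappingSketch and resolveChannel, as well as the fallback of treating an unmapped key as the channel name, are illustrative assumptions rather than the library's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class RouterMappingSketch {

    // Hypothetical helper: evaluate the router function, then translate its
    // result (as a String key) into a channel name via the mapping table.
    static <P> String resolveChannel(P payload,
                                     Function<P, ?> router,
                                     Map<String, String> channelMappings) {
        String key = String.valueOf(router.apply(payload));
        // Assumption: an unmapped key is used as the channel name itself.
        return channelMappings.getOrDefault(key, key);
    }

    public static void main(String[] args) {
        Map<String, String> mappings = new LinkedHashMap<>();
        mappings.put("true", "evenChannel");
        mappings.put("false", "oddChannel");

        Function<Integer, Boolean> isEven = p -> p % 2 == 0;
        System.out.println(resolveChannel(4, isEven, mappings));
        System.out.println(resolveChannel(7, isEven, mappings));
    }
}
```

In this model the router function only produces a key; the decision about where the message goes lives entirely in the mapping table, which is why the mappings can be changed without touching the function.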
-
route
protected <R extends AbstractMessageRouter,S extends AbstractRouterSpec<? super S,R>> B route(S routerSpec, @Nullable Consumer<S> routerConfigurer)
-
routeToRecipients
Populate theRecipientListRouter
with options from theRecipientListRouterSpec
. Typically, used with a Lambda expression:.routeToRecipients(r -> r .recipient("bar-channel", m -> m.getHeaders().containsKey("recipient") && (boolean) m.getHeaders().get("recipient")) .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload", f -> f.transform(String.class, p -> p.toUpperCase()) .channel(c -> c.queue("recipientListSubFlow1Result"))))
- Parameters:
routerConfigurer
- theConsumer
to provideRecipientListRouter
options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
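The recipient-list idea described above (each recipient has a selector, and a message is delivered to every recipient whose selector accepts it) can be sketched in plain Java. This is a simplified model under stated assumptions, not Spring Integration code: RecipientListSketch and selectRecipients are hypothetical names, and messages are reduced to String payloads.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class RecipientListSketch {

    // Hypothetical model: channel name -> selector that decides whether
    // that recipient should receive the message.
    static List<String> selectRecipients(String payload,
                                         Map<String, Predicate<String>> recipients) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Predicate<String>> recipient : recipients.entrySet()) {
            if (recipient.getValue().test(payload)) {
                selected.add(recipient.getKey());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, Predicate<String>> recipients = new LinkedHashMap<>();
        recipients.put("foo-channel", p -> p.startsWith("foo"));
        recipients.put("all-channel", p -> true);

        System.out.println(selectRecipients("foobar", recipients));
        System.out.println(selectRecipients("baz", recipients));
    }
}
```

Unlike a key-based router, more than one recipient can match the same message, so the result is a list of channels rather than a single one.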
-
routeByException
public B routeByException(Consumer<RouterSpec<Class<? extends Throwable>, ErrorMessageExceptionTypeRouter>> routerConfigurer) Populate theErrorMessageExceptionTypeRouter
with options from theRouterSpec
. Typically, used with a Lambda expression:.routeByException(r -> r .channelMapping(IllegalArgumentException.class, "illegalArgumentChannel") .subFlowMapping(MessageHandlingException.class, sf -> sf.handle(...)) )
- Parameters:
routerConfigurer
- theConsumer
to provideErrorMessageExceptionTypeRouter
options.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
route
Populate the providedAbstractMessageRouter
implementation to the current integration flow position.- Parameters:
router
- theAbstractMessageRouter
to populate.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
route
public <R extends AbstractMessageRouter> B route(R router, @Nullable Consumer<GenericEndpointSpec<R>> endpointConfigurer) Populate the providedAbstractMessageRouter
implementation to the current integration flow position. In addition, accept options for the integration endpoint usingGenericEndpointSpec
.- Type Parameters:
R
- theAbstractMessageRouter
type.- Parameters:
router
- theAbstractMessageRouter
to populate.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
gateway
Populate the "artificial"GatewayMessageHandler
for the providedrequestChannel
to send a request with default options. Uses a RequestReplyExchanger
proxy in the background.- Parameters:
requestChannel
- theMessageChannel
bean name.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
gateway
Populate the "artificial"GatewayMessageHandler
for the providedrequestChannel
to send a request with options fromGatewayEndpointSpec
. Uses a RequestReplyExchanger
proxy in the background.- Parameters:
requestChannel
- theMessageChannel
bean name.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
gateway
Populate the "artificial"GatewayMessageHandler
for the providedrequestChannel
to send a request with default options. Uses a RequestReplyExchanger
proxy in the background.- Parameters:
requestChannel
- theMessageChannel
to use.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
gateway
public B gateway(MessageChannel requestChannel, @Nullable Consumer<GatewayEndpointSpec> endpointConfigurer) Populate the "artificial"GatewayMessageHandler
for the providedrequestChannel
to send a request with options fromGatewayEndpointSpec
. Uses a RequestReplyExchanger
proxy in the background.- Parameters:
requestChannel
- theMessageChannel
to use.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
gateway
Populate the "artificial"GatewayMessageHandler
for the providedsubflow
. Typically used with a lambda expression: .gateway(f -> f.transform("From Gateway SubFlow: "::concat))
- Parameters:
flow
- theIntegrationFlow
to send a request message to and wait for a reply.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
gateway
Populate the "artificial"GatewayMessageHandler
for the providedsubflow
with options fromGatewayEndpointSpec
. Typically, used with a Lambda expression:.gateway(f -> f.transform("From Gateway SubFlow: "::concat), e -> e.replyTimeout(100L))
- Parameters:
flow
- theIntegrationFlow
to send a request message to and wait for a reply.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
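The request/reply contract described for the gateway variants above (send a request into a sub-flow, then wait for a reply up to a timeout) can be modelled in plain Java. This is only a sketch under assumptions: GatewaySketch and sendAndReceive are hypothetical names, the sub-flow is reduced to a Function, a missing reply is modelled as an empty Optional, and the sub-flow is run asynchronously purely so that the timeout is observable. It does not describe how GatewayMessageHandler is implemented.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class GatewaySketch {

    // Hypothetical helper: hand the request to the sub-flow and wait for the
    // reply for at most replyTimeoutMillis.
    static <I, O> Optional<O> sendAndReceive(I request,
                                             Function<I, O> subFlow,
                                             long replyTimeoutMillis) {
        CompletableFuture<O> reply =
                CompletableFuture.supplyAsync(() -> subFlow.apply(request));
        try {
            return Optional.ofNullable(reply.get(replyTimeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            // Assumption: no reply within the timeout is treated as "no result".
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("sub-flow failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        Optional<String> reply =
                sendAndReceive("hello", "From Gateway SubFlow: "::concat, 1000L);
        System.out.println(reply.orElse("<no reply>"));
    }
}
```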
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for theINFO
logging level andorg.springframework.integration.handler.LoggingHandler
as a default logging category.The full request
Message
will be logged.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for providedLoggingHandler.Level
logging level andorg.springframework.integration.handler.LoggingHandler
as a default logging category.The full request
Message
will be logged.- Parameters:
level
- theLoggingHandler.Level
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for the provided logging category andINFO
logging level.The full request
Message
will be logged.- Parameters:
category
- the logging category to use.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for the providedLoggingHandler.Level
logging level and logging category.The full request
Message
will be logged.- Parameters:
level
- theLoggingHandler.Level
.category
- the logging category to use.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for the providedLoggingHandler.Level
logging level, logging category and SpEL expression for the log message.- Parameters:
level
- theLoggingHandler.Level
.category
- the logging category.logExpression
- the SpEL expression to evaluate logger message at runtime against the requestMessage
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for theINFO
logging level, theorg.springframework.integration.handler.LoggingHandler
as a default logging category andFunction
for the log message.- Type Parameters:
P
- the expected payload type.- Parameters:
function
- the function to evaluate logger message at runtime against the requestMessage
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for theINFO
logging level, theorg.springframework.integration.handler.LoggingHandler
as a default logging category and SpEL expression to evaluate logger message at runtime against the requestMessage
.- Parameters:
logExpression
- theExpression
to evaluate logger message at runtime against the requestMessage
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for the providedLoggingHandler.Level
logging level, theorg.springframework.integration.handler.LoggingHandler
as a default logging category and SpEL expression to evaluate logger message at runtime against the requestMessage
.- Parameters:
level
- theLoggingHandler.Level
.logExpression
- theExpression
to evaluate logger message at runtime against the requestMessage
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for theINFO
LoggingHandler.Level
logging level, the provided logging category and SpEL expression to evaluate logger message at runtime against the requestMessage
.- Parameters:
category
- the logging category.logExpression
- theExpression
to evaluate logger message at runtime against the requestMessage
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for the providedLoggingHandler.Level
logging level, theorg.springframework.integration.handler.LoggingHandler
as a default logging category andFunction
for the log message.- Type Parameters:
P
- the expected payload type.- Parameters:
level
- theLoggingHandler.Level
.function
- the function to evaluate logger message at runtime against the requestMessage
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for the INFO
logging level, the provided logging category andFunction
for the log message.- Type Parameters:
P
- the expected payload type.- Parameters:
category
- the logging category.function
- the function to evaluate logger message at runtime against the requestMessage
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
public <P> B log(LoggingHandler.Level level, @Nullable String category, Function<Message<P>, Object> function) Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for the providedLoggingHandler.Level
logging level, logging category andFunction
for the log message.- Type Parameters:
P
- the expected payload type.- Parameters:
level
- theLoggingHandler.Level
.category
- the logging category.function
- the function to evaluate logger message at runtime against the requestMessage
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
-
log
public B log(LoggingHandler.Level level, @Nullable String category, @Nullable Expression logExpression) Populate aWireTap
for thecurrentMessageChannel
with theLoggingHandler
subscriber for the providedLoggingHandler.Level
logging level, logging category and SpEL expression for the log message.- Parameters:
level
- theLoggingHandler.Level
.category
- the logging category.logExpression
- theExpression
to evaluate logger message at runtime against the requestMessage
.- Returns:
- the current
BaseIntegrationFlowDefinition
. - See Also:
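The log variants above all share one idea: a wire tap hands the message to a logging subscriber while the main flow continues with the message unchanged. A minimal plain-Java model of that idea follows. It is not Spring Integration code; WireTapLogSketch, logAndPass and the in-memory LOG list are hypothetical, and the level and category are plain strings for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class WireTapLogSketch {

    // Collects log lines in memory so the side effect is easy to inspect.
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical helper: evaluate the function against the message to build
    // the log line, record it, and return the message itself untouched.
    static <P> P logAndPass(String level,
                            String category,
                            Function<P, Object> function,
                            P message) {
        LOG.add(level + " " + category + " - " + function.apply(message));
        return message;
    }

    public static void main(String[] args) {
        Function<String, Object> describe = m -> "payload length=" + m.length();
        String downstream = logAndPass("INFO", "my.category", describe, "hello");
        System.out.println(LOG);
        System.out.println(downstream);
    }
}
```

The point of the model is that the function (or expression) only shapes the log line; whatever it returns never replaces the message seen by the rest of the flow.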
-
scatterGather
Populate aScatterGatherHandler
to the current integration flow position based on the providedMessageChannel
for scattering function and defaultAggregatorSpec
for gathering function.- Parameters:
scatterChannel
- theMessageChannel
for scattering requests.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
scatterGather
Populate aScatterGatherHandler
to the current integration flow position based on the providedMessageChannel
for scattering function andAggregatorSpec
for gathering function.- Parameters:
scatterChannel
- theMessageChannel
for scattering requests.gatherer
- theConsumer
forAggregatorSpec
to configure gatherer. Can benull
.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
scatterGather
public B scatterGather(MessageChannel scatterChannel, @Nullable Consumer<AggregatorSpec> gatherer, @Nullable Consumer<ScatterGatherSpec> scatterGather) Populate aScatterGatherHandler
to the current integration flow position based on the providedMessageChannel
for scattering function andAggregatorSpec
for gathering function.- Parameters:
scatterChannel
- theMessageChannel
for scattering requests.gatherer
- theConsumer
forAggregatorSpec
to configure gatherer. Can benull
.scatterGather
- theConsumer
forScatterGatherSpec
to configureScatterGatherHandler
and its endpoint. Can benull
.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
scatterGather
Populate aScatterGatherHandler
to the current integration flow position based on the providedRecipientListRouterSpec
for scattering function and defaultAggregatorSpec
for gathering function.- Parameters:
scatterer
- theConsumer
forRecipientListRouterSpec
to configure scatterer.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
scatterGather
public B scatterGather(Consumer<RecipientListRouterSpec> scatterer, @Nullable Consumer<AggregatorSpec> gatherer) Populate aScatterGatherHandler
to the current integration flow position based on the providedRecipientListRouterSpec
for scattering function andAggregatorSpec
for gathering function.- Parameters:
scatterer
- theConsumer
forRecipientListRouterSpec
to configure scatterer. Can benull
.gatherer
- theConsumer
forAggregatorSpec
to configure gatherer. Can benull
.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
scatterGather
public B scatterGather(Consumer<RecipientListRouterSpec> scatterer, @Nullable Consumer<AggregatorSpec> gatherer, @Nullable Consumer<ScatterGatherSpec> scatterGather) Populate aScatterGatherHandler
to the current integration flow position based on the providedRecipientListRouterSpec
for scattering function andAggregatorSpec
for gathering function. For convenience, theAbstractRouterSpec.applySequence(boolean)
is set to true by default.- Parameters:
scatterer
- theConsumer
forRecipientListRouterSpec
to configure scatterer.gatherer
- theConsumer
forAggregatorSpec
to configure gatherer.scatterGather
- theConsumer
forScatterGatherSpec
to configureScatterGatherHandler
and its endpoint. Can benull
.- Returns:
- the current
BaseIntegrationFlowDefinition
.
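The scatter-gather pattern behind the methods above has two halves: a scattering function distributes the request to several recipients, and a gathering function aggregates their replies into one result. The plain-Java sketch below models just that shape. ScatterGatherSketch and its scatterGather method are hypothetical names, recipients are reduced to Function instances called one after another, and nothing here reflects how ScatterGatherHandler correlates or times out replies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class ScatterGatherSketch {

    // Hypothetical helper: send the same request to every recipient, collect
    // the replies, then let the gatherer reduce them to a single result.
    static <I, O, R> R scatterGather(I request,
                                     List<Function<I, O>> recipients,
                                     Function<List<O>, R> gatherer) {
        List<O> replies = new ArrayList<>();
        for (Function<I, O> recipient : recipients) {
            replies.add(recipient.apply(request)); // scatter
        }
        return gatherer.apply(replies);            // gather
    }

    public static void main(String[] args) {
        // Three illustrative "quote" services pricing the same order.
        List<Function<Integer, Integer>> quotes = new ArrayList<>();
        quotes.add(amount -> amount * 3);
        quotes.add(amount -> amount * 2);
        quotes.add(amount -> amount * 5);

        Integer best = scatterGather(10, quotes, replies -> Collections.min(replies));
        System.out.println(best);
    }
}
```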
-
barrier
Populate aBarrierMessageHandler
instance for provided timeout.- Parameters:
timeout
- the timeout in milliseconds.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
barrier
Populate aBarrierMessageHandler
instance for provided timeout and options fromBarrierSpec
and endpoint options fromGenericEndpointSpec
.- Parameters:
timeout
- the timeout in milliseconds.barrierConfigurer
- theConsumer
to provideBarrierMessageHandler
options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
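A barrier with a timeout, as configured by the methods above, suspends the calling thread until it is released or until the timeout in milliseconds elapses. The following plain-Java sketch models that waiting behaviour only. BarrierSketch, trigger and await are hypothetical names, payloads are plain strings, and the sketch deliberately omits the message correlation a real barrier needs when several requests wait at once.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BarrierSketch {

    private final CompletableFuture<String> release = new CompletableFuture<>();

    // Called from another thread to release the waiting caller.
    void trigger(String payload) {
        release.complete(payload);
    }

    // Suspends the calling thread until trigger(...) is called or the
    // timeout elapses; an empty result means the timeout won.
    Optional<String> await(long timeoutMillis) {
        try {
            return Optional.ofNullable(release.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        BarrierSketch barrier = new BarrierSketch();
        new Thread(() -> barrier.trigger("released")).start();
        System.out.println(barrier.await(1000L).isPresent());
        System.out.println(new BarrierSketch().await(50L).isPresent());
    }
}
```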
-
trigger
Populate aServiceActivatingHandler
instance to performMessageTriggerAction
.- Parameters:
triggerActionId
- theMessageTriggerAction
bean id.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
trigger
public B trigger(String triggerActionId, @Nullable Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) Populate aServiceActivatingHandler
instance to performMessageTriggerAction
and endpoint options fromGenericEndpointSpec
.- Parameters:
triggerActionId
- theMessageTriggerAction
bean id.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
trigger
Populate aServiceActivatingHandler
instance to performMessageTriggerAction
.- Parameters:
triggerAction
- theMessageTriggerAction
.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
trigger
public B trigger(MessageTriggerAction triggerAction, @Nullable Consumer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) Populate aServiceActivatingHandler
instance to performMessageTriggerAction
and endpoint options fromGenericEndpointSpec
.- Parameters:
triggerAction
- theMessageTriggerAction
.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
intercept
Add one or moreChannelInterceptor
implementations to the currentMessageChannel
, in the given order, after any interceptors already registered.- Parameters:
interceptorArray
- one or moreChannelInterceptor
s.- Returns:
- the current
BaseIntegrationFlowDefinition
. - Throws:
IllegalArgumentException
- if one or more null arguments are provided- Since:
- 5.3
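The ordering rule stated above (interceptors run in the given order, after any already registered) can be illustrated with a plain-Java chain. This is a sketch under assumptions, not the ChannelInterceptor API: InterceptorChainSketch and preSend are hypothetical names, each interceptor is modelled as a UnaryOperator over a String message, and returning null is used to model an interceptor that vetoes the send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class InterceptorChainSketch {

    // Hypothetical helper: apply each interceptor in list order; stop as soon
    // as one of them returns null (modelled here as vetoing the send).
    static String preSend(String message, List<UnaryOperator<String>> interceptors) {
        String current = message;
        for (UnaryOperator<String> interceptor : interceptors) {
            current = interceptor.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = new ArrayList<>();
        chain.add(s -> s.trim());          // already registered
        chain.add(s -> s.toUpperCase());   // added later, so it runs later
        System.out.println(preSend("  hello ", chain));
    }
}
```

Because each interceptor sees the output of the previous one, the registration order is observable in the result.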
-
fluxTransform
public <I,O> B fluxTransform(Function<? super reactor.core.publisher.Flux<Message<I>>, ? extends org.reactivestreams.Publisher<O>> fluxFunction) Populate aFluxMessageChannel
to start a reactive processing for upstream data, wrap it to aFlux
, apply providedFunction
viaFlux.transform(Function)
and emit the result to one moreFluxMessageChannel
, subscribed in the downstream flow.- Type Parameters:
I
- the input payload type.O
- the output type.- Parameters:
fluxFunction
- theFunction
to process data in a reactive manner.- Returns:
- the current
BaseIntegrationFlowDefinition
.
-
nullChannel
Add a "nullChannel" bean into this flow definition as a terminal operator.- Returns:
- The
IntegrationFlow
instance based on this definition. - Since:
- 5.1
-
handleReactive
public <H extends ReactiveMessageHandler> IntegrationFlow handleReactive(ReactiveMessageHandlerSpec<?, H> messageHandlerSpec) Populate a terminal consumer endpoint for the selected protocol specificMessageHandler
implementation from the respective namespace factory (e.g.Http, Kafka, Files
). In addition, accept options for the integration endpoint usingGenericEndpointSpec
.- Type Parameters:
H
- theMessageHandler
type.- Parameters:
messageHandlerSpec
- theMessageHandlerSpec
to configure the protocol specificMessageHandler
.- Returns:
- The
IntegrationFlow
instance based on this definition. - Since:
- 6.1
-
handleReactive
public <H extends ReactiveMessageHandler> IntegrationFlow handleReactive(ReactiveMessageHandlerSpec<?, H> messageHandlerSpec, @Nullable Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer) Populate a terminal consumer endpoint for the selected protocol specificMessageHandler
implementation from the respective namespace factory (e.g.Http, Kafka, Files
). In addition, accept options for the integration endpoint usingGenericEndpointSpec
.- Type Parameters:
H
- theMessageHandler
type.- Parameters:
messageHandlerSpec
- theMessageHandlerSpec
to configure the protocol specificMessageHandler
.endpointConfigurer
- theConsumer
to provide integration endpoint options.- Returns:
- The
IntegrationFlow
instance based on this definition. - Since:
- 6.1
-
handleReactive
Add aReactiveMessageHandler
as a terminalIntegrationFlow
operator.- Parameters:
reactiveMessageHandler
- theReactiveMessageHandler
to finish the flow.- Returns:
- The
IntegrationFlow
instance based on this definition. - Since:
- 6.1
-
handleReactive
public IntegrationFlow handleReactive(ReactiveMessageHandler reactiveMessageHandler, @Nullable Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> endpointConfigurer) Add aReactiveMessageHandler
as a terminalIntegrationFlow
operator.- Parameters:
reactiveMessageHandler
- theReactiveMessageHandler
to finish the flow.endpointConfigurer
- theConsumer
to configure a target endpoint for the handler.- Returns:
- The
IntegrationFlow
instance based on this definition. - Since:
- 6.1
-
to
Finish this flow with delegation to another IntegrationFlow
instance.- Parameters:
other
- theIntegrationFlow
to compose with.- Returns:
- The
IntegrationFlow
instance based on this definition. - Since:
- 5.5.4
-
toReactivePublisher
Represent an Integration Flow as a Reactive StreamsPublisher
bean.- Type Parameters:
T
- the expectedpayload
type- Returns:
- the Reactive Streams
Publisher
-
toReactivePublisher
protected <T> org.reactivestreams.Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe) Represent an Integration Flow as a Reactive StreamsPublisher
bean.- Type Parameters:
T
- the expectedpayload
type- Parameters:
autoStartOnSubscribe
- start message production and consumption in the flow when a subscription to the publisher is initiated. If this is set to true, the flow is marked to not start automatically by the application context.- Returns:
- the Reactive Streams
Publisher
- Since:
- 5.5.6
-
register
protected <S extends ConsumerEndpointSpec<? super S,? extends MessageHandler>> B register(S endpointSpec, @Nullable Consumer<? super S> endpointConfigurer) -
registerOutputChannelIfCan
-
isOutputChannelRequired
protected boolean isOutputChannelRequired() -
_this
-
get
-
checkReuse
-
extractProxyTarget
-
controlBusOnRegistry()
- will be restored in next version.