This sink application writes the content of each message it receives into Cassandra.
It expects a payload of JSON String and uses its properties to map to table columns.
Headers:
Content-Type: application/json
Payload:
A JSON String or byte array representing the entity (or a list of entities) to be persisted
The cassandra sink has the following options:
- cassandra.async: Async mode for CassandraMessageHandler. (Boolean, default: true)
- cassandra.cluster.create-keyspace: Flag to create (or not) keyspace on application startup. (Boolean, default: false)
- cassandra.cluster.entity-base-packages: Base packages to scan for entities annotated with Table annotations. (String[], default: [])
- cassandra.cluster.init-script: Resource with CQL scripts (delimited by ';') to initialize keyspace schema. (Resource, default: <none>)
- cassandra.cluster.metrics-enabled: Enable/disable metrics collection for the created cluster. (Boolean, default: <none>)
- cassandra.cluster.skip-ssl-validation: Flag to validate the Servers' SSL certs. (Boolean, default: false)
- cassandra.consistency-level: The consistency level for write operation. (ConsistencyLevel, default: <none>, possible values: ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, SERIAL, LOCAL_SERIAL, LOCAL_ONE)
- cassandra.ingest-query: Ingest Cassandra query. (String, default: <none>)
- cassandra.query-type: QueryType for Cassandra Sink. (Type, default: <none>, possible values: INSERT, UPDATE, DELETE, STATEMENT)
- cassandra.statement-expression: Expression in Cassandra query DSL style. (Expression, default: <none>)
- cassandra.ttl: Time-to-live option of WriteOptions. (Integer, default: 0)
- spring.data.cassandra.cluster-name: Name of the Cassandra cluster. (String, default: <none>)
- spring.data.cassandra.compression: Compression supported by the Cassandra binary protocol. (Compression, default: none, possible values: none, snappy, lz4)
- spring.data.cassandra.connect-timeout: Socket option: connection time out. (Duration, default: <none>)
- spring.data.cassandra.consistency-level: Queries consistency level. (ConsistencyLevel, default: <none>, possible values: ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, SERIAL, LOCAL_SERIAL, LOCAL_ONE)
- spring.data.cassandra.contact-points: Cluster node addresses. (List<String>, default: [localhost])
- spring.data.cassandra.fetch-size: Queries default fetch size. (Integer, default: <none>)
- spring.data.cassandra.jmx-enabled: Whether to enable JMX reporting. Default to false as Cassandra JMX reporting is not compatible with Dropwizard Metrics. (Boolean, default: false)
- spring.data.cassandra.keyspace-name: Keyspace name to use. (String, default: <none>)
- spring.data.cassandra.load-balancing-policy: Class name of the load balancing policy. The class must have a default constructor. (Class<LoadBalancingPolicy>, default: <none>)
- spring.data.cassandra.password: Login password of the server. (String, default: <none>)
- spring.data.cassandra.port: Port of the Cassandra server. (Integer, default: <none>)
- spring.data.cassandra.read-timeout: Socket option: read time out. (Duration, default: <none>)
- spring.data.cassandra.reconnection-policy: Class name of the reconnection policy. The class must have a default constructor. (Class<ReconnectionPolicy>, default: <none>)
- spring.data.cassandra.retry-policy: Class name of the retry policy. The class must have a default constructor. (Class<RetryPolicy>, default: <none>)
- spring.data.cassandra.schema-action: Schema action to take at startup. (String, default: none)
- spring.data.cassandra.serial-consistency-level: Queries serial consistency level. (ConsistencyLevel, default: <none>, possible values: ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, SERIAL, LOCAL_SERIAL, LOCAL_ONE)
- spring.data.cassandra.ssl: Enable SSL support. (Boolean, default: false)
- spring.data.cassandra.username: Login user of the server. (String, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
The following example assumes a JSON payload is sent to a default destination called input; the sink parses some of its properties (id, time, customer_id, value) and persists them into a table called orders.
java -jar cassandra_sink.jar --cassandra.cluster.keyspace=test \
--cassandra.ingest-query="insert into orders(id,time,customer_id, value) values (?,?,?,?)"
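The ingest query above assumes that an orders table already exists in the test keyspace (unless cassandra.cluster.init-script is used to create it). A minimal CQL sketch of such a table, with illustrative column types that you would adjust to your actual data, could be:
CREATE TABLE orders (
    id uuid PRIMARY KEY,
    time timestamp,
    customer_id int,
    value float
);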
Counter sink that computes multiple counters from the received messages. It leverages the Micrometer library and can use various popular TSDBs to persist the counter values.
By default the Counter sink increments the message.<name> counter on every received message. The message-counter-enabled property allows you to disable this counter when required.
If tag expressions are provided (via the counter.tag.expression.<tagKey>=<tagValue SpEL expression> property), the <name> counter is incremented. Note that each SpEL expression can evaluate to multiple values, resulting in multiple counter increments (one for every resolved value).
If fixed tags are provided, they are included in all message and expression counter increment measurements.
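For example, assuming a JSON payload with a customer_id field (a hypothetical field name) and the #jsonPath SpEL utility described in the router sink section, a sketch of a tag-expression configuration could look like:
java -jar counter-sink.jar --counter.name=orders \
--counter.tag.expression.customer="#jsonPath(payload,'$.customer_id')" --counter.tag.fixed.region=us-east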
Counter’s implementation is based on the Micrometer library which is a Vendor-neutral application metrics facade that supports the most popular monitoring systems.
See the Micrometer documentation for the list of supported monitoring systems. Starting with Spring Boot 2.0, Micrometer is the instrumentation library powering the delivery of application metrics from Spring Boot.
All Spring Cloud Stream App Starters are configured to support two of the most popular monitoring systems, Prometheus and InfluxDB. You can declaratively select which monitoring system to use.
If you are not using Prometheus or InfluxDB, you can customise the App starters to use a different monitoring system as well as include your preferred micrometer monitoring system library in your own custom applications.
Grafana is a popular platform for building visualization dashboards.
To enable Micrometer’s Prometheus meter registry for Spring Cloud Stream application starters, set the following properties.
management.metrics.export.prometheus.enabled=true
management.endpoints.web.exposure.include=prometheus
To allow a simple Prometheus configuration to scrape the counter information, also disable the application's security by setting the following property:
spring.cloud.streamapp.security.enabled=false
To enable Micrometer’s Influx meter registry for Spring Cloud Stream application starters, set the following properties:
management.metrics.export.influx.enabled=true
management.metrics.export.influx.uri={influxdb-server-url}
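For example, a minimal sketch that points the Counter sink at a local InfluxDB instance (the URI assumes InfluxDB's conventional default port):
java -jar counter-sink.jar --counter.name=messages \
--management.metrics.export.influx.enabled=true --management.metrics.export.influx.uri=http://localhost:8086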
The following diagram illustrates the Counter's information collection and processing flow.
- counter.message-counter-enabled: Enables counting the number of messages processed. Uses the 'message.' counter name prefix to distinguish it from the expression-based counter. The message counter includes the fixed tags when provided. (Boolean, default: true)
- counter.name: The name of the counter to increment. (String, default: <none>)
- counter.name-expression: A SpEL expression (against the incoming Message) to derive the name of the counter to increment. (Expression, default: <none>)
- counter.tag.expression: Computes tags from SpEL expressions. A single SpEL expression can produce an array of values, which in turn means distinct name/value tags. Every name/value tag produces a separate counter increment. Tag expression format is: counter.tag.expression.[tag-name]=[SpEL expression] (Map<String, Expression>, default: <none>)
- counter.tag.fixed: Custom tags assigned to every counter increment measurement. This is a map, so the property convention for fixed tags is: counter.tag.fixed.[tag-name]=[tag-value] (Map<String, String>, default: <none>)
This module writes each message it receives to a file.
Payload:
java.io.File
java.io.InputStream
byte[]
String
N/A (writes to the file system).
The file sink has the following options:
- file.binary: A flag to indicate whether adding a newline after the write should be suppressed. (Boolean, default: false)
- file.charset: The charset to use when writing text content. (String, default: UTF-8)
- file.directory: The parent directory of the target file. (String, default: <none>)
- file.directory-expression: The expression to evaluate for the parent directory of the target file. (Expression, default: <none>)
- file.mode: The FileExistsMode to use if the target file already exists. (FileExistsMode, default: <none>, possible values: APPEND, APPEND_NO_FLUSH, FAIL, IGNORE, REPLACE, REPLACE_IF_MODIFIED)
- file.name: The name of the target file. (String, default: file-sink)
- file.name-expression: The expression to evaluate for the name of the target file. (String, default: <none>)
- file.suffix: The suffix to append to the file name. (String, default: <empty string>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar file_sink.jar --file.directory=/tmp/bar
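A sketch that instead derives the file name from a message header and appends to existing files (the filename header is hypothetical and would have to be set by the upstream application):
java -jar file_sink.jar --file.directory=/tmp/bar --file.name-expression="headers['filename']" --file.mode=APPEND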
FTP sink is a simple option to push files to an FTP server from incoming messages.
It uses an ftp-outbound-adapter
, therefore incoming messages can be either a java.io.File
object, a String
(content of the file)
or an array of bytes
(file content as well).
To use this sink, you need a username and a password to login.
Note: By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or, if the payload of the Message is already a java.io.File, it will use the original name of that file.
Headers:
file_name (see note above)
Payload:
java.io.File
java.io.InputStream
byte[]
String
N/A (writes to the FTP server).
The ftp sink has the following options:
- ftp.auto-create-dir: Whether or not to create the remote directory. (Boolean, default: true)
- ftp.factory.cache-sessions: <documentation missing> (Boolean, default: <none>)
- ftp.factory.client-mode: The client mode to use for the FTP session. (ClientMode, default: <none>, possible values: ACTIVE, PASSIVE)
- ftp.factory.host: <documentation missing> (String, default: <none>)
- ftp.factory.password: <documentation missing> (String, default: <none>)
- ftp.factory.port: The port of the server. (Integer, default: 21)
- ftp.factory.username: <documentation missing> (String, default: <none>)
- ftp.filename-expression: A SpEL expression to generate the remote file name. (Expression, default: <none>)
- ftp.mode: Action to take if the remote file already exists. (FileExistsMode, default: <none>, possible values: APPEND, APPEND_NO_FLUSH, FAIL, IGNORE, REPLACE, REPLACE_IF_MODIFIED)
- ftp.remote-dir: The remote FTP directory. (String, default: /)
- ftp.remote-file-separator: The remote file separator. (String, default: /)
- ftp.temporary-remote-dir: A temporary directory where the file will be written if 'isUseTemporaryFilename()' is true. (String, default: /)
- ftp.tmp-file-suffix: The suffix to use while the transfer is in progress. (String, default: .tmp)
- ftp.use-temporary-filename: Whether or not to write to a temporary file and rename. (Boolean, default: true)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar ftp_sink.jar --ftp.remote-dir=bar --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw
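If the remote file name should differ from the generated default, a sketch that also sets a file name expression and an overwrite mode (all values are placeholders):
java -jar ftp_sink.jar --ftp.remote-dir=bar --ftp.factory.host=ftpserver \
--ftp.factory.username=user --ftp.factory.password=pw \
--ftp.filename-expression="headers['file_name']" --ftp.mode=REPLACE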
The Gemfire sink allows one to write message payloads to a Gemfire server.
To enable SSL communication between Geode Sink and the Geode cluster you need to provide the URIs of the
Keystore and Truststore files using the gemfire.security.ssl.keystore-uri
and gemfire.security.ssl.truststore-uri
properties.
(If a single file is used for both stores then point both URIs to it).
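A sketch of the corresponding properties, with placeholder file locations and passwords:
gemfire.security.ssl.keystore-uri=file:/path/to/client.jks
gemfire.security.ssl.truststore-uri=file:/path/to/client.jks
gemfire.security.ssl.ssl-keystore-password=<keystore-password>
gemfire.security.ssl.ssl-truststore-password=<truststore-password>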
content-type: application/x-java-serialized-object
The gemfire sink has the following options:
- gemfire.json: Indicates if the Gemfire region stores json objects as native Gemfire PdxInstance. (Boolean, default: false)
- gemfire.key-expression: SpEL expression to use as a cache key. (String, default: <none>)
- gemfire.pool.connect-type: Specifies connection type: 'server' or 'locator'. (ConnectType, default: <none>, possible values: locator, server)
- gemfire.pool.host-addresses: Specifies one or more Gemfire locator or server addresses formatted as [host]:[port]. (InetSocketAddress[], default: <none>)
- gemfire.pool.subscription-enabled: Set to true to enable subscriptions for the client pool. Required to sync updates to the client cache. (Boolean, default: false)
- gemfire.region.region-name: The region name. (String, default: <none>)
- gemfire.security.password: The cache password. (String, default: <none>)
- gemfire.security.ssl.ciphers: Configures the SSL ciphers used for secure Socket connections as an array of valid cipher names. (String, default: any)
- gemfire.security.ssl.keystore-type: Identifies the type of keystore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)
- gemfire.security.ssl.keystore-uri: Location of the pre-created keystore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)
- gemfire.security.ssl.ssl-keystore-password: Password for accessing the keystore. (String, default: <none>)
- gemfire.security.ssl.ssl-truststore-password: Password for accessing the trust store. (String, default: <none>)
- gemfire.security.ssl.truststore-type: Identifies the type of truststore used for SSL communications (e.g. JKS, PKCS11, etc.). (String, default: JKS)
- gemfire.security.ssl.truststore-uri: Location of the pre-created truststore URI to be used for connecting to the Geode cluster. (Resource, default: <none>)
- gemfire.security.ssl.user-home-directory: Local directory to cache the truststore and keystore files downloaded from the truststoreUri and keystoreUri locations. (String, default: user.home)
- gemfire.security.username: The cache username. (String, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar gemfire-sink.jar --gemfire.keyExpression=
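The example above omits the required values; a fuller sketch, with a hypothetical region name, an id header as the cache key, and a locator on Geode's default locator port, might look like:
java -jar gemfire-sink.jar --gemfire.region.region-name=Orders --gemfire.key-expression="headers['id']" \
--gemfire.pool.connect-type=locator --gemfire.pool.host-addresses=localhost:10334 --gemfire.json=true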
A sink module that routes messages into GPDB/HAWQ segments via the gpfdist protocol. Internally, this sink creates a custom HTTP listener that supports the gpfdist protocol and schedules a task that orchestrates a gpload session in the same way it is done natively in Greenplum.
No data is written into temporary files and all data is kept in stream buffers waiting
to get inserted into Greenplum DB or HAWQ. If there are no existing load sessions from Greenplum,
the sink will block until such sessions are established.
The gpfdist sink has the following options:
- gpfdist.batch-count: Number of windowed batches each segment takes. (Integer, default: 100)
- gpfdist.batch-period: Time in seconds for each load operation to sleep in between operations. (Integer, default: 10)
- gpfdist.batch-timeout: Timeout in seconds for segment inactivity. (Integer, default: 4)
- gpfdist.column-delimiter: Data record column delimiter. (Character, default: <none>)
- gpfdist.control-file: Path to yaml control file. (Resource, default: <none>)
- gpfdist.db-host: Database host. (String, default: localhost)
- gpfdist.db-name: Database name. (String, default: gpadmin)
- gpfdist.db-password: Database password. (String, default: gpadmin)
- gpfdist.db-port: Database port. (Integer, default: 5432)
- gpfdist.db-user: Database user. (String, default: gpadmin)
- gpfdist.delimiter: Data line delimiter. (String, default: newline character)
- gpfdist.flush-count: Flush item count. (Integer, default: 100)
- gpfdist.flush-time: Flush item time. (Integer, default: 2)
- gpfdist.gpfdist-port: Port of gpfdist server. Default port `0` indicates that a random port is chosen. (Integer, default: 0)
- gpfdist.log-errors: Enable log errors. (Boolean, default: false)
- gpfdist.match-columns: Match columns with update. (String, default: <none>)
- gpfdist.mode: Mode, either insert or update. (String, default: <none>)
- gpfdist.null-string: Null string definition. (String, default: <none>)
- gpfdist.rate-interval: Enable transfer rate interval. (Integer, default: 0)
- gpfdist.segment-reject-limit: Error reject limit. (String, default: <none>)
- gpfdist.segment-reject-type: Error reject type, either `rows` or `percent`. (SegmentRejectType, default: <none>, possible values: ROWS, PERCENT)
- gpfdist.sql-after: Sql to run after load. (String, default: <none>)
- gpfdist.sql-before: Sql to run before load. (String, default: <none>)
- gpfdist.table: Target database table. (String, default: <none>)
- gpfdist.update-columns: Update columns with update. (String, default: <none>)
- spring.net.hostdiscovery.loopback: The new loopback flag. (Boolean, default: false)
- spring.net.hostdiscovery.match-interface: The new match interface regex pattern. Default value is empty. (String, default: <none>)
- spring.net.hostdiscovery.match-ipv4: Used to match an IP address from a network using CIDR notation. (String, default: <none>)
- spring.net.hostdiscovery.point-to-point: The new point-to-point flag. (Boolean, default: false)
- spring.net.hostdiscovery.prefer-interface: The new preferred interface list. (List<String>, default: <none>)
4.6.4 Implementation Notes
Within a gpfdist
sink we have a Reactor based stream where data is published from the incoming SI channel.
This channel receives data from the Message Bus. The Reactor stream is then connected to Netty
based
http channel adapters so that when a new http connection is established, the Reactor stream is flushed and balanced among
existing http clients. When Greenplum
does a load from an external table, each segment will initiate
a http connection and start loading data. The net effect is that incoming data is automatically spread
among the Greenplum segments.
4.6.5 Detailed Option Descriptions
The gpfdist sink supports the following configuration properties:
- table
Database table to work with. (String, default: ``, required)
This option denotes a table where data will be inserted or updated. The external table structure will also be derived from the structure of this table.
Currently table is the only way to define the structure of an external table. Effectively it replaces other_table in the clause segment below.
CREATE READABLE EXTERNAL TABLE table_name LIKE other_table
- mode
Gpfdist mode, either `insert` or `update`. (String, default: insert)
Currently only the insert and update gpfdist modes are supported. The merge mode familiar from the native gpfdist loader is not yet supported.
For mode update the options matchColumns and updateColumns are required.
- columnDelimiter
Data record column delimiter. (Character, default: ``)
Defines the delimiter character used in the clause segment below, which would be part of the FORMAT 'TEXT' or FORMAT 'CSV' sections.
[DELIMITER AS 'delimiter']
- segmentRejectLimit
Error reject limit. (String, default: ``)
Defines the count value in the clause segment below.
[ [LOG ERRORS] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
As a convenience this reject limit also recognizes a percentage format such as 2%; if used, segmentRejectType is automatically set to percent.
- segmentRejectType
Error reject type, either `rows` or `percent`. (String, default: ``)
Defines ROWS or PERCENT in the clause segment below.
[ [LOG ERRORS] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
- logErrors
Enable or disable log errors. (Boolean, default: false)
As error logging is optional with SEGMENT REJECT LIMIT, it is only used if both segmentRejectLimit and segmentRejectType are set. It enables the error log in the clause segment below.
[ [LOG ERRORS] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
- nullString
Null string definition. (String, default: ``)
Defines the null string used in the clause segment below, which would be part of the FORMAT 'TEXT' or FORMAT 'CSV' sections.
[NULL AS 'null string']
- delimiter
Data record delimiter for incoming messages. (String, default: \n)
By default the delimiter in this option is added as a postfix to every message sent into this sink. Currently NEWLINE is not a supported config option and line termination for data comes from the default behaviour.
Note: If not specified, a Greenplum Database segment will detect the newline type by looking at the first row of data it receives and using the first newline type encountered. (See the Greenplum External Table docs.)
- matchColumns
Comma delimited list of columns to match. (String, default: ``)
Note: See more in the examples below.
- updateColumns
Comma delimited list of columns to update. (String, default: ``)
Note: See more in the examples below.
- sqlBefore
Sql clause to run before each load operation. (String, default: ``)
- sqlAfter
Sql clause to run after each load operation. (String, default: ``)
- rateInterval
Debug rate of data transfer. (Integer, default: 0)
If set to a non-zero value, the sink logs the rate of messages passing through the sink after the number of messages denoted by this setting has been processed. A value of 0 means that this rate calculation and logging is disabled.
- flushCount
Max collected size per windowed data. (Integer, default: 100)
Note: For more info on flush and batch settings, see above.
4.6.6 How Data Is Sent Into Segments
There are a few important concepts involving how data passes into the sink, through it, and finally lands in the database.
- The sink has its normal message handler for incoming data from a source module, a gpfdist protocol listener based on Netty where segments connect, and, in between those two, a Reactor-based stream controlling load balancing into the different segment connections.
- Incoming data is first sent into a reactor which constructs a window. This window is then released downstream when it gets full (flushCount) or times out (flushTime) if the window doesn't fill up. One window is then ready to be sent into a segment.
- Segments which connect to this stream see a stream of window data, not a stream of individual messages. We can also think of this as a stream of batches.
- When a segment makes a connection to the protocol listener, it subscribes itself to this stream and takes the count of batches denoted by batchCount, completing the stream once it has received enough batches or when batchTimeout occurs due to inactivity.
- It doesn't matter how many simultaneous connections there are from a database cluster at any given time, as the reactor load-balances batches across all subscribers.
- The database cluster initiates a loading session when a select is done from an external table which points to this sink. These loading operations run in the background in a loop, one after another. The batchPeriod option is then used as the sleep time between these load sessions.
Let's take a closer look at how the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod work together.
At the highest level, where incoming data into the sink is windowed, flushCount and flushTime control when a batch of messages is sent downstream. If there are a lot of simultaneous segment connections, flushing less often will keep more segments inactive, as there is more demand for batches than what flushing produces.
When an existing segment connection is active and has subscribed itself to the stream of batches, data keeps flowing until either batchCount is met or batchTimeout occurs due to inactivity of data from upstream. The higher batchCount is, the more data each segment will read. The higher batchTimeout is, the more time a segment will wait in case more data is still to come.
As gpfdist load operations are done in a loop, batchPeriod simply ensures things do not run in a busy loop. A busy loop would be fine if there is a constant stream of data coming in, but if the incoming data arrives in bursts, a busy loop would be unnecessary.
Note: Data loaded via gpfdist will not become visible in the database until the whole distributed loading session has finished successfully.
Reactor also handles backpressure, meaning that if existing load operations do not produce enough demand for data, message passing into the sink will eventually block. This happens when Reactor's internal ring buffer (32 items) gets full. The flow of data through the sink really happens when data is pulled from it by segments.
In this first example we’re just creating a simple stream which
inserts data from a time
source. Let’s create a table with two
text columns.
gpadmin=# create table ticktock (date text, time text);
Create a simple stream gpstream1.
dataflow:>stream create --name gpstream1 --definition "time | gpfdist
--dbHost=mdw --table=ticktock --batchTime=1 --batchPeriod=1
--flushCount=2 --flushTime=2 --columnDelimiter=' '" --deploy
Let it run and see results from a database.
gpadmin=# select count(*) from ticktock;
count
-------
14
(1 row)
In the previous example we did simple inserts into a table. Let's see how we can update data in a table. Create a simple table httpdata with three text columns and insert some data.
gpadmin=# create table httpdata (col1 text, col2 text, col3 text);
gpadmin=# insert into httpdata values ('DATA1', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA2', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA3', 'DATA', 'DATA');
Now the table looks like this.
gpadmin=# select * from httpdata;
col1 | col2 | col3
-------+------+------
DATA3 | DATA | DATA
DATA2 | DATA | DATA
DATA1 | DATA | DATA
(3 rows)
Let’s create a stream which updates the table httpdata by matching column col1 and updating columns col2 and col3.
dataflow:>stream create --name gpfdiststream2 --definition "http
--server.port=8081|gpfdist --mode=update --table=httpdata
--dbHost=mdw --columnDelimiter=',' --matchColumns=col1
--updateColumns=col2,col3" --deploy
Post some data into the stream; it will be passed to the gpfdist sink via the http source.
curl --data "DATA1,DATA1,DATA1" -H "Content-Type:text/plain" http://localhost:8081/
If you query the table again, you’ll see that the row for DATA1 has been updated.
gpadmin=# select * from httpdata;
col1 | col2 | col3
-------+-------+-------
DATA3 | DATA | DATA
DATA2 | DATA | DATA
DATA1 | DATA1 | DATA1
(3 rows)
4.6.8 Tuning Transfer Rate
Default values for the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod are relatively conservative and need to be tuned for every use case for optimal performance. In order to decide how to tune the sink behaviour to suit your needs, a few things need to be considered.
- What is the average size of messages ingested by the sink?
- How fast do you want data to become visible in the database?
- Is the incoming data a constant flow or bursts of data?
Everything that flows through the sink is kept in memory, and because the sink handles backpressure, memory consumption is relatively low. However, because the sink cannot predict the average size of the incoming data, and this data is windowed later downstream, you should not allow the window size to become too large if the average data size is large, as every batch of data is kept in memory.
Generally speaking, if you have a lot of segments in a load operation, it is advisable to keep the flushed window size relatively small, which allows more segments to stay active. This also depends on how much data is flowing into the sink itself.
The longer a load session for each segment stays active, the higher the overall transfer rate is going to be. The batchCount option naturally controls this. The batchTimeout option then controls how quickly each segment completes a stream due to inactivity from upstream and steps away from the loading session, allowing the distributed session to finish and the data to become visible in the database.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
This module writes each message it receives to HDFS.
The hdfs sink has the following options:
- hdfs.close-timeout: Timeout in ms, regardless of activity, after which file will be automatically closed. (Long, default: 0)
- hdfs.codec: Compression codec alias name (gzip, snappy, bzip2, lzo, or slzo). (String, default: <none>)
- hdfs.directory: Base path to write files to. (String, default: <none>)
- hdfs.enable-sync: Whether writer will sync to datanode when flush is called; setting this to 'true' could impact throughput. (Boolean, default: false)
- hdfs.file-extension: The base filename extension to use for the created files. (String, default: txt)
- hdfs.file-name: The base filename to use for the created files. (String, default: <none>)
- hdfs.file-open-attempts: Maximum number of file open attempts to find a path. (Integer, default: 10)
- hdfs.file-uuid: Whether file name should contain uuid. (Boolean, default: false)
- hdfs.flush-timeout: Timeout in ms, regardless of activity, after which data written to file will be flushed. (Long, default: 0)
- hdfs.fs-uri: URL for HDFS Namenode. (String, default: <none>)
- hdfs.idle-timeout: Inactivity timeout in ms after which file will be automatically closed. (Long, default: 0)
- hdfs.in-use-prefix: Prefix for files currently being written. (String, default: <none>)
- hdfs.in-use-suffix: Suffix for files currently being written. (String, default: <none>)
- hdfs.overwrite: Whether writer is allowed to overwrite files in Hadoop FileSystem. (Boolean, default: false)
- hdfs.partition-path: A SpEL expression defining the partition path. (String, default: <none>)
- hdfs.rollover: Threshold in bytes when file will be automatically rolled over. (Integer, default: 1000000000)
Note: This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar hdfs-sink.jar --fsUri
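A fuller sketch with placeholder NameNode and directory values, rolling files over at roughly 64 MB:
java -jar hdfs-sink.jar --hdfs.fs-uri=hdfs://namenode:8020 --hdfs.directory=/data/hdfs-sink \
--hdfs.file-name=data --hdfs.rollover=67108864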
A module that writes its incoming payload to an RDBMS using JDBC.
The column expressions are evaluated against the message, and each expression is usually compatible with only one payload type (such as a Map or a bean).
The jdbc sink has the following options:
- jdbc.columns: The comma separated colon-based pairs of column names and SpEL expressions for values to insert/update. Names are used at initialization time to issue the DDL. (String, default: payload:payload.toString())
- jdbc.initialize: 'true', 'false' or the location of a custom initialization script for the table. (String, default: false)
- jdbc.table-name: The name of the table to write into. (String, default: messages)
- jdbc.batch-size: Threshold in number of messages when data will be flushed to database table. (Integer, default: 1)
- jdbc.idle-timeout: Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default: -1)
- spring.datasource.data: Data (DML) script resource references. (List<String>, default: <none>)
- spring.datasource.driver-class-name: Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
- spring.datasource.initialization-mode: Initialize the datasource using available DDL and DML scripts. (DataSourceInitializationMode, default: embedded, possible values: ALWAYS, EMBEDDED, NEVER)
- spring.datasource.password: Login password of the database. (String, default: <none>)
- spring.datasource.schema: Schema (DDL) script resource references. (List<String>, default: <none>)
- spring.datasource.url: JDBC url of the database. (String, default: <none>)
- spring.datasource.username: Login username of the database. (String, default: <none>)
The jdbc.columns property represents pairs of COLUMN_NAME[:EXPRESSION_FOR_VALUE] where EXPRESSION_FOR_VALUE (together with the colon) is optional. In this case the value is evaluated via a generated expression like payload.COLUMN_NAME, so this way we have a direct mapping from object properties to the table columns.
For example, given a JSON payload like:
{
    "name": "My Name",
    "address": {
        "city": "Big City",
        "street": "Narrow Alley"
    }
}
So, we can insert it into a table with a name, city and street structure using the configuration:
--jdbc.columns=name,city:address.city,street:address.street
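Putting it together, a sketch of a full launch command for this mapping (the table name people and the datasource settings are placeholders; jdbc.initialize=true asks the sink to create the table from the configured column names):
java -jar jdbc-sink.jar --jdbc.table-name=people --jdbc.columns=name,city:address.city,street:address.street \
--jdbc.initialize=true --spring.datasource.url='jdbc:mysql://localhost:3306/test' \
--spring.datasource.username=root --spring.datasource.password=secret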
This sink supports batch inserts, as far as supported by the underlying JDBC driver.
Batch inserts are configured via the batch-size
and idle-timeout
properties:
Incoming messages are aggregated until batch-size
messages are present, then inserted as a batch.
If idle-timeout
milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size
, capping maximum latency.
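For example, to trade a little latency for larger batches, settings like the following (the values are illustrative) could be added to the command above:
--jdbc.batch-size=500 --jdbc.idle-timeout=5000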
Note: The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar jdbc-sink.jar --jdbc.tableName=names --jdbc.columns=name --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
--spring.datasource.url='jdbc:mysql://localhost:3306/test'
The log sink uses the application logger to output the data for inspection.
Please note that the log sink uses a type-less handler, which affects how the actual logging is performed.
This means that if the content type is textual, the raw payload bytes are converted to a String; otherwise the raw bytes are logged.
Please see more info in the user guide.
The log sink has the following options:
- log.expression: A SpEL expression (against the incoming message) to evaluate as the logged message. (String, default: payload)
- log.level: The level at which to log messages. (Level, default: <none>, possible values: FATAL, ERROR, WARN, INFO, DEBUG, TRACE)
- log.name: The name of the logger to use. (String, default: <none>)
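A minimal sketch of a launch command (the logger name and level are arbitrary choices):
java -jar log-sink.jar --log.name=data-log --log.level=DEBUG --log.expression=payload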
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
This module sends messages to RabbitMQ.
content-type: application/octet-stream
content-type: application/x-java-serialized-object
Note: With converterBeanName = jsonConverter, the payload can be any object that Jackson can convert to JSON (the content type sent to Rabbit will be application/json, with type information in other headers).
With converterBeanName set to something else, the payload can be any object that the converter can handle.
The rabbit sink has the following options:
(See the Spring Boot documentation for RabbitMQ connection properties)
- rabbit.converter-bean-name: The bean name for a custom message converter; if omitted, a SimpleMessageConverter is used. If 'jsonConverter', a Jackson2JsonMessageConverter bean will be created for you. (String, default: <none>)
- rabbit.exchange: Exchange name - overridden by exchangeNameExpression, if supplied. (String, default: <empty string>)
- rabbit.exchange-expression: A SpEL expression that evaluates to an exchange name. (Expression, default: <none>)
- rabbit.mapped-request-headers: Headers that will be mapped. (String[], default: [*])
- rabbit.own-connection: When true, use a separate connection based on the boot properties. (Boolean, default: false)
- rabbit.persistent-delivery-mode: Default delivery mode when 'amqp_deliveryMode' header is not present, true for PERSISTENT. (Boolean, default: false)
- rabbit.routing-key: Routing key - overridden by routingKeyExpression, if supplied. (String, default: <none>)
- rabbit.routing-key-expression: A SpEL expression that evaluates to a routing key. (Expression, default: <none>)
- spring.rabbitmq.addresses: Comma-separated list of addresses to which the client should connect. (String, default: <none>)
- spring.rabbitmq.connection-timeout: Connection timeout. Set it to zero to wait forever. (Duration, default: <none>)
- spring.rabbitmq.host: RabbitMQ host. (String, default: localhost)
- spring.rabbitmq.password: Login to authenticate against the broker. (String, default: guest)
- spring.rabbitmq.port: RabbitMQ port. (Integer, default: 5672)
- spring.rabbitmq.publisher-confirms: Whether to enable publisher confirms. (Boolean, default: false)
- spring.rabbitmq.publisher-returns: Whether to enable publisher returns. (Boolean, default: false)
- spring.rabbitmq.requested-heartbeat: Requested heartbeat timeout; zero for none. If a duration suffix is not specified, seconds will be used. (Duration, default: <none>)
- spring.rabbitmq.username: Login user to authenticate to the broker. (String, default: guest)
- spring.rabbitmq.virtual-host: Virtual host to use when connecting to the broker. (String, default: <none>)
Note: By default, the message converter is a SimpleMessageConverter which handles byte[], String and java.io.Serializable. The well-known bean name jsonConverter will configure a Jackson2JsonMessageConverter instead. In addition, a custom converter bean can be added to the context and referenced by the converterBeanName property.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar rabbit-sink.jar --rabbit.routingKey=
java -jar rabbit-sink.jar --rabbit.routingKeyExpression=
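For example, a sketch with placeholder exchange and routing key values, using the JSON converter described in the note above:
java -jar rabbit-sink.jar --rabbit.exchange=myExchange --rabbit.routing-key=my.routing.key \
--rabbit.converter-bean-name=jsonConverter --spring.rabbitmq.host=localhost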
This sink application ingests incoming data into MongoDB.
This application is fully based on the MongoDataAutoConfiguration
, so refer to the Spring Boot MongoDB Support for more information.
The mongodb sink has the following options:
- mongodb.collection: The MongoDB collection to store data. (String, default: <none>)
- mongodb.collection-expression: The SpEL expression to evaluate MongoDB collection. (Expression, default: <none>)
- spring.data.mongodb.authentication-database: Authentication database name. (String, default: <none>)
- spring.data.mongodb.database: Database name. (String, default: <none>)
- spring.data.mongodb.field-naming-strategy: Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)
- spring.data.mongodb.grid-fs-database: GridFS database name. (String, default: <none>)
- spring.data.mongodb.host: Mongo server host. Cannot be set with URI. (String, default: <none>)
- spring.data.mongodb.password: Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)
- spring.data.mongodb.port: Mongo server port. Cannot be set with URI. (Integer, default: <none>)
- spring.data.mongodb.uri: Mongo database URI. Cannot be set with host, port and credentials. (String, default: mongodb://localhost/test)
- spring.data.mongodb.username: Login user of the mongo server. Cannot be set with URI. (String, default: <none>)
Also see the Spring Boot Documentation for additional MongoProperties
properties.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar mongodb-sink.jar --mongodb.collection=
java -jar mongodb-sink.jar --mongodb.collectionExpression=
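For example, a sketch with a placeholder collection name against the default local MongoDB URI:
java -jar mongodb-sink.jar --mongodb.collection=orders --spring.data.mongodb.uri=mongodb://localhost/test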
This module sends messages to MQTT.
The mqtt sink has the following options:
- mqtt.async: Whether or not to use async sends. (Boolean, default: false)
- mqtt.charset: The charset used to convert a String payload to byte[]. (String, default: UTF-8)
- mqtt.clean-session: Whether the client and server should remember state across restarts and reconnects. (Boolean, default: true)
- mqtt.client-id: Identifies the client. (String, default: stream.client.id.sink)
- mqtt.connection-timeout: The connection timeout in seconds. (Integer, default: 30)
- mqtt.keep-alive-interval: The ping interval in seconds. (Integer, default: 60)
- mqtt.password: The password to use when connecting to the broker. (String, default: guest)
- mqtt.persistence: 'memory' or 'file'. (String, default: memory)
- mqtt.persistence-directory: Persistence directory. (String, default: /tmp/paho)
- mqtt.qos: The quality of service to use. (Integer, default: 1)
- mqtt.retained: Whether to set the 'retained' flag. (Boolean, default: false)
- mqtt.topic: The topic to which the sink will publish. (String, default: stream.mqtt)
- mqtt.url: Location of the mqtt broker(s) (comma-delimited list). (String[], default: [tcp://localhost:1883])
- mqtt.username: The username to use when connecting to the broker. (String, default: guest)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar mqtt-sink.jar --mqtt.clientid= --mqtt.topic=
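For example, a sketch with placeholder client id and topic values against a local broker:
java -jar mqtt-sink.jar --mqtt.url=tcp://localhost:1883 --mqtt.client-id=mqtt-sink-1 --mqtt.topic=sensors/data --mqtt.qos=1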
A module that writes its incoming payload to an RDBMS using the PostgreSQL COPY command.
The column expressions are evaluated against the message, and each expression is usually compatible with only one payload type (such as a Map or a bean).
The pgcopy sink has the following options:
- pgcopy.batch-size: Threshold in number of messages when data will be flushed to database table. (Integer, default: 10000)
- pgcopy.columns: The names of the columns that shall receive data. Also used at initialization time to issue the DDL. (List<String>, default: payload)
- pgcopy.delimiter: Specifies the character that separates columns within each row (line) of the file. The default is a tab character in text format, a comma in CSV format. This must be a single one-byte character. Using an escaped value like '\t' is allowed. (String, default: <none>)
- pgcopy.error-table: The name of the error table used for writing rows causing errors. The error table should have three columns named "table_name", "error_message" and "payload" large enough to hold potential data values. You can use the following DDL to create this table: 'CREATE TABLE ERRORS (TABLE_NAME VARCHAR(255), ERROR_MESSAGE TEXT, PAYLOAD TEXT)' (String, default: <none>)
- pgcopy.escape: Specifies the character that should appear before a data character that matches the QUOTE value. The default is the same as the QUOTE value (so that the quoting character is doubled if it appears in the data). This must be a single one-byte character. This option is allowed only when using CSV format. (Character, default: <none>)
- pgcopy.format: Format to use for the copy command. (Format, default: <none>, possible values: TEXT, CSV)
- pgcopy.idle-timeout: Idle timeout in milliseconds when data is automatically flushed to database table. (Long, default: -1)
- pgcopy.initialize: 'true', 'false' or the location of a custom initialization script for the table. (String, default: false)
- pgcopy.null-string: Specifies the string that represents a null value. The default is \N (backslash-N) in text format, and an unquoted empty string in CSV format. (String, default: <none>)
- pgcopy.quote: Specifies the quoting character to be used when a data value is quoted. The default is double-quote. This must be a single one-byte character. This option is allowed only when using CSV format. (Character, default: <none>)
- pgcopy.table-name: The name of the table to write into. (String, default: <none>)
- spring.datasource.driver-class-name: Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
- spring.datasource.password: Login password of the database. (String, default: <none>)
- spring.datasource.url: JDBC url of the database. (String, default: <none>)
- spring.datasource.username: Login username of the database. (String, default: <none>)
Note: The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
For integration tests to run, start a PostgreSQL database on localhost:
docker run -e POSTGRES_PASSWORD=spring -e POSTGRES_DB=test -p 5432:5432 -d postgres:latest
java -jar pgcopy-sink.jar --pgcopy.table-name=names --pgcopy.columns=name --spring.datasource.driver-class-name=org.postgresql.Driver \
--spring.datasource.url='jdbc:postgresql://localhost:5432/test' --spring.datasource.username=postgres --spring.datasource.password=spring
This module sends messages to a Redis store.
content-type: application/octet-stream
The redis sink has the following options:
- redis.key: A literal key name to use when storing to a key. (String, default: <none>)
- redis.key-expression: A SpEL expression to use for storing to a key. (Expression, default: <none>)
- redis.queue: A literal queue name to use when storing in a queue. (String, default: <none>)
- redis.queue-expression: A SpEL expression to use for queue. (Expression, default: <none>)
- redis.topic: A literal topic name to use when publishing to a topic. (String, default: <none>)
- redis.topic-expression: A SpEL expression to use for topic. (Expression, default: <none>)
- spring.redis.database: Database index used by the connection factory. (Integer, default: 0)
- spring.redis.host: Redis server host. (String, default: localhost)
- spring.redis.jedis.pool.max-active: Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)
- spring.redis.jedis.pool.max-idle: Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)
- spring.redis.jedis.pool.max-wait: Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)
- spring.redis.jedis.pool.min-idle: Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if it is positive. (Integer, default: 0)
- spring.redis.lettuce.pool.max-active: Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)
- spring.redis.lettuce.pool.max-idle: Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)
- spring.redis.lettuce.pool.max-wait: Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)
- spring.redis.lettuce.pool.min-idle: Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if it is positive. (Integer, default: 0)
- spring.redis.password: Login password of the redis server. (String, default: <none>)
- spring.redis.port: Redis server port. (Integer, default: 6379)
- spring.redis.sentinel.master: Name of the Redis server. (String, default: <none>)
- spring.redis.sentinel.nodes: Comma-separated list of "host:port" pairs. (List<String>, default: <none>)
- spring.redis.ssl: Whether to enable SSL support. (Boolean, default: false)
- spring.redis.timeout: Connection timeout. (Duration, default: <none>)
- spring.redis.url: Connection URL. Overrides host, port, and password. User is ignored. Example: redis://user:[email protected]:6379 (String, default: <none>)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar redis-pubsub-sink.jar --redis.queue=
java -jar redis-pubsub-sink.jar --redis.queueExpression=
java -jar redis-pubsub-sink.jar --redis.key=
java -jar redis-pubsub-sink.jar --redis.keyExpression=
java -jar redis-pubsub-sink.jar --redis.topic=
java -jar redis-pubsub-sink.jar --redis.topicExpression=
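For example, a sketch that publishes each payload to a placeholder topic on a local Redis server:
java -jar redis-pubsub-sink.jar --redis.topic=mytopic --spring.redis.host=localhost --spring.redis.port=6379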
This application routes messages to named channels.
The router sink has the following options:
- router.default-output-channel: Where to send unroutable messages. (String, default: nullChannel)
- router.destination-mappings: Destination mappings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
- router.expression: The expression to be applied to the message to determine the channel(s) to route to. Note that the payload wire format for content types such as text, json or xml is byte[], not String! Consult the documentation for how to handle byte array payload content. (Expression, default: <none>)
- router.refresh-delay: How often to check for script changes in ms (if present); < 0 means don't refresh. (Integer, default: 60000)
- router.resolution-required: Whether or not channel resolution is required. (Boolean, default: false)
- router.script: The location of a groovy script that returns channels or channel mapping resolution keys. (Resource, default: <none>)
- router.variables: Variable bindings as a new line delimited string of name-value pairs, e.g. 'foo=bar\n baz=car'. (Properties, default: <none>)
- router.variables-location: The location of a properties file containing custom script variable bindings. (Resource, default: <none>)
Note: Since this is a dynamic router, destinations are created as needed; therefore, by default the defaultOutputChannel and resolutionRequired will only be used if the Binder has some problem binding to the destination.
You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel, which must also appear in the list.
destinationMappings
are used to map the evaluation results to an actual destination name.
4.15.5 SpEL-based Routing
The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.
For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring
Integration Reference manual
Configuring (Generic) Router section.
Note: Starting with Spring Cloud Stream 2.0, the message wire format for the json, text and xml content types is byte[], not String! This is a breaking change from SCSt 1.x, which treated those types as Strings. Depending on the content type, different techniques for handling the byte[] payloads are available. For plain text content types, one can convert the octet payload into a string using the new String(payload) SpEL expression. For json types the #jsonPath() SpEL utility already supports string and byte array content interchangeably. The same applies to the xml content type and the #xpath() SpEL utility.
For example, for the text content type one should use:
new String(payload).contains('a')
and for the json content type a SpEL expression like this:
#jsonPath(payload, '$.person.name')
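Putting the pieces together, a sketch of a launch command that routes on a hypothetical type field of a JSON payload and maps the resolved values to destination names:
java -jar router-sink.jar --router.expression="#jsonPath(payload,'$.type')" \
--router.destination-mappings="order=orders\n refund=refunds"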
4.15.6 Groovy-based Routing
Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at
"file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :
println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
return "foo"
}
else {
return "bar"
}
If you want to pass variable values to your script, you can statically bind values using the variables option or
optionally pass the path to a properties file containing the bindings using the propertiesLocation option.
All properties in the file will be made available to the script as variables. You may specify both variables and
propertiesLocation, in which case any duplicate values provided as variables override values provided in
propertiesLocation.
Note that payload and headers are implicitly bound to give you access to the data contained in a message.
For more information, see the Spring Integration Reference manual
Groovy Support.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar router-sink.jar --expression="new String(payload).contains('a')?':foo':':bar'"
java -jar router-sink.jar --script=" "
This sink app supports transferring files to an Amazon S3 bucket.
File payloads (and directories, recursively) are transferred from the incoming messages to the remote directory (the S3 bucket).
Messages accepted by this sink must contain one of the following payload types:
- File, including directories for recursive upload
- InputStream
- byte[]
When using --mode=lines
, you can also provide the additional option --withMarkers=true
.
If set to true
, the underlying FileSplitter
will emit additional start-of-file and end-of-file marker messages before and after the actual data.
The payload of these 2 additional marker messages is of type FileSplitter.FileMarker
. The option withMarkers
defaults to false
if not explicitly set.
java.io.File
java.io.InputStream
byte[]
String
The s3 sink has the following options:
- s3.acl: S3 Object access control list. (CannedAccessControlList, default: <none>, possible values: private, public-read, public-read-write, authenticated-read, log-delivery-write, bucket-owner-read, bucket-owner-full-control, aws-exec-read)
- s3.acl-expression: Expression to evaluate S3 Object access control list. (Expression, default: <none>)
- s3.bucket: AWS bucket for target file(s) to store. (String, default: <none>)
- s3.bucket-expression: Expression to evaluate AWS bucket name. (Expression, default: <none>)
- s3.key-expression: Expression to evaluate S3 Object key. (Expression, default: <none>)
The target generated application based on the AmazonS3SinkConfiguration
can be enhanced with the S3MessageHandler.UploadMetadataProvider
and/or S3ProgressListener
, which are injected into S3MessageHandler
bean.
4.16.4 Amazon AWS common options
The Amazon S3 Sink (as all other Amazon AWS applications) is based on the
Spring Cloud AWS project as a foundation, and its auto-configuration
classes are used automatically by Spring Boot.
Consult their documentation regarding required and useful auto-configuration properties.
Some of them are about AWS credentials:
- cloud.aws.credentials.accessKey
- cloud.aws.credentials.secretKey
- cloud.aws.credentials.instanceProfile
- cloud.aws.credentials.profileName
- cloud.aws.credentials.profilePath
Others are for the AWS Region definition:
- cloud.aws.region.auto
- cloud.aws.region.static
And for AWS Stack
:
- cloud.aws.stack.auto
- cloud.aws.stack.name
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar s3-sink.jar --s3.bucket=my-bucket
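A fuller sketch with placeholder bucket, credentials, region and key expression (the file_name header is only present when the upstream application sets it):
java -jar s3-sink.jar --s3.bucket=my-bucket --s3.key-expression="headers['file_name']" \
--cloud.aws.credentials.accessKey=<access-key> --cloud.aws.credentials.secretKey=<secret-key> \
--cloud.aws.region.static=us-east-1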
SFTP sink is a simple option to push files to an SFTP server from incoming messages.
It uses an sftp-outbound-adapter
, therefore incoming messages can be either a java.io.File
object, a String
(content of the file)
or an array of bytes
(file content as well).
To use this sink, you need a username and a password to login.
Note: By default Spring Integration will use o.s.i.file.DefaultFileNameGenerator if none is specified. DefaultFileNameGenerator will determine the file name based on the value of the file_name header (if it exists) in the MessageHeaders, or, if the payload of the Message is already a java.io.File, it will use the original name of that file.
When configuring the sftp.factory.known-hosts-expression option, the root object of the evaluation is the application context. An example might be sftp.factory.known-hosts-expression = @systemProperties['user.home'] + '/.ssh/known_hosts'.
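On the command line the same expression needs shell quoting; a sketch (the host and credentials are placeholders):
java -jar sftp_sink.jar --sftp.factory.host=sftpserver --sftp.factory.username=user --sftp.factory.password=pw \
    --sftp.factory.known-hosts-expression="@systemProperties['user.home'] + '/.ssh/known_hosts'"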
Headers:
- file_name (see the note above)
Payload:
- java.io.File
- java.io.InputStream
- byte[]
- String
Output:
N/A (writes to the SFTP server).
The sftp sink has the following options:
- sftp.auto-create-dir
- Whether or not to create the remote directory. (Boolean, default:
true
) - sftp.factory.allow-unknown-keys
- True to allow an unknown or changed key. (Boolean, default:
false
) - sftp.factory.cache-sessions
- Cache sessions (Boolean, default:
<none>
) - sftp.factory.host
- The host name of the server. (String, default:
localhost
) - sftp.factory.known-hosts-expression
- A SpEL expression resolving to the location of the known hosts file. (Expression, default:
<none>
) - sftp.factory.pass-phrase
- Passphrase for user's private key. (String, default:
<empty string>
) - sftp.factory.password
- The password to use to connect to the server. (String, default:
<none>
) - sftp.factory.port
- The port of the server. (Integer, default:
22
) - sftp.factory.private-key
- Resource location of user's private key. (Resource, default:
<none>
) - sftp.factory.username
- The username to use to connect to the server. (String, default:
<none>
) - sftp.filename-expression
- A SpEL expression to generate the remote file name. (Expression, default:
<none>
) - sftp.mode
- Action to take if the remote file already exists. (FileExistsMode, default:
<none>
, possible values: APPEND
,APPEND_NO_FLUSH
,FAIL
,IGNORE
,REPLACE
,REPLACE_IF_MODIFIED
) - sftp.remote-dir
- The remote SFTP directory. (String, default:
/
) - sftp.remote-file-separator
- The remote file separator. (String, default:
/
) - sftp.temporary-remote-dir
- A temporary directory where the file will be written if 'isUseTemporaryFilename()' is true. (String, default:
/
) - sftp.tmp-file-suffix
- The suffix to use while the transfer is in progress. (String, default:
.tmp
) - sftp.use-temporary-filename
- Whether or not to write to a temporary file and rename. (Boolean, default:
true
)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar sftp_sink.jar --sftp.remote-dir=bar --sftp.factory.host=sftpserver \
--sftp.factory.username=user --sftp.factory.password=pw
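A slightly fuller sketch that also uses the naming and overwrite options listed above (the expression and mode values are illustrative):
java -jar sftp_sink.jar --sftp.remote-dir=bar --sftp.factory.host=sftpserver \
    --sftp.factory.username=user --sftp.factory.password=pw \
    --sftp.filename-expression="headers.file_name" --sftp.mode=REPLACE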
This module writes messages to TCP using an Encoder.
TCP is a streaming protocol, so some mechanism is needed to frame messages on the wire. A number of encoders are available; the default is 'CRLF'.
Headers:
Content-Type: application/octet-stream
The tcp sink has the following options:
- tcp.charset
- The charset used when converting from bytes to String. (String, default:
UTF-8
) - tcp.close
- Whether to close the socket after each message. (Boolean, default:
false
) - tcp.encoder
- The encoder to use when sending messages. (Encoding, default:
<none>
, possible values: CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
) - tcp.host
- The host to which this sink will connect. (String, default:
<none>
) - tcp.nio
- Whether or not to use NIO. (Boolean, default:
false
) - tcp.port
- The port on which to listen; 0 for the OS to choose a port. (Integer, default:
1234
) - tcp.reverse-lookup
- Perform a reverse DNS lookup on the remote IP Address; if false,
just the IP address is included in the message headers. (Boolean, default:
false
) - tcp.socket-timeout
- The timeout (ms) before closing the socket when no data is received. (Integer, default:
120000
) - tcp.use-direct-buffers
- Whether or not to use direct buffers. (Boolean, default:
false
)
4.18.4 Available Encoders
Text Data
- CRLF (default)
- text terminated by carriage return (0x0d) followed by line feed (0x0a)
- LF
- text terminated by line feed (0x0a)
- NULL
- text terminated by a null byte (0x00)
- STXETX
- text preceded by an STX (0x02) and terminated by an ETX (0x03)
Text and Binary Data
- RAW
- no structure - the client indicates a complete message by closing the socket
- L1
- data preceded by a one byte (unsigned) length field (supports up to 255 bytes)
- L2
- data preceded by a two byte (unsigned) length field (up to 2^16-1 bytes)
- L4
- data preceded by a four byte (signed) length field (up to 2^31-1 bytes)
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar tcp_sink.jar --tcp.encoder=LF
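For binary payloads you might prefer one of the length-header encoders described above; a sketch (the host and port values are placeholders):
java -jar tcp_sink.jar --tcp.host=remote.example.com --tcp.port=5678 --tcp.encoder=L4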
A simple handler that counts messages and logs the observed throughput at a selected interval.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
java -jar throughput-sink.jar
A simple Websocket Sink implementation.
The following command line arguments are supported:
- websocket.log-level
- the log level for Netty channels. Default is WARN (String, default:
<none>
) - websocket.path
- the path on which a WebsocketSink consumer needs to connect. Default is /websocket (String, default:
/websocket
) - websocket.port
- the port on which the Netty server listens. Default is 9292 (Integer, default:
9292
) - websocket.ssl
- whether or not to create an io.netty.handler.ssl.SslContext (Boolean, default:
false
) - websocket.threads
- the number of threads for the Netty io.netty.channel.EventLoopGroup. Default is 1 (Integer, default:
1
)
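For example, to change the path and raise the Netty log level (the values shown are illustrative; the jar placeholder follows the convention used in the trace command later in this section):
java -jar <spring boot application for websocket-sink> --websocket.port=9191 --websocket.path=/data --websocket.log-level=DEBUG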
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here.
You can then cd into one of the folders and build it:
$ ./mvnw clean package
To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the
following simple end-to-end setup.
Step 2: Deploy a time-source
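A sketch of this step, assuming a time-source Spring Boot jar built the same way as the sink; the ticktock destination matches the input binding used in the trace command below:
java -jar <spring boot application for time-source> --spring.cloud.stream.bindings.output=ticktock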
Step 3: Deploy a websocket-sink
(the app that contains this starter jar)
Finally start a websocket-sink in trace
mode so that you see the messages produced by the time-source
in the log:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
--logging.level.org.springframework.cloud.stream.module.websocket=TRACE
You should start seeing log messages in the console where you started the WebsocketSink like this:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketsinktrace.enabled=true.
By default it shows the last 100 messages at host:port/websocketsinktrace; a sample of its output is shown below.
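To query it (with the endpoint enabled as above), assuming the --server.port=9393 used in the command above:
$ curl http://localhost:9393/websocketsinktrace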
[
{
"timestamp": 1445453703508,
"info": {
"type": "text",
"direction": "out",
"id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
"payload": "2015-10-21 20:54:33"
}
},
...
{
"timestamp": 1445453703506,
"info": {
"type": "text",
"direction": "out",
"id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
"payload": "2015-10-21 20:54:32"
}
}
]
There is also a simple HTML page where you can see the forwarded messages in a text area. You can access it directly via host:port in your browser.
Note: For SSL mode (--ssl=true), a self-signed certificate is used, which might cause trouble with some WebSocket clients. In a future release, there will be a --certificate=mycert.cer switch to pass a valid (not self-signed) certificate.
4.21 TaskLauncher Data Flow Sink
This application launches a registered task application using the Data Flow Server REST API.
A launch request includes:
- the task name (required, and registered as a task with the target Data Flow Server)
- deployment properties (key-value pairs, optional)
- program arguments for the task (a list, optional)
Content-Type: application/json
A JSON document:
{
"name":"foo",
"deploymentProps": {"key1":"val1","key2":"val2"},
"args":["--debug", "--foo", "bar"]
}
minimally,
{"name":"foo"}
N/A (launches task to the local system).
The tasklauncher-dataflow sink supports the following configuration properties:
- spring.cloud.dataflow.client.authentication.basic.password
- The login password. (String, default:
<none>
) - spring.cloud.dataflow.client.authentication.basic.username
- The login username. (String, default:
<none>
) - spring.cloud.dataflow.client.enable-dsl
- Enable Data Flow DSL access. (Boolean, default:
false
) - spring.cloud.dataflow.client.server-uri
- The Data Flow server URI. (String, default:
http://localhost:9393
) - spring.cloud.dataflow.client.skip-ssl-validation
- Skip Ssl validation. (Boolean, default:
true
) - trigger.initial-delay
- The initial delay in milliseconds. (Integer, default:
1000
) - trigger.max-period
- The maximum polling period in milliseconds. Will be set to period if period > maxPeriod. (Integer, default:
30000
) - trigger.period
- The polling period in milliseconds. (Integer, default:
1000
)
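For example (the jar name follows from the build step described below and is otherwise an assumption; the property values are illustrative):
java -jar task-launcher-dataflow-sink.jar --spring.cloud.dataflow.client.server-uri=http://localhost:9393 \
    --trigger.period=5000 --trigger.max-period=60000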
4.21.3 Using the TaskLauncher
A task launcher is a sink that consumes LaunchRequest messages, as described above, and launches a task using the configured Spring Cloud Data Flow server (given by --spring.cloud.dataflow.client.server-uri).
The task launcher periodically polls its input for launch requests but pauses when the SCDF server's concurrent task execution limit, given by spring.cloud.dataflow.task.maximum-concurrent-tasks, is reached (see the reference docs for more details).
When the number of running tasks drops below this limit, message polling resumes. This is intended to prevent the SCDF deployer's deployment platform from running out of resources under heavy task load. The poller is scheduled using a DynamicPeriodicTrigger. By default the polling rate is 1 second, but it may be configured to any duration. When paused, or if there are no launch requests, the trigger period increases, applying exponential backoff, up to a configured maximum (30 seconds by default).
Note: When the poller is paused it puts pressure on the message broker, so some tuning will be necessary in extreme cases to balance resource utilization.
$ ./mvnw clean install -PgenerateApps
$ cd apps
You can find the corresponding binder based projects here. You can then cd into one of the folders and build it:
$ ./mvnw clean package
Register a task app and create a task; the
timestamp sample
provides a simple demonstration.
dataflow:>app register --name timestamp --type task --uri ...
dataflow:>stream create http | task-launcher-dataflow-sink --deploy
Send a launch request:
$ curl http://localhost:<port> -H "Content-Type:application/json" -d '{"name":"timestamp"}'
dataflow:>task execution list
╔═════════╤══╤════════════════════════════╤════════════════════════════╤═════════╗
║Task Name│ID│ Start Time │ End Time │Exit Code║
╠═════════╪══╪════════════════════════════╪════════════════════════════╪═════════╣
║timestamp│1 │Fri Aug 10 08:48:05 EDT 2018│Fri Aug 10 08:48:05 EDT 2018│0 ║
╚═════════╧══╧════════════════════════════╧════════════════════════════╧═════════╝