The aggregate counter differs from a simple counter in that it not only keeps a total value for the count, but also retains the total count values for each minute, hour, day, and month of the period for which it is run. The data can then be queried by supplying a start and end date and the resolution at which the data should be returned.
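The bucketing idea described above can be sketched as a small in-memory class. This is only an illustration under stated assumptions: the class name `AggregateCounterSketch` and its methods are hypothetical, and the actual sink may persist its counts elsewhere (for example in Redis) rather than in maps.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration; not the sink's actual implementation.
class AggregateCounterSketch {
    private long total;
    // One bucket map per resolution, keyed by the truncated timestamp.
    private final Map<LocalDateTime, Long> minutes = new TreeMap<>();
    private final Map<LocalDateTime, Long> hours = new TreeMap<>();
    private final Map<LocalDateTime, Long> days = new TreeMap<>();
    private final Map<LocalDateTime, Long> months = new TreeMap<>();

    // Adds the amount to the running total and to every resolution bucket.
    void increment(LocalDateTime when, long amount) {
        total += amount;
        minutes.merge(when.truncatedTo(ChronoUnit.MINUTES), amount, Long::sum);
        hours.merge(when.truncatedTo(ChronoUnit.HOURS), amount, Long::sum);
        LocalDateTime day = when.truncatedTo(ChronoUnit.DAYS);
        days.merge(day, amount, Long::sum);
        // truncatedTo does not support MONTHS, so snap to the first day of the month.
        months.merge(day.withDayOfMonth(1), amount, Long::sum);
    }

    long total() {
        return total;
    }

    // Returns the buckets of the requested resolution that fall within [start, end].
    Map<LocalDateTime, Long> query(LocalDateTime start, LocalDateTime end, ChronoUnit resolution) {
        Map<LocalDateTime, Long> source;
        switch (resolution) {
            case MINUTES: source = minutes; break;
            case HOURS: source = hours; break;
            case DAYS: source = days; break;
            case MONTHS: source = months; break;
            default: throw new IllegalArgumentException("Unsupported resolution: " + resolution);
        }
        Map<LocalDateTime, Long> result = new TreeMap<>();
        source.forEach((bucket, count) -> {
            if (!bucket.isBefore(start) && !bucket.isAfter(end)) {
                result.put(bucket, count);
            }
        });
        return result;
    }
}
```

Keeping a separate map per resolution trades memory for cheap queries: a request at hour resolution never has to re-aggregate minute buckets.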
The aggregate-counter sink has the following options:
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
)<none>
)<none>
)<none>
)<none>
)0
)localhost
)<none>
)6379
)0
This sink application writes the content of each message it receives into Cassandra.
The cassandra sink has the following options:
<none>
, possible values: NONE
,SNAPPY
)<none>
)false
)[]
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
, possible values: NONE
,CREATE
,RECREATE
,RECREATE_DROP_UNUSED
)<none>
)<none>
, possible values: ANY
,ONE
,TWO
,THREE
,QUOROM
,LOCAL_QUOROM
,EACH_QUOROM
,ALL
,LOCAL_ONE
,SERIAL
,LOCAL_SERIAL
)<none>
)<none>
)<none>
, possible values: DEFAULT
,DOWNGRADING_CONSISTENCY
,FALLTHROUGH
,LOGGING
)<none>
)0
The counter sink simply counts the number of messages it receives, optionally storing counts in a separate store such as Redis.
The counter sink has the following options:
<none>
)<none>
)0
)localhost
)<none>
)6379
)0
A field value counter is a Metric used for counting occurrences of unique values for a named field in a message payload. This sink supports the following payload types out of the box:
For example, suppose a message source produces a payload with a field named user:
class Foo {
    String user;

    public Foo(String user) {
        this.user = user;
    }
}
If the stream source produces messages with the following objects:
new Foo("fred")
new Foo("sue")
new Foo("dave")
new Foo("sue")
The field value counter on the field user will contain:
fred:1, sue:2, dave:1
Multi-value fields are also supported. For example, if a field contains a list, each value will be counted once:
users:["dave","fred","sue"]
users:["sue","jon"]
The field value counter on the field users will contain:
dave:1, fred:1, sue:2, jon:1
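The counting rule for single-value and multi-value fields can be sketched as follows. This is a minimal illustration, not the sink's implementation; the class `FieldValueCounterSketch` and its methods are hypothetical names, and the field value is assumed to have already been extracted from the payload.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of field value counting.
class FieldValueCounterSketch {
    private final Map<String, Long> counts = new LinkedHashMap<>();

    // Counts a single value, or every element if the field holds a collection.
    void record(Object fieldValue) {
        if (fieldValue instanceof Collection) {
            for (Object element : (Collection<?>) fieldValue) {
                counts.merge(String.valueOf(element), 1L, Long::sum);
            }
        } else if (fieldValue != null) {
            counts.merge(String.valueOf(fieldValue), 1L, Long::sum);
        }
    }

    Map<String, Long> counts() {
        return counts;
    }

    public static void main(String[] args) {
        FieldValueCounterSketch counter = new FieldValueCounterSketch();
        counter.record(Arrays.asList("dave", "fred", "sue"));
        counter.record(Arrays.asList("sue", "jon"));
        System.out.println(counter.counts()); // {dave=1, fred=1, sue=2, jon=1}
    }
}
```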
The field-value-counter sink has the following options:
<none>
)<none>
)<none>
)0
)localhost
)<none>
)6379
)0
This module writes each message it receives to a file.
The file sink has the following options:
false
)UTF-8
)<none>
)<none>
)<none>
, possible values: APPEND
,FAIL
,IGNORE
,REPLACE
)file-sink
)<none>
)<empty string>
FTP sink is a simple option to push files to an FTP server from incoming messages.
It uses an ftp-outbound-adapter, therefore incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).
To use this sink, you need a username and a password to log in.
Note | |
---|---|
By default Spring Integration will use |
The ftp sink has the following options:
<none>
)<none>
)<none>
, possible values: ACTIVE
,PASSIVE
)<none>
)<none>
)21
)<none>
)<none>
)<none>
, possible values: APPEND
,FAIL
,IGNORE
,REPLACE
)<none>
)<none>
)<none>
)<none>
)<none>
The Gemfire sink allows one to write message payloads to a Gemfire server.
The gemfire sink has the following options:
false
)<none>
)<none>
, possible values: locator
,server
)<none>
)false
)<none>
A sink module that routes messages into GPDB/HAWQ segments via the gpfdist protocol. Internally, this sink creates a custom http listener that supports the gpfdist protocol and schedules a task that orchestrates a gpload session in the same way it is done natively in Greenplum.
No data is written into temporary files, and all data is kept in stream buffers waiting to be inserted into Greenplum DB or HAWQ. If there are no existing load sessions from Greenplum, the sink will block until such sessions are established.
The gpfdist sink has the following options:
100
)10
)4
)<none>
)<none>
)localhost
)gpadmin
)gpadmin
)5432
)gpadmin
)
)<none>
)100
)2
)0
)<none>
)<none>
)<none>
)0
)<none>
)<none>
, possible values: ROWS
,PERCENT
)<none>
)<none>
)<none>
)<none>
)false
)<none>
)<none>
)false
)<none>
Within a gpfdist sink, we have a Reactor-based stream where data is published from the incoming SI channel. This channel receives data from the Message Bus. The Reactor stream is then connected to Netty-based http channel adapters so that when a new http connection is established, the Reactor stream is flushed and balanced among existing http clients. When Greenplum does a load from an external table, each segment initiates an http connection and starts loading data. The net effect is that incoming data is automatically spread among the Greenplum segments.
The gpfdist sink supports the following configuration properties:
Database table to work with. (String, default: ``, required)
This option denotes the table into which data will be inserted or updated. The external table structure is also derived from the structure of this table.
Currently, table is the only way to define the structure of an external table. Effectively, it replaces other_table in the clause segment below.
CREATE READABLE EXTERNAL TABLE table_name LIKE other_table
Gpfdist mode, either `insert` or `update`. (String, default: insert)
Currently, only the insert and update gpfdist modes are supported. The merge mode, familiar from the native gpfdist loader, is not yet supported.
For the update mode, the options matchColumns and updateColumns are required.
Data record column delimiter. (Character, default: ``)
Defines the delimiter character used in the clause segment below, which is part of the FORMAT 'TEXT' or FORMAT 'CSV' sections.
[DELIMITER AS 'delimiter']
Error reject limit. (String, default: ``)
Defines the count value in the clause segment below.
[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
As a convenience, this reject limit also recognizes a percentage format such as 2%; if it is used, segmentRejectType is automatically set to percent.
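The percentage convenience described above might be handled roughly as in the sketch below. It is an assumption-laden illustration: `RejectLimitSketch`, `parse` and `toClause` are hypothetical names, and the sink's real parsing and validation rules are not shown in this document.

```java
import java.util.Locale;

// Hypothetical illustration of interpreting a reject limit such as "5" or "2%".
class RejectLimitSketch {
    final int count;
    final String type; // "ROWS" or "PERCENT", or null when no type was given

    RejectLimitSketch(int count, String type) {
        this.count = count;
        this.type = type;
    }

    // A trailing '%' forces the PERCENT type; otherwise the explicit type (if any) is used.
    static RejectLimitSketch parse(String limit, String explicitType) {
        String trimmed = limit.trim();
        if (trimmed.endsWith("%")) {
            String digits = trimmed.substring(0, trimmed.length() - 1).trim();
            return new RejectLimitSketch(Integer.parseInt(digits), "PERCENT");
        }
        String type = explicitType == null ? null : explicitType.trim().toUpperCase(Locale.ROOT);
        return new RejectLimitSketch(Integer.parseInt(trimmed), type);
    }

    // Renders the mandatory part of the clause segment quoted in the option description.
    String toClause() {
        return "SEGMENT REJECT LIMIT " + count + (type == null ? "" : " " + type);
    }
}
```

For example, `RejectLimitSketch.parse("2%", null).toClause()` yields `SEGMENT REJECT LIMIT 2 PERCENT`.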
Error reject type, either `rows` or `percent`. (String, default: ``)
Defines ROWS or PERCENT in the clause segment below.
[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
Table name to log errors. (String, default: ``)
As the error table is optional with SEGMENT REJECT LIMIT, it is only used if both segmentRejectLimit and segmentRejectType are set. Sets error_table in the clause segment below.
[ [LOG ERRORS INTO error_table] SEGMENT REJECT LIMIT count [ROWS | PERCENT] ]
Null string definition. (String, default: ``)
Defines the null string used in the clause segment below, which is part of the FORMAT 'TEXT' or FORMAT 'CSV' sections.
[NULL AS 'null string']
Data record delimiter for incoming messages. (String, default: \n)
By default, the delimiter in this option is added as a postfix to every message sent into this sink. Currently, NEWLINE is not a supported config option, and line termination for data comes from the default functionality.
If not specified, a Greenplum Database segment will detect the newline type by looking at the first row of data it receives and using the first newline type encountered.
-- External Table Docs
Comma delimited list of columns to match. (String, default: ``)
Note | |
---|---|
See more from examples below. |
Comma delimited list of columns to update. (String, default: ``)
Note | |
---|---|
See more from examples below. |
Debug rate of data transfer. (Integer, default: 0)
If set to a non-zero value, the sink logs the rate of messages passing through it after the number of messages denoted by this setting has been processed. A value of 0 means that this rate calculation and logging is disabled.
Max collected size per windowed data. (Integer, default: 100)
Note | |
---|---|
For more info on flush and batch settings, see above. |
There are a few important concepts involving how data passes into a sink, through it, and finally lands in a database.
flushTime) or timeouts(flushTime) if window doesn’t get full. One window is then ready to be sent into a segment.
batchCount and completes a stream if it got enough batches or if batchTimeout occurred due to inactivity.
batchPeriod is then used as a sleep time in between these load sessions.
Let’s take a closer look at how the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod work.
At the highest level, where incoming data into the sink is windowed, flushCount and flushTime control when a batch of messages is sent downstream. If there are a lot of simultaneous segment connections, flushing less often will keep more segments inactive, as there is more demand for batches than flushing will produce.
When an existing segment connection is active and has subscribed itself to a stream of batches, data keeps flowing until either batchCount is met or batchTimeout occurs due to inactivity of data from upstream. The higher the batchCount, the more data each segment will read. The higher the batchTimeout, the longer a segment will wait in case there is more data to come.
As gpfdist load operations are done in a loop, batchPeriod simply keeps things from running in a busy loop. A busy loop would be fine if there were a constant stream of data coming in, but if incoming data arrives in bursts, a busy loop is unnecessary.
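The count-or-timeout windowing controlled by flushCount and flushTime can be illustrated with a small deterministic simulation. This is only a sketch: the sink itself builds its windows with Reactor, whereas the hypothetical `FlushWindowSketch` below takes timestamps from the caller, and treating flushTime as milliseconds is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of count-or-time windowing; not the sink's Reactor pipeline.
class FlushWindowSketch {
    private final int flushCount;
    private final long flushTimeMillis; // unit chosen for illustration only
    private final List<String> current = new ArrayList<>();
    private final List<List<String>> released = new ArrayList<>();
    private long windowStart = -1;

    FlushWindowSketch(int flushCount, long flushTimeMillis) {
        this.flushCount = flushCount;
        this.flushTimeMillis = flushTimeMillis;
    }

    // Adds one message observed at the given caller-supplied time.
    void onMessage(String message, long nowMillis) {
        tick(nowMillis);
        if (current.isEmpty()) {
            windowStart = nowMillis;
        }
        current.add(message);
        if (current.size() >= flushCount) {
            release(); // window is full
        }
    }

    // Releases a partially filled window once the flush time has elapsed.
    void tick(long nowMillis) {
        if (!current.isEmpty() && nowMillis - windowStart >= flushTimeMillis) {
            release();
        }
    }

    private void release() {
        released.add(new ArrayList<>(current));
        current.clear();
    }

    // Windows that are ready to be handed to a segment.
    List<List<String>> released() {
        return released;
    }
}
```

With `new FlushWindowSketch(2, 2000)`, two messages arriving close together are released as one full window, while a single trailing message is released only after the timeout passes.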
Note | |
---|---|
Data loaded via gpfdist will not become visible in the database until the whole distributed loading session has finished successfully. |
Reactor also handles backpressure, meaning that if existing load operations do not produce enough demand for data, message passing into the sink will eventually block. This happens when Reactor’s internal ring buffer (size of 32 items) gets full. Data really flows through the sink only when segments pull it.
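The effect of a full buffer can be demonstrated with a plain bounded queue from the JDK. This is an analogy rather than Reactor's ring buffer: `BackpressureSketch` is a hypothetical name, and the capacity of 32 is taken from the paragraph above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical analogy for backpressure using a bounded JDK queue.
class BackpressureSketch {
    // Fills a bounded buffer with no consumer attached and returns how many items were accepted.
    static int fillWithoutConsumer(int capacity, int attempts) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false when the buffer is full; put() would block the producer instead.
            if (buffer.offer("message-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(fillWithoutConsumer(32, 100)); // prints 32
    }
}
```

Once nothing pulls from the buffer, only the first 32 items are accepted; a producer using a blocking call would stall at that point, which mirrors the blocking behaviour described for the sink.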
In this first example, we create a simple stream that inserts data from a time source. Let’s create a table with two text columns.
gpadmin=# create table ticktock (date text, time text);
Create a simple stream gpstream1.
dataflow:>stream create --name gpstream1 --definition "time | gpfdist --dbHost=mdw --table=ticktock --batchTimeout=1 --batchPeriod=1 --flushCount=2 --flushTime=2 --columnDelimiter=' '" --deploy
Let it run, and then check the results in the database.
gpadmin=# select count(*) from ticktock;
 count
-------
    14
(1 row)
In the previous example, we did simple inserts into a table. Let’s see how we can update data in a table. Create a simple table httpdata with three text columns and insert some data.
gpadmin=# create table httpdata (col1 text, col2 text, col3 text);
gpadmin=# insert into httpdata values ('DATA1', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA2', 'DATA', 'DATA');
gpadmin=# insert into httpdata values ('DATA3', 'DATA', 'DATA');
Now the table looks like this.
gpadmin=# select * from httpdata;
 col1  | col2 | col3
-------+------+------
 DATA3 | DATA | DATA
 DATA2 | DATA | DATA
 DATA1 | DATA | DATA
(3 rows)
Let’s create a stream that updates the table httpdata by matching the column col1 and updating the columns col2 and col3.
dataflow:>stream create --name gpfdiststream2 --definition "http --server.port=8081|gpfdist --mode=update --table=httpdata --dbHost=mdw --columnDelimiter=',' --matchColumns=col1 --updateColumns=col2,col3" --deploy
Post some data into the stream, which will be passed into the gpfdist sink via the http source.
curl --data "DATA1,DATA1,DATA1" -H "Content-Type:text/plain" http://localhost:8081/
If you query the table again, you’ll see that the row for DATA1 has been updated.
gpadmin=# select * from httpdata;
 col1  | col2  | col3
-------+-------+-------
 DATA3 | DATA  | DATA
 DATA2 | DATA  | DATA
 DATA1 | DATA1 | DATA1
(3 rows)
The default values for the options flushCount, flushTime, batchCount, batchTimeout and batchPeriod are relatively conservative and need to be tuned for every use case for optimal performance. To decide how to tune the sink’s behaviour to suit your needs, a few things need to be considered.
Everything that flows through the sink is kept in memory, and because the sink handles backpressure, memory consumption is relatively low. However, because the sink cannot predict the average size of incoming data, and this data is windowed later downstream anyway, you should not allow the window size to become too large if the average data size is large, as every batch of data is kept in memory.
Generally speaking, if you have a lot of segments in a load operation, it’s advisable to keep the flushed window size relatively small, which allows more segments to stay active. This, however, also depends on how much data is flowing into the sink itself.
The longer a load session for each segment is active, the higher the overall transfer rate is going to be. The option batchCount naturally controls this. However, the option batchTimeout then really controls how fast each segment will complete a stream due to inactivity from upstream and step away from a loading session, allowing the distributed session to finish and the data to become visible in the database.
This module writes each message it receives to HDFS.
The hdfs sink has the following options:
0
)<none>
)<none>
)false
)txt
)<none>
)10
)false
)0
)<none>
)0
)<none>
)<none>
)false
)<none>
)1000000000
Note | |
---|---|
This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one. |
This module writes each message it receives to HDFS as part of a Kite SDK Dataset.
The hdfs-dataset sink has the following options:
false
)10000
)snappy
)/tmp/hdfs-dataset-sink
)avro
)<none>
)-1
)<none>
)<none>
)-1
Note | |
---|---|
This module can have its runtime dependencies provided during startup if you would like to use a Hadoop distribution other than the default one. |
A module that writes its incoming payload to an RDBMS using JDBC.
The jdbc sink has the following options:
<none>
)false
)<none>
)<none>
)<none>
)true
)<none>
)<none>
)<none>
)Note | |
---|---|
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like |
The log sink uses the application logger to output the data for inspection.
The log sink has the following options:
payload
)<none>
, possible values: FATAL
,ERROR
,WARN
,INFO
,DEBUG
,TRACE
)<none>
This module sends messages to RabbitMQ.
The rabbit sink has the following options:
(See the Spring Boot documentation for RabbitMQ connection properties)
<none>
)<empty string>
)<none>
)[*]
)false
)<none>
)<none>
)<none>
)localhost
)<none>
)5672
)<none>
)<none>
)<none>
)Note | |
---|---|
By default, the message converter is a |
This module sends messages to a Redis store.
The redis sink has the following options:
<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)0
)localhost
)<none>
)8
)8
)-1
)0
)6379
)<none>
)<none>
)0
This module routes messages to named channels.
The router sink has the following options:
nullChannel
)<none>
)<none>
)60000
)false
)<none>
)<none>
)<none>
)Note | |
---|---|
Since this is a dynamic router, destinations are created as needed; therefore, by default the |
You can restrict the creation of dynamic bindings using the spring.cloud.stream.dynamicDestinations property. By default, all resolved destinations will be bound dynamically; if this property has a comma-delimited list of destination names, only those will be bound. Messages that resolve to a destination that is not in this list will be routed to the defaultOutputChannel, which must also appear in the list.
destinationMappings are used to map the evaluation results to an actual destination name.
The expression evaluates against the message and returns either a channel name, or the key to a map of channel names.
For more information, please see the "Routers and the Spring Expression Language (SpEL)" subsection in the Spring Integration Reference manual Configuring (Generic) Router section.
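The resolution rules described above (mapping the expression result, then restricting dynamic destinations) can be sketched in plain Java. This is an illustration of the described behaviour only, not the router's code; `RouterResolutionSketch` and its `resolve` method are hypothetical, and an empty allow-list is assumed to mean that every destination may be bound.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of destination resolution in a dynamic router.
class RouterResolutionSketch {
    // expressionResult: the value the routing expression returned for a message.
    // mappings: plays the role of destinationMappings (result key -> destination name).
    // allowed: plays the role of spring.cloud.stream.dynamicDestinations; empty means no restriction.
    static String resolve(String expressionResult,
                          Map<String, String> mappings,
                          Set<String> allowed,
                          String defaultOutputChannel) {
        // Use the mapped name if the result is a key; otherwise treat it as the channel name itself.
        String destination = mappings.getOrDefault(expressionResult, expressionResult);
        if (allowed.isEmpty() || allowed.contains(destination)) {
            return destination;
        }
        // Destinations outside the list fall back to the default output channel.
        return defaultOutputChannel;
    }
}
```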
Instead of SpEL expressions, Groovy scripts can also be used. Let’s create a Groovy script in the file system at "file:/my/path/router.groovy", or "classpath:/my/path/router.groovy" :
println("Groovy processing payload '" + payload + "'");
if (payload.contains('a')) {
    return "foo"
}
else {
    return "bar"
}
If you want to pass variable values to your script, you can statically bind values using the variables option or optionally pass the path to a properties file containing the bindings using the propertiesLocation option. All properties in the file will be made available to the script as variables. You may specify both variables and propertiesLocation, in which case any duplicate values provided as variables override values provided in propertiesLocation. Note that payload and headers are implicitly bound to give you access to the data contained in a message.
For more information, see the Spring Integration Reference manual Groovy Support.
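The precedence rule for script bindings (values given through variables override values loaded from propertiesLocation) amounts to a simple ordered merge. The sketch below only illustrates that rule; `ScriptBindingsSketch` and `merge` are hypothetical names, and both inputs are assumed to have been parsed into `Properties` objects already.

```java
import java.util.Properties;

// Hypothetical illustration of how inline variables can override file-based bindings.
class ScriptBindingsSketch {
    // fromFile: entries loaded from propertiesLocation; inline: entries supplied via variables.
    static Properties merge(Properties fromFile, Properties inline) {
        Properties merged = new Properties();
        merged.putAll(fromFile);
        // Applied last, so duplicate keys take the inline value.
        merged.putAll(inline);
        return merged;
    }
}
```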
This sink app supports transferring files to an Amazon S3 bucket.
File payloads (and directories, recursively) are transferred from the local directory where the application is deployed to the remote directory (S3 bucket).
Messages accepted by this sink must contain a payload of one of the following types:
File, including directories for recursive upload;
InputStream;
byte[]
When using --mode=lines, you can also provide the additional option --withMarkers=true. If set to true, the underlying FileSplitter will emit additional start-of-file and end-of-file marker messages before and after the actual data. The payload of these 2 additional marker messages is of type FileSplitter.FileMarker. The option withMarkers defaults to false if not explicitly set.
The s3 sink has the following options:
File.getName()
if any)
The target generated application based on the AmazonS3SinkConfiguration can be enhanced with the S3MessageHandler.UploadMetadataProvider and/or S3ProgressListener, which are injected into the S3MessageHandler bean.
The Amazon S3 Sink (as all other Amazon AWS applications) is based on the Spring Cloud AWS project as a foundation, and its auto-configuration classes are used automatically by Spring Boot. Consult their documentation regarding required and useful auto-configuration properties.
Some of them are about AWS credentials:
Others are for the AWS Region definition:
And for the AWS Stack:
SFTP sink is a simple option to push files to an SFTP server from incoming messages.
It uses an sftp-outbound-adapter, therefore incoming messages can be either a java.io.File object, a String (content of the file) or an array of bytes (file content as well).
To use this sink, you need a username and a password to log in.
Note | |
---|---|
By default Spring Integration will use |
The sftp sink has the following options:
<none>
)false
)<none>
)<none>
)<none>
)<empty string>
)<none>
)22
)<empty string>
)<none>
)<none>
)<none>
, possible values: APPEND
,FAIL
,IGNORE
,REPLACE
)<none>
)<none>
)<none>
)<none>
)<none>
This module writes messages to TCP using an Encoder.
TCP is a streaming protocol and some mechanism is needed to frame messages on the wire. A number of encoders are available, the default being 'CRLF'.
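To make the framing idea concrete, the sketch below frames a payload in two ways: with a trailing CRLF, and with a length header. It is a hand-written illustration, not the encoders the sink uses; `TcpFramingSketch` is a hypothetical name, and reading L2 as "2-byte big-endian length prefix" is an assumption based on the encoder's name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of two ways to frame a message on a TCP stream.
class TcpFramingSketch {
    // CRLF framing: the payload is followed by carriage return and line feed.
    static byte[] frameCrlf(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(body, 0, body.length);
        out.write('\r');
        out.write('\n');
        return out.toByteArray();
    }

    // Length-header framing: a 2-byte big-endian length precedes the payload (assumed meaning of L2).
    static byte[] frameL2(byte[] body) {
        if (body.length > 0xFFFF) {
            throw new IllegalArgumentException("Payload too long for a 2-byte header");
        }
        // ByteBuffer writes in big-endian order by default.
        return ByteBuffer.allocate(2 + body.length)
                .putShort((short) body.length)
                .put(body)
                .array();
    }
}
```

Delimiter framing is simple but requires that the delimiter never appears in the payload; a length header avoids that restriction, which is why it suits binary data.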
The tcp sink has the following options:
UTF-8
)false
)<none>
, possible values: CRLF
,LF
,NULL
,STXETX
,RAW
,L1
,L2
,L4
)<none>
)<none>
)<none>
)<none>
)<none>
)<none>
)Text Data
Text and Binary Data
A simple handler that will count messages and log witnessed throughput at a selected interval.
A simple Websocket Sink implementation.
The following command-line arguments are supported:
<none>
)/websocket
)9292
)false
)1
To verify that the websocket-sink receives messages from other spring-cloud-stream apps, you can use the following simple end-to-end setup.
The default broker that is used is Redis. Normally, you can start Redis via redis-server.
Finally, start a websocket-sink in trace mode so that you see the messages produced by the time-source in the log:
java -jar <spring boot application for websocket-sink> --spring.cloud.stream.bindings.input=ticktock --server.port=9393 \
    --logging.level.org.springframework.cloud.stream.module.websocket=TRACE
You should start seeing log messages in the console where you started the WebsocketSink like this:
Handling message: GenericMessage [payload=2015-10-21 12:52:53, headers={id=09ae31e0-a04e-b811-d211-b4d4e75b6f29, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:54, headers={id=75eaaf30-e5c6-494f-b007-9d5b5b920001, timestamp=1445424778065}]
Handling message: GenericMessage [payload=2015-10-21 12:52:55, headers={id=18b887db-81fc-c634-7a9a-16b1c72de291, timestamp=1445424778066}]
There is an Endpoint that you can use to access the last n messages sent and received. You have to enable it by providing --endpoints.websocketsinktrace.enabled=true. By default, it shows the last 100 messages via host:port/websocketsinktrace. Here is a sample output:
[
  {
    "timestamp": 1445453703508,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2ff9be50-c9b2-724b-5404-1a6305c033e4",
      "payload": "2015-10-21 20:54:33"
    }
  },
  ...
  {
    "timestamp": 1445453703506,
    "info": {
      "type": "text",
      "direction": "out",
      "id": "2b9dbcaf-c808-084d-a51b-50f617ae6a75",
      "payload": "2015-10-21 20:54:32"
    }
  }
]
There is also a simple HTML page where you can see forwarded messages in a text area. You can access it directly via host:port in your browser.
Note | |
---|---|
For SSL mode ( |