A stream SQL job is a continuous process that listens to incoming events and creates or updates entities in real time.
For example, it can listen to Swap events and maintain a totalVolumeUSD per Pool for your AMM, or build hourly/daily time-series data.
Example of a streaming job that creates hourly total volume buckets per Pool
Add a new folder in your project, for example under src/aggregations/pool-volume-buckets
Create a new SQL file named streaming.sql with content like the following:
-- This instructs Flink that this is a real-time continuous job
-- that must always keep running.
SET 'execution.runtime-mode' = 'STREAMING';
-- Here we're creating a virtual table against a stream of incoming Swap events.
-- Note that these fields are whatever you upsert in your processors.
-- Visit "examples" repo to see how each field like amountUSD is added.
CREATE TABLE swaps_stream (
`entityId` STRING,
`protocol` STRING,
`chainId` BIGINT,
`poolId` STRING,
`amount0` STRING,
`amount1` STRING,
`amount0USD` FLOAT,
`amount1USD` FLOAT,
`amountUSD` FLOAT,
`hourBucket` STRING,
`dayBucket` STRING,
`weekBucket` STRING,
`monthBucket` STRING,
`blockTimestamp` BIGINT,
`timestamp` BIGINT,
-- The computed field and watermark below tell Flink how to order incoming events based on
-- their timestamp, which in normal cases is derived from blockTimestamp; the 1-minute
-- watermark tolerates slightly late or out-of-order events.
`ts` as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTES
) WITH (
'connector' = 'stream',
'mode' = 'append',
-- Namespace is automatically interpolated based on your cluster manifest.yml
'namespace' = '{{ namespace }}',
-- The entityType is the same as what you provided in your processor's "upsert"
'entity-type' = 'Swap'
);
-- Here we create a destination sink towards your namespace on a different entityType
CREATE TABLE pool_stats_sink (
`entityId` STRING,
`protocol` STRING,
`chainId` BIGINT,
`poolId` STRING,
`granularity` STRING,
`timeBucket` STRING,
`bucketTimestamp` BIGINT,
`volumeUSD` FLOAT,
`volumeToken0` STRING,
`volumeToken1` STRING,
`volumeToken0USD` FLOAT,
`volumeToken1USD` FLOAT,
PRIMARY KEY (`entityId`) NOT ENFORCED
) PARTITIONED BY (`entityId`) WITH (
'connector' = 'database',
'mode' = 'write',
'namespace' = '{{ namespace }}',
'entity-type' = 'PoolStat'
);
-- Here we create a temporary in-memory view that is basically an aggregated
-- table grouped by Pool and hour bucket, SUMming the USD values and BigNumber values
CREATE VIEW swap_volumes_buckets AS
SELECT
`poolId` || ':' || `hourBucket` as `entityId`,
`protocol`,
`chainId`,
`poolId`,
'hour' as `granularity`,
`hourBucket` as `timeBucket`,
MAX(`blockTimestamp`) as `bucketTimestamp`,
COALESCE(SUM(`amountUSD`), 0) as `volumeUSD`,
-- SUM_BN() is a custom function already imported in all Flink deployments,
-- useful in a Web3 context where many values have 18 decimals,
-- like the "wei" value of ERC20 transfers.
COALESCE(SUM_BN(`amount0`), '0') as `volumeToken0`,
COALESCE(SUM_BN(`amount1`), '0') as `volumeToken1`,
COALESCE(SUM(`amount0USD`), 0) as `volumeToken0USD`,
COALESCE(SUM(`amount1USD`), 0) as `volumeToken1USD`
FROM
swaps_stream
WHERE
-- This condition helps Flink ignore older data to keep active memory usage low.
-- It is mainly needed for "streaming" jobs, which are supposed to run for long periods of time.
`ts` >= CAST(FLOOR((CURRENT_TIMESTAMP - INTERVAL '2' HOUR) TO HOUR) AS TIMESTAMP(3))
GROUP BY
`protocol`,
`chainId`,
`poolId`,
`hourBucket`
;
-- Finally we "upsert" the data into our final destination entity-type.
-- Remember that, due to how Flink works, the order of the fields here must be
-- exactly the same as in the sink table definition above.
INSERT INTO
pool_stats_sink
SELECT
-- Each upsert must include at least "entityId", otherwise the connector does not
-- know which ID to use.
-- In this case we are creating an entityId based on Pool address + hourBucket
-- which is defined in the above definition of swap_volumes_buckets view.
`entityId`,
`protocol`,
`chainId`,
`poolId`,
`granularity`,
`timeBucket`,
`bucketTimestamp`,
`volumeUSD`,
`volumeToken0`,
`volumeToken1`,
`volumeToken0USD`,
`volumeToken1USD`
FROM
swap_volumes_buckets;
Now this job will upsert new entities in your namespace. Remember, to sync this data (or any other processed entity) to your own database (e.g. Postgres), the flow is exactly the same as described in the Database docs.
Now we need to add this aggregation definition to the manifest.yml so that when you deploy the cluster this job starts running:
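The exact manifest schema can vary between SDK versions, so treat the snippet below as a sketch and cross-check it against your existing manifest.yml or the examples repo. The enricher id pool-volume-buckets-streaming is an arbitrary name, and the engine/inputSql keys are assumptions based on typical Flair manifests:
enrichers:
  # id is an arbitrary name chosen for this guide
  - id: pool-volume-buckets-streaming
    # engine and inputSql are assumed keys; verify against your own manifest.yml
    engine: flink
    inputSql: src/aggregations/pool-volume-buckets/streaming.sql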
Once deployed, you can view the running Flink job in the native Flink UI to see its stats and the number of processed entities:
Batch Jobs
When dealing with large amounts of historical data, you can run a batch job to pre-populate it.
For example, you may want to generate all the hourly time-series data for all past Pool activity in your AMM.
Example of a batch job that backfills historical hourly volume buckets per Pool
Add a new folder in your project (if not done already), for example under src/aggregations/pool-volume-buckets
Create a new SQL file named batch.sql with content like the following:
-- This instructs Flink that this is a batch job that must run only
-- when explicitly triggered by the user.
SET 'execution.runtime-mode' = 'BATCH';
-- Similar to "streaming" example above we need to create a virtual table for swaps.
-- Here we're creating a virtual table against existing database of Swap events.
-- Note that that these fields are whatever you upsert in your processors.
-- Visit "examples" repo to see how each field like amountUSD is added.
CREATE TABLE swaps_store (
`entityId` STRING,
`protocol` STRING,
`chainId` BIGINT,
`poolId` STRING,
`amount0` STRING,
`amount1` STRING,
`amount0USD` FLOAT,
`amount1USD` FLOAT,
`amountUSD` FLOAT,
`hourBucket` STRING,
`dayBucket` STRING,
`weekBucket` STRING,
`monthBucket` STRING,
`blockTimestamp` BIGINT,
PRIMARY KEY (`entityId`) NOT ENFORCED
) PARTITIONED BY (`entityId`) WITH (
-- As opposed to the stream example above, we need to read all the
-- existing data from the database (instead of the incoming stream)
'connector' = 'database',
'mode' = 'read',
'namespace' = '{{ namespace }}',
'entity-type' = 'Swap',
-- OPTIONALLY, for large amounts of data (e.g. 100m swaps),
-- you can instruct Flink to partition the scan into smaller batches
-- so the job won't fail due to memory constraints.
--
-- 'scan.partition.num' = '1000',
-- 'scan.partition.column' = 'blockTimestamp',
-- 'scan.partition.lower-bound' = '1577917612',
-- 'scan.partition.upper-bound' = '1702247157'
);
-- Exactly same as "stremaing" example above we need to create
-- a virtual table for the destination entityType.
-- Here we create a temporary inmemory view table that is basically an aggregated
-- table grouped by "Pool" and SUMs the USD-values and BigNumber-values
CREATE TABLE pool_stats_sink (
`entityId` STRING,
`protocol` STRING,
`chainId` BIGINT,
`poolId` STRING,
`granularity` STRING,
`timeBucket` STRING,
`bucketTimestamp` BIGINT,
`volumeUSD` FLOAT,
`volumeToken0` STRING,
`volumeToken1` STRING,
`volumeToken0USD` FLOAT,
`volumeToken1USD` FLOAT,
PRIMARY KEY (`entityId`) NOT ENFORCED
) PARTITIONED BY (`entityId`) WITH (
'connector' = 'database',
'mode' = 'write',
'namespace' = '{{ namespace }}',
'entity-type' = 'PoolStat'
);
-- Here we create a temporary in-memory view that is basically an aggregated
-- table grouped by Pool and hour bucket, SUMming the USD values and BigNumber values
CREATE VIEW swap_volumes_buckets AS
SELECT
`poolId` || ':' || `hourBucket` as `entityId`,
`protocol`,
`chainId`,
`poolId`,
'hour' as `granularity`,
`hourBucket` as `timeBucket`,
MAX(`blockTimestamp`) as `bucketTimestamp`,
COALESCE(SUM(`amountUSD`), 0) as `volumeUSD`,
-- SUM_BN() is a custom function already imported in all Flink deployments,
-- useful in a Web3 context where many values have 18 decimals,
-- like the "wei" value of ERC20 transfers:
COALESCE(SUM_BN(`amount0`), '0') as `volumeToken0`,
COALESCE(SUM_BN(`amount1`), '0') as `volumeToken1`,
COALESCE(SUM(`amount0USD`), 0) as `volumeToken0USD`,
COALESCE(SUM(`amount1USD`), 0) as `volumeToken1USD`
FROM
swaps_store
GROUP BY
`protocol`,
`chainId`,
`poolId`,
`hourBucket`
;
-- Finally we "upsert" the data into our final destination entity-type.
-- Remember that, due to how Flink works, the order of the fields here must be
-- exactly the same as in the sink table definition above.
INSERT INTO
pool_stats_sink
SELECT
-- Each upsert must include at least "entityId", otherwise the connector does not
-- know which ID to use.
-- In this case we are creating an entityId based on Pool address + hourBucket
-- which is defined in the above definition of swap_volumes_buckets view.
`entityId`,
`protocol`,
`chainId`,
`poolId`,
`granularity`,
`timeBucket`,
`bucketTimestamp`,
`volumeUSD`,
`volumeToken0`,
`volumeToken1`,
`volumeToken0USD`,
`volumeToken1USD`
FROM
swap_volumes_buckets;
Now we need to add this aggregation definition to the manifest.yml so that when you deploy the cluster this job is provisioned and ready to be triggered:
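As with the streaming job, the snippet below is only a sketch (the engine/inputSql keys are assumptions; cross-check with your existing manifest.yml or the examples repo). The id must match the name you pass to the trigger command in the next step:
enrichers:
  # id must match the name used with "flair enricher trigger" below
  - id: pool-volume-buckets-batch
    # engine and inputSql are assumed keys; verify against your own manifest.yml
    engine: flink
    inputSql: src/aggregations/pool-volume-buckets/batch.sql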
Manually trigger the batch job; it might take a few minutes to provision the required resources and runners:
flair enricher trigger pool-volume-buckets-batch
Now this job will upsert all the historical entities in your namespace. Remember, to sync this data to your own database (e.g. Postgres), the flow is described in the Database docs.
You can open the native Flink UI by following the link in the job status, check for any SQL syntax issues by running the logs command, or cancel the batch job if needed.
For a complete example of a real-time streaming SQL job that SUMs total volume into hourly, daily, and weekly buckets, see the examples repo.