🔋Database

Deliver all your indexed data to your own backend database in real-time

💡 Concepts

Namespaces

A namespace groups many entities together; you can think of a namespace as a "database instance".

You can create one or more namespaces for your project (for versioning, separating dev and prod environments, etc.). We recommend having a production namespace such as sushiswap and a development namespace such as sushiswap-dev.

As described in the Getting Started guide, you will define your own namespace in the manifest.yml.

Writing to the namespace

Refer to the Global Variables > database section to see how to write data to, or read it from, your namespace in custom processor scripts.

You are not required to define schemas in Flair; all your entities are stored in a schemaless database. This means you can simply "upsert" a new or existing entity.
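
To clarify what "upsert" means here: it is the usual insert-or-update semantics. Purely as an illustration of those semantics (not how you actually write data in Flair, which happens from your processor scripts via the database global variable referenced above), a MySQL equivalent would be:

-- Illustration of upsert semantics only; the table and values are hypothetical,
-- and entityId is assumed to be the primary key
INSERT INTO swaps (entityId, txHash, amount)
VALUES ('swap-1', '0x123', 100)
ON DUPLICATE KEY UPDATE
  txHash = VALUES(txHash),
  amount = VALUES(amount);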

👩‍💻 Sync to your database

Processors use the database integration to store entities in your namespace (e.g. sushiswap). You can define any destination for your data using Flair's managed Apache Flink engine.

At a high level, database syncing involves creating a table in your destination database (e.g. Postgres, MySQL, MongoDB, etc.) and then defining a SQL INSERT statement. You will define this for both historical and real-time syncing, as described below.

1️⃣ Create your database and add credentials

Note: We are going to use "Swap" as a sample entity in this guide.

1.A) Create your destination table

Depending on the type of database you're using, create the table (or collection) for the entities you need to sync. For example, for MySQL you can execute the following SQL query in your database to create a table that stores the "Swap" entity data:

-- ### Swap Table ###
CREATE TABLE my_swaps (
    `entityId` VARCHAR(255),
    `entityUpdatedAt` BIGINT,
    `chainId` INT,
    `contractAddress` VARCHAR(255),
    `horizon` VARCHAR(255),
    `blockNumber` BIGINT,
    `blockTimestamp` BIGINT,
    `forkIndex` INT,
    `transactionIndex` INT,
    `logIndex` INT,
    `localIndex` INT,
    `txHash` VARCHAR(255),
    `txTo` VARCHAR(255),
    `txFrom` VARCHAR(255),
    `assetId` VARCHAR(255),
    `resolver` VARCHAR(255),
    PRIMARY KEY (`entityId`),
    INDEX idx_chainId (`chainId`) -- add as many indexes as necessary
);

Note: the above query creates a table in MySQL. Depending on your database, you might need a slightly different query.
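
For instance, a roughly equivalent table in PostgreSQL could look like the sketch below; the column types are reasonable mappings rather than a definitive schema, so adjust them to your needs:

-- ### Swap Table (PostgreSQL sketch) ###
CREATE TABLE my_swaps (
    "entityId" VARCHAR(255) PRIMARY KEY,
    "entityUpdatedAt" BIGINT,
    "chainId" INT,
    "contractAddress" VARCHAR(255),
    "horizon" VARCHAR(255),
    "blockNumber" BIGINT,
    "blockTimestamp" BIGINT,
    "forkIndex" INT,
    "transactionIndex" INT,
    "logIndex" INT,
    "localIndex" INT,
    "txHash" VARCHAR(255),
    "txTo" VARCHAR(255),
    "txFrom" VARCHAR(255),
    "assetId" VARCHAR(255),
    "resolver" VARCHAR(255)
);

-- PostgreSQL defines indexes with separate statements
CREATE INDEX idx_chainid ON my_swaps ("chainId");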

1.B) Set secrets for sensitive DB credentials using flair-cli

flair secret set -n mysql.password -v 'PASSWORD'

This secret will be used in the next steps when defining your sync SQLs.

2️⃣ Enable real-time sync

For real-time syncing, create a Flink SQL script (e.g. streaming.sql). This will push incoming namespace data to the destination DB in real time.

Note: A good practice is to have all real-time syncs in one file, but keep historical syncs in separate files, as they usually take longer to run.

CREATE TABLE source_AMMBundleClosed (
    `entityId` STRING,
    `entityUpdatedAt` BIGINT,
    `chainId` INT,
    `contractAddress` STRING,
    `horizon` STRING,
    `blockNumber` BIGINT,
    `blockTimestamp` BIGINT,
    `forkIndex` INT,
    `transactionIndex` INT,
    `logIndex` INT,
    `localIndex` INT,
    `txHash` STRING,
    `txTo` STRING,
    `txFrom` STRING,
    `assetId` STRING,
    `removed` BOOLEAN,
    `resolver` STRING,
    PRIMARY KEY (`entityId`) NOT ENFORCED
) WITH (
    'connector' = 'stream',
    'mode' = 'cdc',
    'namespace' = '{{ namespace }}',
    'entity-type' = 'AMMBundleClosed',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '{{ chrono("2 hours ago") * 1000 }}'
);

CREATE TABLE sink_AMMBundleClosed (
    `entityId` STRING,
    `entityUpdatedAt` BIGINT,
    `chainId` INT,
    `contractAddress` STRING,
    `horizon` STRING,
    `blockNumber` BIGINT,
    `blockTimestamp` BIGINT,
    `forkIndex` INT,
    `transactionIndex` INT,
    `logIndex` INT,
    `localIndex` INT,
    `txHash` STRING,
    `txTo` STRING,
    `txFrom` STRING,
    `assetId` STRING,
    `removed` BOOLEAN,
    `resolver` STRING,
    PRIMARY KEY (`entityId`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://HOST/DB',
    'table-name' = 'AMMBundleClosed',
    'username' = 'USERNAME',
    'password' = '{{ secret("mysql.password") }}',
    'sink.max-retries' = '10',
    'sink.buffer-flush.interval' = '60s'
);

INSERT INTO sink_AMMBundleClosed SELECT * FROM source_AMMBundleClosed WHERE entityId IS NOT NULL;
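
The INSERT statement can also filter or reshape entities before they reach your database. As a variation of the statement above, you could for example sync only a single chain (the chainId value below is purely illustrative):

-- Optional variation: only sync entities from one chain
INSERT INTO sink_AMMBundleClosed
SELECT * FROM source_AMMBundleClosed
WHERE entityId IS NOT NULL
  AND chainId = 1;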

After this, make sure the real-time enricher is set up in manifest.yaml:

enrichers:
  - id: database-streaming-sync
    engine: flink
    parallelism: 4
    inputSql: ./src/database/streaming.sql

After this, running pnpm deploy:prod should set up the real-time syncing.

Note: A good practice is to enable real-time syncing before historical syncing. This ensures that no data is missed.

3️⃣ Historical data sync (one-off)

For one-off historical syncing of already-indexed data, define a SQL script (e.g. batch.swap.sql) to INSERT data from an existing namespace into your destination DB.

Note: Keep historical syncs in separate per-entity files (e.g. batch.swap.sql) so partial syncs are easier.

  1. Create the src/database/batch.swap.sql file in your project:

CREATE TABLE source_Swap (
    `entityId` STRING,
    `entityUpdatedAt` BIGINT,
    `chainId` INT,
    `contractAddress` STRING,
    `horizon` STRING,
    `blockNumber` BIGINT,
    `blockTimestamp` BIGINT,
    `forkIndex` INT,
    `transactionIndex` INT,
    `logIndex` INT,
    `localIndex` INT,
    `txHash` STRING,
    `txTo` STRING,
    `txFrom` STRING,
    `assetId` STRING,
    `resolver` STRING,
    PRIMARY KEY (`entityId`) NOT ENFORCED
) WITH (
    'connector' = 'database',
    'mode' = 'read',
    'namespace' = '{{ namespace }}',
    'entity-type' = 'Swap',
    -- NOTE: If you have a large dataset, you can use the partitioning configs below.
    -- A good rule of thumb is to add 10 to scan.partition.num for every 1M rows of your entity
    -- (e.g. roughly 50 partitions for 5M rows).
    'scan.partition.num' = '10',
    'scan.partition.column' = 'blockTimestamp',
    'scan.partition.lower-bound' = '{{ chrono(fromTimestamp | default("01-01-2021 00:00 UTC")) }}',
    'scan.partition.upper-bound' = '{{ chrono(toTimestamp | default("now")) }}'
);

CREATE TABLE sink_Swap (
    `entityId` STRING,
    `entityUpdatedAt` BIGINT,
    `chainId` INT,
    `contractAddress` STRING,
    `horizon` STRING,
    `blockNumber` BIGINT,
    `blockTimestamp` BIGINT,
    `forkIndex` INT,
    `transactionIndex` INT,
    `logIndex` INT,
    `localIndex` INT,
    `txHash` STRING,
    `txTo` STRING,
    `txFrom` STRING,
    `assetId` STRING,
    `resolver` STRING,
    PRIMARY KEY (`entityId`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://HOST/DB',
    'table-name' = 'my_swaps',
    'username' = 'USERNAME',
    'password' = '{{ secret("mysql.password") }}',
    'sink.max-retries' = '10',
    'sink.buffer-flush.interval' = '60s'
);

INSERT INTO sink_Swap SELECT * FROM source_Swap;
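
If your destination is PostgreSQL rather than MySQL, typically only the sink table's WITH options change. Below is a hedged sketch, assuming the PostgreSQL JDBC driver is available on your cluster and that you stored the password as a postgres.password secret:

-- Same column definitions as sink_Swap above; only the WITH options differ:
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://HOST:5432/DB',
    'table-name' = 'my_swaps',
    'username' = 'USERNAME',
    'password' = '{{ secret("postgres.password") }}',
    'sink.max-retries' = '10',
    'sink.buffer-flush.interval' = '60s'
);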

  2. Update your manifest.yaml file with a new enricher:

enrichers:
  - id: database-historical-swaps
    engine: flink
    inputSql: ./src/database/batch.swap.sql

  3. Deploy your cluster so the enricher is ready to trigger.

pnpm generate-and-deploy

  4. Trigger the batch sync. Adjust the parallelism depending on how large your data is:

flair enricher trigger database-historical-swaps -l WARN -o parallelism=2

After triggering this command, you can monitor the enricher's logs via the Flink job link shown in the terminal, or by running flair logs -t JobId=<job-id>

That's it! You can see the status of your real-time database sync in the job manager GUI.

🧰 Maintenance

Partial re-syncs

For fixing data issues, or if your destination database was down for some time, you can re-run the same batch job you created in step 3 above, either for the full data set or just for a specific period:

# For the example scenario above, you can run:
flair enricher trigger database-historical-swaps \
   -p fromTimestamp='2 days ago' \
   -p toTimestamp='1 hour ago'

Re-processing entities

If your processing logic changes (e.g. you add a new USD price field to your Swap entities), you need to use the backfill mechanism to apply those changes. In that scenario you do NOT need to re-sync the database, because all the changed entities will be pushed to it in real time.

Best Practices

  • Avoid changing field data types in your RDBMS; instead, CAST the types in the INSERT ... SELECT statement, for example:

INSERT INTO sink_Swap
SELECT
  entityId,
  -- ...and the rest of the fields
  CAST(amountUSD AS FLOAT) AS amountUSD
FROM source_Swap;

  • Define all indexes and check the schema before syncing a huge table, to avoid timeouts on your database engine.
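
For instance, in MySQL you could add indexes for the columns you query most before triggering a large historical sync; the column choices below are only illustrative:

-- Add indexes before a large historical sync (column choices are illustrative)
ALTER TABLE my_swaps
  ADD INDEX idx_blockTimestamp (`blockTimestamp`),
  ADD INDEX idx_txFrom (`txFrom`);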

  • Preferably use the eu-central-1 region (Frankfurt, Central Europe) for the highest performance. If you need other regions, ping our engineers.
