Topic -> SQL Sink
This tutorial is part of the HTTP website to SQL database series:
- Part 1: HTTP Source -> Topic
- Part 2: Connector Transformations
- Part 3: Topic -> SQL Sink
This guide uses a local Fluvio cluster. If you need to install it, please follow the Fluvio installation instructions first.
We will be using a Postgres database. You can download it from the PostgreSQL website and set it up for your OS. Alternatively, use a cloud service such as ElephantSQL.
Introduction
In previous tutorials, we have seen how to read data from external sources and write it to a Fluvio topic. In this tutorial, we will go through how to sink data from a Fluvio topic to an external sink such as a database.
We will use the sink type of connector. All sink connectors consume data from a Fluvio topic and write it to an external system. In particular, we will use the SQL Sink Connector, which can write to a PostgreSQL or SQLite database.
Since this connector targets a SQL database, the configuration is concerned with mapping the JSON data to SQL columns. The Sink Connector will perform these steps:
- Read data from the topic.
- Transform the data into a SQL insert statement.
- Send the SQL insert statement to the database.
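The three steps above can be sketched in a few lines of Python. This is only an illustration of the flow, not how the connector is implemented: the `records` list is a hypothetical stand-in for a topic, the `to_insert` helper is invented for this example, and an in-memory SQLite database replaces a real Postgres server so the sketch runs on its own. The table and column names are borrowed from the configuration used later in this tutorial.

```python
import json
import sqlite3

# Hypothetical stand-in for records read from a topic (not the Fluvio API).
records = [
    '{"fact": "Cats sleep a lot.", "length": 17}',
    '{"fact": "Cats purr.", "length": 10}',
]


def to_insert(record: str):
    """Step 2: turn one JSON record into a parameterized INSERT statement."""
    doc = json.loads(record)
    sql = "INSERT INTO animalfacts (length, raw_fact_json) VALUES (?, ?)"
    return sql, (doc["length"], json.dumps(doc))


# In-memory SQLite database, used only so the sketch needs no server.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE animalfacts (length INTEGER, raw_fact_json TEXT)")

for record in records:               # Step 1: read data from the topic
    sql, params = to_insert(record)  # Step 2: transform into an insert
    conn.execute(sql, params)        # Step 3: send it to the database

rows = conn.execute(
    "SELECT length FROM animalfacts ORDER BY length"
).fetchall()
print(rows)
```

Using a parameterized statement rather than string concatenation keeps the record content from being interpreted as SQL.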
The SQL transformation is done by a SmartModule, which allows you to plug in different transformation logic if needed.
We will be using the topic from the first tutorial [Streaming from HTTP Source], which streams data to the cat-facts topic. Please run that tutorial first to set up the topic.
As in previous tutorials, we will use cdk to manage the connectors. Run the following command to download the connector from the Hub.
$ cdk hub download infinyon/sql-sink@0.4.3
Then download the SQL SmartModule from the Hub.
$ fluvio hub sm download infinyon/json-sql@0.2.1
You should then see two SmartModules listed, assuming you have already downloaded the jolt SmartModule in the previous tutorial.
$ fluvio sm list
SMARTMODULE SIZE
infinyon/json-sql@0.2.1 559.6 KB
infinyon/jolt@0.4.1 589.3 KB
Sink Connector configuration
Copy and paste the following config and save it as sql-cat-fact.yaml.
# sql.yaml
apiVersion: 0.1.0
meta:
  name: simple-cat-facts-sql
  type: sql-sink
  version: 0.4.3
  topic: cat-facts
sql:
  url: "postgres://user:password@db.postgreshost.example/dbname"
transforms:
  - uses: infinyon/json-sql@0.2.1
    invoke: insert
    with:
      mapping:
        table: "animalfacts"
        map-columns:
          "length":
            json-key: "length"
            value:
              type: "int"
              default: "0"
              required: true
          "raw_fact_json":
            json-key: "$"
            value:
              type: "jsonb"
              required: true
This configuration reads data from the cat-facts topic and inserts it into the animalfacts table in the database. The json-sql SmartModule transforms the JSON data into a SQL insert statement.
Please change the line containing url to your database connection string.
SQL Mapping
The json-sql SmartModule implements a domain-specific language (DSL) that specifies how input JSON is transformed into a SQL insert statement. It uses a model similar to a Django Model, where SQL tables are abstracted into a model. The model is then used to generate the SQL insert statement.
The mapping is designed for translating JSON into SQL. Each column of the table is mapped from a JSON expression.
For example, here is the mapping for the length column:
"length":
  json-key: "length"
  value:
    type: "int"
    default: "0"
    required: true
This mapping takes the length field from the JSON record and inserts it into the length column of the table. If the length field is not found, the default value of 0 is used.
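To make the lookup-then-default behavior concrete, here is a minimal Python sketch that mirrors the length mapping as a dictionary and resolves a column value from a JSON record. It is not the json-sql implementation. The `resolve_column` function and `LENGTH_MAPPING` name are hypothetical, and two behaviors are assumptions made for this example: that "$" selects the whole JSON document (as the raw_fact_json mapping suggests), and that a missing key with no default raises an error when required is true.

```python
import json

# Hypothetical Python mirror of the "length" mapping shown above.
LENGTH_MAPPING = {
    "json-key": "length",
    "value": {"type": "int", "default": "0", "required": True},
}


def resolve_column(record: dict, mapping: dict):
    """Return the value to insert for one column, following the mapping."""
    key = mapping["json-key"]
    spec = mapping["value"]
    # Assumption: "$" selects the whole JSON document.
    raw = record if key == "$" else record.get(key)
    if raw is None:
        if "default" in spec:
            raw = spec["default"]  # fall back to the configured default
        elif spec.get("required", False):
            # Assumption: a required column with no default is an error.
            raise ValueError(f"missing required JSON key: {key}")
        else:
            return None
    if spec["type"] == "int":
        return int(raw)
    if spec["type"] == "jsonb":
        return json.dumps(raw)
    return raw


found = resolve_column({"fact": "Cats purr.", "length": 10}, LENGTH_MAPPING)
fallback = resolve_column({"fact": "Cats purr."}, LENGTH_MAPPING)
print(found, fallback)
```

The first call finds the length key in the record, while the second record has no length key and therefore falls back to the default declared in the mapping.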
Set up the Database
To run the connector, you need to create a table in your database. Run the following SQL command in the Postgres CLI:
# create table animalfacts(length integer, raw_fact_json jsonb);
You can confirm that the table was created:
# select * from animalfacts;
length | raw_fact_json
--------+---------------
(0 rows)
Once you have the config file, you can create the connector using the cdk deploy start
command.
$ cdk deploy start --ipkg infinyon-sql-sink-0.4.3.ipkg --config ./sql-cat-fact.yaml
You can use cdk deploy list
to view the status of the connector.
$ cdk deploy list
NAME STATUS
simple-cat-facts-sql Running
Generate data and Check the result
Fluvio topics allow you to decouple the data source from the data sink. This means the source and the sink can run independently without affecting each other. You can run the source connector to generate data, but it is not required for this demo.
Here, we will manually produce the same data as in the previous tutorial to the cat-facts topic. This way we can control the data and see how it is written to the database.
By default, the sink connector consumes data from the end of the topic, which means it ignores existing data in the topic.
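The effect of starting at the end of the topic can be pictured with a toy model. The sketch below does not use the Fluvio client. It treats a plain Python list as a hypothetical topic whose list index plays the role of the record offset, and the record names are made up.

```python
# Hypothetical in-memory stand-in for a topic: the list index plays the
# role of the record offset.
topic = ["existing-1", "existing-2"]

# Starting at the end means the first offset to read equals the current
# length of the topic, so the two existing records are skipped.
next_offset = len(topic)

# Records produced after the consumer has started.
topic.append("new-1")
topic.append("new-2")

# The consumer only sees what arrived at or after its starting offset.
consumed = topic[next_offset:]
print(consumed)
```

This is why the records produced in the next step show up in the database, while anything already in the topic before the connector started does not.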
Let's produce a single record to the topic.
$ fluvio produce cat-facts
{"fact":"A cat’s jaw can’t move sideways, so a cat can’t chew large chunks of food.","length":74}
Ok!
Then you can query the database to see the record.
# select * from animalfacts;
length | raw_fact_json
--------+------------------------------------------------------------------------------------------------------
74 | {"fact": "A cat’s jaw can’t move sideways, so a cat can’t chew large chunks of food.", "length": 74}
(1 row)
You can add more records to the topic and see how the SQL connector inserts the data into the database.
$ fluvio produce cat-facts
{"fact":"Unlike humans, cats are usually lefties. Studies indicate that their left paw is typically their dominant paw.","length":110}
Ok!
# select * from animalfacts;
length | raw_fact_json
--------+-------------------------------------------------------------------------------------------------------------------------------------------
74 | {"fact": "A cat’s jaw can’t move sideways, so a cat can’t chew large chunks of food.", "length": 74}
110 | {"fact": "Unlike humans, cats are usually lefties. Studies indicate that their left paw is typically their dominant paw.", "length": 110}
(2 rows)
Clean up
As in previous tutorials, use cdk deploy shutdown to stop the connector.
Conclusion
This tutorial showed you how to sink data from a Fluvio topic to a SQL database. You can use the same concept to sink data to other databases or systems.
You can combine this tutorial with previous tutorials to create a complete data pipeline from source to sink. This just requires deploying multiple connectors.
With Fluvio's event-driven architecture, sources and sinks can run independently and do not affect each other. You can also chain together multiple sources and sinks to create complex data pipelines.