After two introductory blog posts on Apache Kafka (i.e. Apache Kafka Concepts by Example and Apache Kafka Consumer Group), it is time to discover the wide ecosystem around it. In this blog post, I will play with ksqlDB, streams and tables.

ksqlDB

ksqlDB is a server that builds on an Apache Kafka infrastructure for real-time data streaming. It can be used to capture events (for example, via Kafka Connect), transform them, and expose views (or tables).

Starting the ksqlDB server is easy. First, we need to set the bootstrap servers of our Kafka cluster in the etc/ksqldb/ksql-server.properties file:

bootstrap.servers=localhost:29092

As this is a proof of concept, I use a one-broker cluster (i.e. without high availability). Of course, the ksqlDB server supports HA.
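
For reference, a multi-node setup mostly boils down to pointing every ksqlDB server at the same Kafka cluster and giving them the same service id. A minimal sketch of the properties involved (broker addresses and service id below are hypothetical):

# all servers sharing the same service id form one ksqlDB cluster
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
ksql.service.id=ksql_poc_cluster
# replicate ksqlDB internal topics across brokers
ksql.streams.replication.factor=3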

The command to start the server:

bin/ksql-server-start etc/ksqldb/ksql-server.properties

And once it is up and running, you should see this:

[2024-04-19 11:43:25,485] INFO Waiting until monitored service is ready for metrics collection (io.confluent.support.metrics.BaseMetricsReporter:173)
[2024-04-19 11:43:25,485] INFO Monitored service is now ready (io.confluent.support.metrics.BaseMetricsReporter:185)
[2024-04-19 11:43:25,485] INFO Attempting to collect and submit metrics (io.confluent.support.metrics.BaseMetricsReporter:144)
[2024-04-19 11:43:25,486] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:385)

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2022 Confluent Inc.

Server 0.29.0 listening on http://0.0.0.0:8088

To access the KSQL CLI, run:
ksql http://0.0.0.0:8088

[2024-04-19 11:43:25,489] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:153)
[2024-04-19 11:47:04,248] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)

ksqlDB Client

Now, we are ready to start the ksqlDB client:

bin/ksql http://localhost:8088

If all went well, you should receive the ksql prompt and see the server status as RUNNING:

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =        The Database purpose-built       =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2022 Confluent Inc.

CLI v0.29.0, Server v0.29.0 located at http://localhost:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Stream

One advantage, and a drawback as well, is that a Kafka topic (refer to my previous blog post if you don’t know what this is) can store anything, and each message can have its own format (text or binary). Schema Registry can enforce formatting rules, versioning and serialization/deserialization information (I will cover that in another blog post). ksqlDB also enforces the format when defining a stream. For example, I create an order stream with 4 fields:

  • order id
  • customer id
  • product id
  • status id

ksql> create stream order_stream (order_id int, customer_id int, product_id int, status_id int)
with (kafka_topic='order_topic',value_format='json',partitions=1);

 Message
----------------
 Stream created
----------------
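
We can also verify the schema of the new stream with describe (the output below is roughly what ksqlDB 0.29 prints):

ksql> describe order_stream;

Name                 : ORDER_STREAM
 Field       | Type
-------------------------------
 ORDER_ID    | INTEGER
 CUSTOMER_ID | INTEGER
 PRODUCT_ID  | INTEGER
 STATUS_ID   | INTEGER
-------------------------------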

To check what happened in the background, I can use either Kafka UI or ksql itself:

ksql> show streams;

 Stream Name         | Kafka Topic                 | Key Format | Value Format | Windowed
------------------------------------------------------------------------------------------
 KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA      | JSON         | false
 ORDER_STREAM        | order_topic                 | KAFKA      | JSON         | false
------------------------------------------------------------------------------------------
ksql> show topics;

 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 default_ksql_processing_log | 1          | 1
 order_topic                 | 1          | 1
---------------------------------------------------------------
ksql>

We can see our stream and the associated backend topic.

Let’s insert data into the stream:

insert into order_stream (order_id, customer_id, product_id, status_id) values (1, 10, 21, 0);

And check the data is there:

ksql> select * from order_stream;
+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDER_ID               |CUSTOMER_ID            |PRODUCT_ID             |STATUS_ID              |
+-----------------------+-----------------------+-----------------------+-----------------------+
|1                      |10                     |21                     |0                      |
Query Completed
Query terminated
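
Note that a plain select like the one above is a pull query: it scans the stream and then terminates (hence the “Query Completed” message). To keep the query open and receive new events as they are inserted, we can turn it into a push query by appending emit changes:

ksql> select * from order_stream emit changes;

This query runs until we cancel it (Ctrl+C), printing each new row as it arrives.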

And what is actually stored in the topic? We can run kafka-console-consumer.sh to see it. Note that the command must be started before inserting data, or run with the --from-beginning option:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic order_topic
{"ORDER_ID":1,"CUSTOMER_ID":10,"PRODUCT_ID":21,"STATUS_ID":0}

Every update of the order status will simply produce a new event in order_stream.
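
For example, moving order 1 to “Processing” (status_id 1, as we will define in the next section) is not an update in place but just another insert; a hypothetical follow-up would be:

insert into order_stream (order_id, customer_id, product_id, status_id) values (1, 10, 21, 1);

The topic then contains two events for order 1, and it is up to the consumers to interpret the sequence.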

Table

Let’s say we want to see the status name instead of the status id, which has no meaning to us. It is possible to create a table which will contain both the status_id and the associated status_name:

create table status (status_id int primary key, status_name varchar)
with (kafka_topic='status',value_format='json',partitions=1);

As you can see, when defining a table, we have to define a primary key. One of the main differences from a stream is that, when querying a table, only the latest value for each primary key is returned.

Let’s insert some data into the status table:

insert into status (status_id, status_name) values (0,'Pending');
insert into status (status_id, status_name) values (1,'Processing');
insert into status (status_id, status_name) values (2,'Shipped');
insert into status (status_id, status_name) values (3,'Delivered');
insert into status (status_id, status_name) values (4,'Canceled');

And we can query it:

ksql> select * from status emit changes;
+-------------------------------------------------+-------------------------------------------------+
|STATUS_ID                                        |STATUS_NAME                                      |
+-------------------------------------------------+-------------------------------------------------+
|0                                                |Pending                                          |
|1                                                |Processing                                       |
|2                                                |Shipped                                          |
|3                                                |Delivered                                        |
|4                                                |Canceled                                         |

Oops, I see a typo in Canceled. How do we correct it? By inserting a new record with the updated value:

insert into status (status_id, status_name) values (4,'Cancelled');

If I keep the select open, I will see the update, and if I query the table again, I see the fixed status_name:

ksql> select * from status emit changes;
+-------------------------------------------------+-------------------------------------------------+
|STATUS_ID                                        |STATUS_NAME                                      |
+-------------------------------------------------+-------------------------------------------------+
|0                                                |Pending                                          |
|1                                                |Processing                                       |
|2                                                |Shipped                                          |
|3                                                |Delivered                                        |
|4                                                |Cancelled                                        |
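
Under the hood, the table is backed by a regular topic, so both records for status_id 4 are still stored; the table semantics simply surface the latest value per key. Since the primary key lives in the message key (as a serialized int), reading the status topic with the console consumer should show something like this (a key deserializer flag is needed to decode the int key):

$ ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic status --from-beginning --property print.key=true --key-deserializer org.apache.kafka.common.serialization.IntegerDeserializer
...
4	{"STATUS_NAME":"Canceled"}
4	{"STATUS_NAME":"Cancelled"}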

Joining Table and Stream

One benefit of this is that you can join a table and a stream, like in any SQL database, to enrich the result. The SQL query to create that stream is:

create stream order_stream_with_status as
  select order_id, customer_id, order_stream.status_id, product_id, status.status_name as status_name
  from order_stream left join status on order_stream.status_id = status.status_id
emit changes;

Here I create a new stream (i.e. order_stream_with_status) based on the stream order_stream joined to the status table. “emit changes” tells ksqlDB to emit all changes (messages) from the topic as they arrive.

Let’s see what is happening while selecting from this new stream when inserting in order_stream:

 select * from ORDER_STREAM_WITH_STATUS emit changes;
+------------------+------------------+------------------+------------------+------------------+
|ORDER_STREAM_STATU|ORDER_ID          |CUSTOMER_ID       |PRODUCT_ID        |STATUS_NAME       |
|S_ID              |                  |                  |                  |                  |
+------------------+------------------+------------------+------------------+------------------+
|0                 |1                 |10                |21                |Pending           |

Great! Now we see a status name. Of course, this can be done with other columns as well. We can even use Kafka Connect to get data from a database like MySQL or PostgreSQL.
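
As a teaser, ksqlDB can even drive Kafka Connect directly with create source connector. A sketch for a PostgreSQL source could look like the following (it assumes a Connect cluster is available to ksqlDB and the JDBC connector plugin is installed; the database, credentials and table names are hypothetical):

create source connector orders_from_pg with (
  'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'           = 'jdbc:postgresql://localhost:5432/shop',
  'connection.user'          = 'shop_user',
  'connection.password'      = 'shop_password',
  'mode'                     = 'incrementing',
  'incrementing.column.name' = 'order_id',
  'table.whitelist'          = 'orders',
  'topic.prefix'             = 'pg_'
);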

Filtered Stream

Now, let’s say our shop bills customers when an order has been shipped. Obviously, they don’t want to be notified of every event received in order_stream, so we can create a new stream which filters on status_id=2. The SQL query for that can be:

create stream order_stream_billing as
 select order_id, customer_id, order_stream.status_id, product_id, status.status_name as status_name
 from order_stream left join status on order_stream.status_id = status.status_id
 where order_stream.status_id=2
emit changes;

We can insert a few orders into order_stream:

insert into order_stream (order_id, customer_id, product_id, status_id) values (1, 10, 21, 0);
insert into order_stream (order_id, customer_id, product_id, status_id) values (2, 10, 21, 0);
insert into order_stream (order_id, customer_id, product_id, status_id) values (3, 10, 21, 0);
insert into order_stream (order_id, customer_id, product_id, status_id) values (4, 10, 21, 0);
insert into order_stream (order_id, customer_id, product_id, status_id) values (5, 10, 21, 0);

And then update their status:

insert into order_stream (order_id, customer_id, product_id, status_id) values (1, 10, 21, 1);
insert into order_stream (order_id, customer_id, product_id, status_id) values (2, 10, 21, 1);
insert into order_stream (order_id, customer_id, product_id, status_id) values (3, 10, 21, 2);
insert into order_stream (order_id, customer_id, product_id, status_id) values (4, 10, 21, 1);
insert into order_stream (order_id, customer_id, product_id, status_id) values (5, 10, 21, 2);

What will the select show? You guessed right, only two of them end up in the stream:

select * from ORDER_STREAM_billing emit changes;
+------------------+------------------+------------------+------------------+------------------+
|ORDER_STREAM_STATU|ORDER_ID          |CUSTOMER_ID       |PRODUCT_ID        |STATUS_NAME       |
|S_ID              |                  |                  |                  |                  |
+------------------+------------------+------------------+------------------+------------------+
|2                 |3                 |10                |21                |Shipped           |
|2                 |5                 |10                |21                |Shipped           |

And in the Infrastructure?

While doing all these tests, I forgot to check what was happening on the pure Kafka side. Let’s see:

ksql> show topics;

 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 ORDER_STREAM_BILLING        | 1          | 1
 ORDER_STREAM_WITH_STATUS    | 1          | 1
 default_ksql_processing_log | 1          | 1
 order_topic                 | 1          | 1
 status                      | 1          | 1
---------------------------------------------------------------

The two topics in upper case were created like that because no topic name was specified during stream creation and, as per the documentation, the upper-cased stream name is then used as the topic name.
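
If we prefer to control the topic name, we can set it explicitly in the with clause of the create statement, for example (had we created the billing stream that way; the topic name below is hypothetical):

create stream order_stream_billing
 with (kafka_topic='order_billing_topic') as
 select order_id, customer_id, order_stream.status_id, product_id, status.status_name as status_name
 from order_stream left join status on order_stream.status_id = status.status_id
 where order_stream.status_id=2
emit changes;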

In short, ksqlDB fits naturally into an Extract, Transform and Load (ETL) process.