Setup
I will first prepare some sample data:
import json
import random
import string
import time

from confluent_kafka import SerializingProducer

p = SerializingProducer({
    "bootstrap.servers": "kafka0.tech-tales.blog:9092",
    # serialize the integer key as a 4-byte big-endian value
    "key.serializer": lambda x, _: x.to_bytes(4, "big"),
    # serialize the value as UTF-8 encoded JSON
    "value.serializer": lambda x, _: json.dumps(x).encode("utf-8"),
})

while True:
    key = random.randint(1, 26)
    msg = {
        "arg2": random.choice(string.ascii_lowercase),
        "arg3": random.randint(1, 1_000_000)
    }
    p.produce(
        "input_topic",
        value=msg,
        key=key
    )
    p.poll(0)  # serve delivery callbacks
    time.sleep(1)
So I will create some messages. The key will be a number between $1$ and $26$ (both inclusive), and the value will be a simple JSON object with arg2 being a lower-case ASCII letter (so a, b, c, …, z) and arg3 being some random number between $1$ and $1,000,000$. I am going to use arg2 for rekeying later on; arg3 is essentially just a dummy value.
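To double-check that messages really arrive on the topic, a quick look from the ksql CLI should be enough (a minimal sketch, assuming the topic name used above):

print 'input_topic' from beginning limit 3;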
Data to KSQL
I will get my data into a KSQL stream because it is easy to do and makes the further work easier, in particular since I want to avoid non-materialized tables:
create stream input_for_grouping (
    arg1 int key,
    arg2 string,
    arg3 int
) with (
    kafka_topic='input_topic',
    value_format='json'
);
Note the definition of arg1 as key.
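As a quick sanity check, a push query against the new stream should show the incoming rows (a sketch using the names defined above):

select arg1, arg2, arg3 from input_for_grouping emit changes limit 5;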
Creating a table is easy too:
create table aggregated_1 as
    select
        arg1,
        latest_by_offset(arg2) as arg2,
        latest_by_offset(arg3) as arg3
    from input_for_grouping
    group by arg1
    emit changes;
With emit changes, the table is not fixed in size but gets updated with every message written to the input_topic topic.
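Because the aggregation materializes the table, it can also be queried with a pull query on the key column (a minimal sketch; the key value 7 is just an arbitrary example):

select * from aggregated_1 where arg1 = 7;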
Rekeying
From the KSQL side, the rekeying is easy:
create table aggregated_2 as
    select
        arg2,
        latest_by_offset(arg1) as arg1,
        latest_by_offset(arg3) as arg3
    from input_for_grouping
    group by arg2
    emit changes;
That’s it. Nothing fancy actually, but arg2 is definitely the key now.
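A pull query on the rekeyed table confirms this, since the lookup now goes by arg2 (again just a sketch, with 'x' as an arbitrary key value):

select * from aggregated_2 where arg2 = 'x';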
What happens internally? KSQL creates two new topics. One is called something like __confluent-ksql-default...-changelog and obviously is not too interesting (some changelog may be stored, I don’t really care about that). The second one is called __confluent-ksql-default...-Aggregate-GroupBy-repartition. Here, the input_for_grouping stream essentially gets copied. The body is a JSON object containing all three arguments arg1, arg2 and arg3. arg2 is duplicated in this topic because its value is also the key of the message. From this topic it is easy to get to the desired result table.
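To see these internal topics from the ksql CLI, listing all topics including the hidden ones should be sufficient (a sketch; the exact statement may vary by ksqlDB version):

show all topics;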
Grouping by multiple key columns?
My last test, since it is that easy:
create table aggregated_3 as
    select
        arg1,
        arg2,
        sum(arg3) as s
    from input_for_grouping
    group by arg1, arg2
    emit changes;
Here, KSQL fails with an error message:
Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[`ARG1` INTEGER KEY, `ARG2` STRING KEY], features=[]}
reason: The 'KAFKA' format only supports a single field. Got: [`ARG1` INTEGER KEY, `ARG2` STRING KEY]
Caused by: The 'KAFKA' format only supports a single field. Got: [`ARG1` INTEGER
KEY, `ARG2` STRING KEY]
Well, theoretically we knew that already: at the time of writing, KSQL only supports single-valued keys. We try to generate a multi-valued key, so it is not too surprising that this fails. But one thing I found interesting: it seems that internally just another KSQL query is called, one that defines both values as the key.
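If grouping by both columns is needed anyway, a common workaround is to fold them into a single composite string key. This is only a sketch (not part of the setup above): the table name aggregated_3_combined and the '|' separator are made up, and it assumes string concatenation with + and CAST ... AS STRING are available in the ksqlDB version at hand:

create table aggregated_3_combined as
    select
        cast(arg1 as string) + '|' + arg2 as combined_key,
        sum(arg3) as s
    from input_for_grouping
    group by cast(arg1 as string) + '|' + arg2
    emit changes;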