Setup

I will first prepare some sample data:

import json
import random
import string
import time

from confluent_kafka import SerializingProducer

p = SerializingProducer({
    "bootstrap.servers": "kafka0.tech-tales.blog:9092",
    # the integer key is serialized as 4 big-endian bytes
    "key.serializer": lambda x, _: x.to_bytes(4, "big"),
    # the serializer has to return bytes, so the JSON string gets encoded
    "value.serializer": lambda x, _: json.dumps(x).encode("utf-8")
})

while True:

    key = random.randint(1, 26)

    msg = {
        "arg2": random.choice(string.ascii_lowercase),
        "arg3": random.randint(1, 1_000_000)
    }

    p.produce(
        "input_topic",
        value=msg,
        key=key
    )

    # serve delivery callbacks and produce roughly one message per second
    p.poll(0)
    time.sleep(1)

So I will create some messages. The key will be a number between $1$ and $26$ (both inclusive), and the value will be a simple JSON object with arg2 being a lowercase ASCII letter (so a, b, c, …, z) and arg3 being a random number between $1$ and $1,000,000$. I am going to use arg2 for rekeying later on; arg3 is essentially just a dummy value.

Data to KSQL

I will get my data into a KSQL stream first, because that is easy to do and makes the further work easier, in particular because it lets me avoid non-materialized tables:

create stream input_for_grouping (
    arg1 int key,
    arg2 string,
    arg3 int
) with (
    kafka_topic='input_topic',
    value_format='json'
);

Note the definition of arg1 as key.
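
To check that the stream is wired up correctly, a quick push query from the ksqlDB CLI is enough. This is only a sketch; it assumes auto.offset.reset is set to earliest so that already-produced messages show up:

-- peek at a few records flowing through the stream
select * from input_for_grouping emit changes limit 5;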

Creating a table is easy too:

create table aggregated_1 as
    select
        arg1,
        latest_by_offset(arg2),
        latest_by_offset(arg3)
    from input_for_grouping
    group by arg1
emit changes;

With emit changes the table is not a fixed, one-off result but gets updated with every message written to the input_topic topic.
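
Since the table is materialized, the current state for a single key can also be read back with a pull query. A minimal sketch, assuming a reasonably recent ksqlDB version; the key value 7 is just an example:

-- look up the latest aggregated row for one key
select * from aggregated_1 where arg1 = 7;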

Rekeying

From the KSQL side, the rekeying is easy:

create table aggregated_2 as
    select
        arg2,
        latest_by_offset(arg1),
        latest_by_offset(arg3)
    from input_for_grouping
    group by arg2
emit changes;

That’s it. Nothing fancy, actually, but arg2 is definitely the key now.

What happens internally? KSQL creates two new topics. One is called something like __confluent-ksql-default...-changelog and is not too interesting here (it presumably stores the changelog of the table's state, which I do not really care about for this purpose). The second one is called __confluent-ksql-default...-Aggregate-GroupBy-repartition. Here, the input_for_grouping stream essentially gets copied: the message body is a JSON object containing all three arguments arg1, arg2 and arg3, and arg2 is duplicated in this topic because its value is also the key of the message.
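
Both topics show up when listing topics from the ksqlDB CLI; the repartition topic can then be inspected with a print statement. This is only a sketch, and the placeholder has to be replaced with the full topic name copied from the listing:

show topics;
-- then, with the full repartition topic name copied from the listing:
-- print '<repartition-topic-name>' from beginning;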

From this topic it is easy to get to the desired result table.
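
As with the first table, the rekeyed table can then be queried by its new key. Again just a sketch, with 'k' as an example value for arg2:

-- look up the latest row for one value of the new key
select * from aggregated_2 where arg2 = 'k';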

Grouping by multiple key columns?

One last test, since it is that easy:

create table aggregated_3 as
    select
        arg1,
        arg2,
        sum(arg3) as s
    from input_for_grouping
    group by arg1, arg2
emit changes;

KSQL fails here with an error message:

Key format does not support schema.
  format: KAFKA
  schema: Persistence{columns=[`ARG1` INTEGER KEY, `ARG2` STRING KEY], features=[]}
  reason: The 'KAFKA' format only supports a single field. Got: [`ARG1` INTEGER KEY, `ARG2` STRING KEY]
  Caused by: The 'KAFKA' format only supports a single field. Got: [`ARG1` INTEGER KEY, `ARG2` STRING KEY]

Well, theoretically we knew that already: at the time of writing, KSQL only supports single-valued keys. We try to generate a multi-valued key, so it is not too surprising that this fails. What I did find interesting: it seems like internally just another KSQL query is generated, one that defines both values as the key.
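
A common workaround in this situation is to build a single composite key by hand, for example by casting and concatenating the two columns into one string. The following is only a sketch of that idea, untested against this exact setup; the hyphen separator and the composite_key alias are arbitrary choices of mine:

create table aggregated_3 as
    select
        concat(cast(arg1 as string), concat('-', arg2)) as composite_key,
        sum(arg3) as s
    from input_for_grouping
    group by concat(cast(arg1 as string), concat('-', arg2))
emit changes;

The resulting key is then a single string, which the KAFKA key format has no problem with.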