Setup
I will first prepare some sample data.
So I will create some messages. The key will be a number between $1$ and $26$ (both inclusive), and the value will be a simple JSON object with arg2 being a lowercase ASCII letter (a, b, c, …, z) and arg3 being some random number between $1$ and $1,000,000$. I am going to use arg2 for rekeying later on; arg3 is essentially just a dummy value.
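The generator itself is not reproduced here; a minimal sketch, assuming the kafka-python package, a broker on localhost:9092, and input_topic as the target topic (everything apart from the message format described above is an assumption), could look like this:

```python
import json
import random
import string

from kafka import KafkaProducer  # assumption: kafka-python is installed

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # assumption: local broker
    # KSQL's KAFKA key format expects the standard 4-byte big-endian integer encoding.
    key_serializer=lambda k: k.to_bytes(4, byteorder="big"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for _ in range(1000):  # the number of sample messages is arbitrary
    key = random.randint(1, 26)  # key: 1..26, both inclusive
    value = {
        "arg2": random.choice(string.ascii_lowercase),  # a..z, used for rekeying later
        "arg3": random.randint(1, 1_000_000),           # dummy value
    }
    producer.send("input_topic", key=key, value=value)

producer.flush()
```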
Data to KSQL
I will get my data into a KSQL stream because it is easy to do and it makes the following steps easier, in particular with regard to non-materialized tables (which I want to avoid).
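The original statement is not shown here, but a minimal sketch of it, assuming JSON values and the column types that show up in the error message later on (the stream name input_for_grouping is the one referenced further below), might look like this:

```sql
-- Sketch: a stream over the raw topic, with arg1 declared as the key column.
-- VALUE_FORMAT and the exact column types are assumptions.
CREATE STREAM input_for_grouping (
    arg1 INTEGER KEY,
    arg2 VARCHAR,
    arg3 INTEGER
) WITH (
    KAFKA_TOPIC = 'input_topic',
    VALUE_FORMAT = 'JSON'
);
```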
Note the definition of arg1 as key.
Creating a table is easy too.
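Again only a sketch; the table name and the LATEST_BY_OFFSET aggregation are assumptions, but something along these lines yields a table keyed by arg1:

```sql
-- Sketch: a materialized table keyed by arg1.
-- The table name and the aggregation functions are assumptions.
CREATE TABLE grouped_by_arg1 AS
    SELECT arg1,
           LATEST_BY_OFFSET(arg2) AS arg2,
           LATEST_BY_OFFSET(arg3) AS arg3
    FROM input_for_grouping
    GROUP BY arg1
    EMIT CHANGES;
```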
With EMIT CHANGES, the table is not fixed in size but gets updated with every message written to the input_topic topic.
Rekeying
From the KSQL side, the rekeying is easy.
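The exact statement is not reproduced either; a sketch of the kind of query meant here (the table name and aggregation functions are assumptions, the GROUP BY arg2 is the point):

```sql
-- Sketch: rekeying by grouping on arg2; the chosen aggregations are assumptions.
CREATE TABLE grouped_by_arg2 AS
    SELECT arg2,
           LATEST_BY_OFFSET(arg1) AS arg1,
           LATEST_BY_OFFSET(arg3) AS arg3
    FROM input_for_grouping
    GROUP BY arg2
    EMIT CHANGES;
```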
That’s it. Nothing fancy, actually, but arg2 is definitely the key now.
What happens internally? KSQL creates two new topics. One is called something like __confluent-ksql-default...-changelog and is obviously not too interesting (some changelog is stored there; I don’t really care about that here). The second one is called __confluent-ksql-default...-Aggregate-GroupBy-repartition. Into this one, the input_for_grouping stream essentially gets copied. The value is a JSON object containing all three fields arg1, arg2 and arg3; arg2 therefore appears twice in this topic, because its value is also the key of the message.
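Purely as an illustration (a made-up record; field casing and exact layout depend on the KSQL version), an entry in the repartition topic might look roughly like this:

```
key:   f
value: {"ARG1": 7, "ARG2": "f", "ARG3": 123456}
```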
From this topic it is easy to get to the desired result table.
Grouping by multiple key columns?
My last test, since it is that easy.
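A sketch of the kind of statement that triggers the error below, grouping by both columns (the table name and the aggregation are assumptions):

```sql
-- Sketch: grouping by two columns, which would require a multi-column key.
CREATE TABLE grouped_by_arg1_arg2 AS
    SELECT arg1,
           arg2,
           COUNT(*) AS cnt
    FROM input_for_grouping
    GROUP BY arg1, arg2
    EMIT CHANGES;
```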
Here, KSQL fails with an error message:
Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[`ARG1` INTEGER KEY, `ARG2` STRING KEY], features=[]}
reason: The 'KAFKA' format only supports a single field. Got: [`ARG1` INTEGER KEY, `ARG2` STRING KEY]
Caused by: The 'KAFKA' format only supports a single field. Got: [`ARG1` INTEGER KEY, `ARG2` STRING KEY]
Well, theoretically we knew that already: at the time of writing, KSQL only supports single-column keys. We try to generate a multi-column key, so it is not too surprising that this fails. But what I found interesting: it looks like some other KSQL query is simply run internally, one which defines both columns as the key.