Python-Prelude
I will use the following packages:
import time
import uuid
from confluent_kafka import Consumer, Producer, TopicPartition
Set up a sample data generator
Let’s generate some random data into a Kafka topic. It doesn’t matter what data is generated since I am not interested in the data itself; I will just insert some dummy data. [1]
prod = Producer({"bootstrap.servers": "kafka0.tech-tales.blog:9092"})
while True:
    prod.produce(topic="test-topic", value=str(time.time_ns()))
    prod.flush()  # block until the message has been delivered
    time.sleep(1)
Note: I created this topic with two partitions; this will be important soon.
You can let this producer run in the background, or you can stop it after it has produced some messages. For my use case, it is sufficient that every partition contains at least a few messages.
The offsets
Now we need a consumer polling messages from that topic. I will do that interactively.
I will also need to read out the offsets reported by the consumer, so let’s write that helper function first.
def print_consumer_offsets(consumer):
    for part in consumer.position(consumer.assignment()):
        print(part.partition, part.offset)
What’s happening?
- Line 2: I first call consumer.assignment(), which gives me a list of TopicPartitions.
- I pass this list to the consumer.position(...) method, which again returns a list of partitions. This new list of partitions has actual offsets set.
- Line 3: Essentially, every TopicPartition object has three important properties:
  - part.topic - The topic the partition is part of
  - part.partition - The partition number, starting with 0 for the first one
  - part.offset - Some offset
Note: If not explicitly asked for or set, the offsets of a TopicPartition element are nonsense. In consumer.assignment(), every partition receives offset -1000; if the consumer does not yet have a valid offset for a partition, then the offset for this partition is set to -1001. We will see that later. Actually, this -1001 is the whole reason for this article.
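To make the two sentinel values from the note easier to spot in output, a small helper can translate them into readable labels. This is only a sketch: the constants are redefined locally so that the snippet runs without a Kafka client (confluent_kafka exposes the same values as OFFSET_STORED and OFFSET_INVALID), and describe_offset and is_real_offset are hypothetical helper names, not part of the library.

```python
# Sentinel offsets as described in the note above. They are redefined here
# (assumption: same values as confluent_kafka.OFFSET_STORED and
# confluent_kafka.OFFSET_INVALID) so the sketch runs without the library.
OFFSET_STORED = -1000
OFFSET_INVALID = -1001


def describe_offset(offset):
    """Return a readable label for an offset (hypothetical helper)."""
    if offset == OFFSET_STORED:
        return "stored (-1000): use the committed offset"
    if offset == OFFSET_INVALID:
        return "invalid (-1001): no position yet"
    if offset < 0:
        return f"unknown sentinel ({offset})"
    return str(offset)


def is_real_offset(offset):
    """True only for offsets that point at an actual position in the log."""
    return offset >= 0
```

With this, a line such as print(part.partition, describe_offset(part.offset)) makes it obvious which partitions have not been touched yet.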
A sample output of the above function:
print_consumer_offsets(consumer)
0 7
1 42
This means: I have been assigned partitions 0 and 1 of some topic. The reported value is the offset of the next message to be read, so I have read all the messages with an offset below 7 in partition 0 and below 42 in partition 1. In particular, I will receive either message 0.7 or 1.42 next (written as partition.offset).
The consumer
I will generate some consumers throughout this post. Those will all be generated as follows:
def generate_consumer():
    return Consumer({
        "bootstrap.servers": "kafka0.tech-tales.blog:9092",
        "group.id": "some-group-" + str(uuid.uuid4()),
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest"
    })
Note:
- I am setting the group.id to some random string. The main idea is that the group.id does not repeat in successive executions of the code; otherwise, I would have to wait for a group rebalance every time, which takes a long time.
- I am disabling enable.auto.commit so that I don’t have to think about a load of consumer groups after having finished this article.
- I am setting auto.offset.reset to earliest because I want to read from the beginning, obviously.
My trials
Trial: No offsets set
Let’s start with a consumer which should read just all the messages.
>>> consumer = generate_consumer()
>>> consumer.subscribe(["test-topic"])
>>> print_consumer_offsets(consumer)
>>>
>>> # Nothing was printed. No surprise here, the consumer did not
>>> # connect to the broker until now
>>> # Connect to the broker, receive an assignment, poll the first
>>> # message
>>> msg = None
>>> while msg is None:
...     msg = consumer.poll(1)
>>> msg.partition()
1
>>> print_consumer_offsets(consumer)
0 -1001
1 1
>>> # Surprise!
What would I expect from the last command? In my opinion, partition 0 should have reported offset 0. Partition 1 has offset 1, just as expected, since I polled one message from it.
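One way to get a usable value for the untouched partition is to fall back to its low watermark whenever the reported position is -1001. The sketch below assumes a fresh consumer group with auto.offset.reset set to earliest (so an untouched partition would indeed start at the low watermark) and a single topic; positions_with_fallback is a hypothetical helper name. It relies on the consumer's position(...), assignment() and get_watermark_offsets(...) methods.

```python
OFFSET_INVALID = -1001  # assumption: same value as confluent_kafka.OFFSET_INVALID


def positions_with_fallback(consumer):
    """Map partition number -> offset, replacing invalid positions.

    Assumes a single topic and a fresh group with
    auto.offset.reset=earliest, so that an untouched partition would
    start reading at its low watermark.
    """
    result = {}
    for part in consumer.position(consumer.assignment()):
        offset = part.offset
        if offset == OFFSET_INVALID:
            # Expected to return a (low, high) tuple, or None on timeout.
            watermarks = consumer.get_watermark_offsets(part, timeout=5.0)
            if watermarks is not None:
                offset = watermarks[0]
        result[part.partition] = offset
    return result
```

For the session above, this would report the low watermark for partition 0 and 1 for partition 1. If the group already had committed offsets, the low watermark would be the wrong fallback, so this only fits the setup used in this post.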
Trial: Assign offsets to partitions
I will do more or less the same thing again, but this time, I will assign some real offsets for both partitions.
Note that the Consumer object requires the group.id to be set; but since I will not use subscribe(...) here, it will not be used.
>>> consumer = generate_consumer()
>>> p0 = TopicPartition(topic="test-topic", partition=0, offset=2)
>>> p1 = TopicPartition(topic="test-topic", partition=1, offset=2)
>>> consumer.assign([p0, p1])
>>> print_consumer_offsets(consumer)
0 -1001
1 -1001
>>> # This is interesting because I have assigned offsets
>>> # for both partitions.
>>> msg = None
>>> while msg is None:
...     msg = consumer.poll(1)
>>> msg.partition()
1
>>> print_consumer_offsets(consumer)
0 -1001
1 3
So the consumer still reports an invalid offset for a partition until it has polled its first message from that partition.
Another thing I noticed along the way: when assigning partitions by hand instead of using consumer.subscribe(...), the consumer first reads all the messages from one partition and then moves on to the next one. Also, it seems to pick the partition with the highest partition number that still has unread messages…
Why this is important
We had a case where I had to provide a REST API for a topic, and this API should also support pagination.
My approach was the following: As soon as I have collected enough data, I just return the current offsets for each partition as the next pagination parameters.
The problem with that: If the consumer did not touch a partition during this run, then the reported offset for that partition was garbage (-1001). So I also had to take into account the offsets the request started with to make this work.
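The merge of start offsets and reported offsets can be sketched in a few lines. This is not the code from the actual API, just a minimal illustration: next_page_offsets is a hypothetical function name, and both arguments are assumed to be plain dictionaries mapping partition number to offset.

```python
def next_page_offsets(start_offsets, reported_offsets):
    """Combine the offsets a request started with and the consumer's positions.

    Both arguments map partition number -> offset. A partition whose
    reported offset is negative (for example -1001) was not read during
    this request, so the offset it started with is kept.
    """
    merged = dict(start_offsets)
    for partition, offset in reported_offsets.items():
        if offset >= 0:
            merged[partition] = offset
    return merged


# Partition 0 was not touched in this request, partition 1 advanced to 45.
print(next_page_offsets({0: 7, 1: 42}, {0: -1001, 1: 45}))
# prints {0: 7, 1: 45}
```

The result can then be handed back to the client as the pagination parameters for the next request, which would pass them to consumer.assign(...) as TopicPartition offsets.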
[1] Note that the Kafka server I am using does not actually exist. So you don’t have to bother.