Python Prelude

I will use the following packages:

import time
import uuid

from confluent_kafka import Consumer, Producer, TopicPartition

Set up a sample data generator

Let’s produce some random data into a Kafka topic. It doesn’t matter what data is generated since I am not interested in its contents; I will just insert some dummy values.1

prod = Producer({"bootstrap.servers": "kafka0.tech-tales.blog:9092"})

while True:
    # The payload is irrelevant; a timestamp just keeps the values distinct.
    prod.produce(topic="test-topic", value=str(time.time_ns()))
    prod.flush()
    time.sleep(1)

Note: I created this topic with two partitions; this will be important soon.

You can let this producer run in the background, or stop it after it has produced some messages. For my use case, it is sufficient if every partition contains at least a few messages.

The offsets

Now we need a consumer polling messages from that topic. I will do that interactively.
I will also need to read out the offsets reported by the consumer, so let’s write this helper first.

def print_consumer_offsets(consumer):
    for part in consumer.position(consumer.assignment()):
        print(part.partition, part.offset)

What’s happening?

  • Line 2: I am first calling consumer.assignment(), which gives me a list of TopicPartitions.
  • I pass this list to the consumer.position(...) method, which again returns a list of TopicPartitions. This new list has actual offsets set.
  • Line 3: Essentially, every TopicPartition object has three important properties:
    • part.topic - The topic the partition is part of
    • part.partition - The partition number; starting with 0 for the first one
    • part.offset - Some offset

Note: Unless explicitly set or queried, the offsets of a TopicPartition element are meaningless placeholders. The partitions returned by consumer.assignment() carry the sentinel offset -1000 (confluent_kafka’s OFFSET_STORED); if the consumer does not yet have a valid position for a partition, consumer.position(...) reports -1001 (OFFSET_INVALID) for it - we will see that shortly. This -1001 is the whole reason for this article.
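To make the sentinels concrete, here is a minimal stdlib-only sketch; the constant values mirror confluent_kafka’s OFFSET_STORED and OFFSET_INVALID, while the helper function is my own invention, not part of the library:

```python
# Sentinel offsets, mirroring confluent_kafka's module-level constants.
OFFSET_STORED = -1000   # "use the committed offset"
OFFSET_INVALID = -1001  # "no valid offset known"

def is_real_offset(offset: int) -> bool:
    """A consumable log position is never negative; every
    negative value is a sentinel, not an actual position."""
    return offset >= 0

print(is_real_offset(42))              # True
print(is_real_offset(OFFSET_INVALID))  # False
```

Whenever you hand offsets from one of these API calls to something else (logging, pagination, seeking), it pays off to run them through a guard like this first.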

A sample output of the above function:

print_consumer_offsets(consumer)
0 7
1 42

This means: I am assigned partitions 0 and 1 of some topic, and position(...) reports the offset of the next message to be fetched from each partition. So I have read all messages below offset 7 in partition 0 and below offset 42 in partition 1, and I will receive message 0.7 or 1.42 next (where "partition.offset" labels a single message).
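That notation can be pinned down with a tiny stdlib-only helper (the function is my own, purely for illustration):

```python
def next_expected(positions):
    """positions: {partition: position} as reported by consumer.position(...).
    Kafka's position is the offset of the NEXT message to be fetched,
    so "partition.offset" labels the message we would receive next."""
    return [f"{part}.{off}" for part, off in sorted(positions.items())]

print(next_expected({0: 7, 1: 42}))  # ['0.7', '1.42']
```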

The consumer

I will generate some consumers throughout this post. Those will all be generated as follows:

def generate_consumer():
    return Consumer({
        "bootstrap.servers": "kafka0.tech-tales.blog:9092",
        "group.id": "some-group-" + str(uuid.uuid4()),
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest"
    })

Note:

  • I am setting the group.id to a random string. The main idea is that the group.id does not repeat across successive executions of the code - otherwise, I would have to wait for a group rebalance every time, which takes a while…
  • I am disabling enable.auto.commit so that these throwaway consumer groups don’t commit any offsets I would have to think about after finishing this article.
  • I am setting auto.offset.reset to earliest because I want to read from the beginning, obviously.

My trials

Trial: No offsets set

Let’s start with a consumer which should simply read all the messages.

>>> consumer = generate_consumer()
>>> consumer.subscribe(["test-topic"])

>>> print_consumer_offsets(consumer)
>>>
>>> # Nothing was printed. No surprise here, the consumer did not
>>> # connect to the broker until now

>>> # Connect to the broker, receive an assignment, poll the first
>>> # message
>>> msg = None
>>> while msg is None:
...    msg = consumer.poll(1)

>>> msg.partition()
1
>>> print_consumer_offsets(consumer)
0 -1001
1 1
>>> # Surprise!

What would I have expected from the last command? In my opinion, partition 0 should have reported offset 0. Partition 1 has offset 1, just as expected since I polled one message (the one at offset 0) from it.
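The observed behavior can be modeled with a toy, stdlib-only consumer (FakeConsumer is purely illustrative, not a confluent_kafka class): a partition keeps the -1001 placeholder until the first message has actually been fetched from it.

```python
OFFSET_INVALID = -1001  # confluent_kafka's "no valid offset" placeholder

class FakeConsumer:
    """Toy model of position reporting: every assigned partition starts
    at the placeholder and only gets a real position after a fetch."""
    def __init__(self, partitions):
        self._pos = {p: OFFSET_INVALID for p in partitions}

    def fetch(self, partition, offset):
        # position = offset of the NEXT message to be fetched
        self._pos[partition] = offset + 1

    def position(self):
        return dict(self._pos)

c = FakeConsumer([0, 1])
c.fetch(partition=1, offset=0)  # the first polled message came from partition 1
print(c.position())             # {0: -1001, 1: 1}
```

This matches the output above: partition 0 was never touched, so it still reports -1001 instead of the 0 I would have expected.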

Trial: Assign offsets to partitions

I will do more or less the same thing again, but this time, I will assign some real offsets for both partitions.
Note that the Consumer object requires the group.id to be set; but since I will not use subscribe(...) here, it has no effect.

>>> consumer = generate_consumer()
>>> p0 = TopicPartition(topic="test-topic", partition=0, offset=2)
>>> p1 = TopicPartition(topic="test-topic", partition=1, offset=2)
>>> consumer.assign([p0, p1])
>>> print_consumer_offsets(consumer)
0 -1001
1 -1001
>>> # This is interesting because I have assigned offsets
>>> # for both partitions.
>>> msg = None
>>> while msg is None:
...   msg = consumer.poll(1)
>>> msg.partition()
1
>>> print_consumer_offsets(consumer)
0 -1001
1 3

So the consumer still reports bogus offsets for a partition until it has polled from that partition at least once.

Another thing I noticed along the way: when assigning partitions by hand instead of using consumer.subscribe(...), the consumer first reads all the messages from one partition and only then moves on to the next. It also seems to start with the highest-numbered partition that still has unread messages…

Why this is important

We had a case where I was to provide a REST API on top of a topic, and it had to support pagination.
My approach was the following: as soon as I have collected enough data for one page, I return the consumer’s current offsets per partition as the pagination parameters for the next request.
The problem with that: if the consumer did not touch a partition during this request, its reported offset was the meaningless -1001. So I also had to carry along the offsets this request started with to make pagination work.
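The fix can be sketched as a small stdlib-only merge (the function name and the -1001 default are mine): partitions the consumer actually advanced keep their reported position, while untouched partitions fall back to the offsets the request started with.

```python
def next_page_offsets(start, reported, invalid=-1001):
    """start:    {partition: offset} this request began with.
    reported: {partition: offset} from consumer.position(...) after
              collecting one page; untouched partitions report -1001.
    Returns the offsets to hand out as the next pagination parameters."""
    merged = {}
    for part, start_off in start.items():
        off = reported.get(part, invalid)
        merged[part] = off if off != invalid else start_off
    return merged

# Partition 1 advanced to position 5; partition 0 was never touched.
print(next_page_offsets({0: 2, 1: 2}, {0: -1001, 1: 5}))  # {0: 2, 1: 5}
```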


  1. Note that the Kafka server I am using does not actually exist, so you don’t have to bother trying to reach it. ↩︎