Die Frage
Ich möchte verstehen, wie der Standard Partitionierer von Kafka tatsächlich funktioniert.
Die Suche
- Kafka stellt ein Interface1
Partitioner
hier bereit; dieses Interface soll von jedem Partitionierer implementiert werden. - Falls nicht anders definiert, wird der
DefaultPartitioner
verwendet - das ist hier fixiert. Aktuell ist das zumindest noch so; aber das ist jetzt veraltet. Ich muss noch herausfinden, wie das neu geht. - Die Partitionierung an sich ist recht einfach:
- Falls eine Partition definiert ist, nutze sie.
- Falls kein Schlüssel gegeben ist, gib etwas zurück was
StickyPartition
heißt. Das ist vermutlich der Grund für die Deprecation. - Falls immer noch keine Partition gegeben ist, wird die Methode
partitionForKey
aufgerufen, das ist hier definiert. Der code ist simpel:return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions
. -serializedKey
sind einfach die Bytes vom Key. -numPartitions
ist die Anzahl der Partitionen; für ein Topic ist das konsistent. -murmur2
ist ein Hashing Algorithmus, der für Lookups verwendet wird. - Am Ende bekommen wir also eine Zahl modulonumPartitions
. Da Murmur2 deterministisch arbeitet, ist diese Methode generell deterministisch.
Das Resultat
Das Topic oder der Topicname spielen für die Partitionierung also eigentlich keine Rolle, ausschließlich die Anzahl an Partitionen ist relevant. Das erklärt auch, warum Co-Partitionierung von Topics funktioniert: Zwei Nachrichten in verschiedenen Topics aber mit gleichem Key werden vom Partitionierer gleich behandelt, falls die beiden Topics die gleiche Anzahl an Partitionen haben.
Zu beachten ist allerdings: Die Partitionierung passiert bereits im Producer, wir müssen also dem Producer vertrauen dass er korrekt partitioniert. Der rdkafka
Producer implementiert per Standard eine andere Methode!
Nächste Schritte
Ich möchte herausfinden, warum der Partitionierer veraltet ist, wie der neue Partitionierer funktioniert und ausschaut. Hilfreiche Referenzen hierfür sind vermutlich KIP-480 und KIP-794, die wurden nämlich direkt beim Deprecation-Befehl erwähnt.
Ein Interface ist die Abstraktion einer Klasse. Eine spezifische Klasse muss diese Spezifikation vollständig implementieren. ↩︎