Django management command that populates the application database with messages consumed from Kafka.
Classes
Command
Bases: BaseCommand
Consumes messages from Kafka and stores them in the database.
Functions
add_arguments(parser)
Add command-line options.
Source code in main/management/commands/kafka_consumer.py
def add_arguments(self, parser: ArgumentParser) -> None:
    """Register this command's optional flags on ``parser``.

    Args:
        parser: The argument parser the options are added to.
    """
    # ``--debug`` is a boolean switch: False when absent, True when given.
    debug_flag = "--debug"
    parser.add_argument(debug_flag, action="store_true")
|
handle(debug=False, **kwargs)
Command business logic.
Source code in main/management/commands/kafka_consumer.py
def handle(self, debug: bool = False, **kwargs: Any) -> None:  # type: ignore[misc]
    """Consume Kafka messages forever, storing them and pruning expired ones.

    Subscribes to every topic matching any of the patterns in
    ``settings.KAFKA_TOPIC_REGEX`` and then loops indefinitely. Each
    iteration polls Kafka, bulk-inserts the received messages as
    ``DruncMessage`` rows and deletes rows whose timestamp is older than
    ``settings.MESSAGE_EXPIRE_SECS`` seconds.

    The loop has no exit condition, so this method only ends when an
    exception (including ``KeyboardInterrupt``) propagates out of it; the
    consumer is closed on the way out.

    Args:
        debug: When true, echo each received message and each expiry
            deletion to stdout.
        **kwargs: Remaining command options; not used here.
    """
    consumer = KafkaConsumer(bootstrap_servers=[settings.KAFKA_ADDRESS])
    try:
        # Combine all configured topic patterns into a single alternation.
        consumer.subscribe(
            pattern=f"({'|'.join(settings.KAFKA_TOPIC_REGEX.values())})"
        )
        # TODO: determine why the below doesn't work
        # consumer.subscribe(pattern="control.no_session.process_manager")

        self.stdout.write("Listening for messages from Kafka.")
        while True:
            # poll() returns a mapping whose values are batches of records.
            for messages in consumer.poll(timeout_ms=500).values():
                message_records = []

                for message in messages:
                    if debug:
                        self.stdout.write(f"Message received: {message}")
                        self.stdout.flush()

                    # Convert Kafka timestamp (milliseconds) to datetime (seconds).
                    time = datetime.fromtimestamp(message.timestamp / 1e3, tz=UTC)

                    # Decode the raw payload and extract its text body.
                    bm = BroadcastMessage()
                    bm.ParseFromString(message.value)
                    body = bm.data.value.decode("utf-8")

                    message_records.append(
                        DruncMessage(topic=message.topic, timestamp=time, message=body)
                    )

                # Insert the whole batch with a single query.
                if message_records:
                    DruncMessage.objects.bulk_create(message_records)

            # Remove expired messages from the database.
            message_timeout = timedelta(seconds=settings.MESSAGE_EXPIRE_SECS)
            expire_time = datetime.now(tz=UTC) - message_timeout
            query = DruncMessage.objects.filter(timestamp__lt=expire_time)

            # Count once: avoids a second COUNT query and guarantees the
            # logged figure is the one that triggered the deletion.
            expired_count = query.count()
            if expired_count:
                if debug:
                    self.stdout.write(
                        f"Deleting {expired_count} messages "
                        f"older than {expire_time}."
                    )
                query.delete()
    finally:
        # Release the broker connections however the loop is left.
        consumer.close()
|