Add a journal client base class to process messages
This is a first draft implementing a base client for our swh-journal. It can be used, for example, to trigger the creation of new content in the archiver db.

Related #424 (closed). Related T494.

Outside of that scope, I see some possible refactoring between the client class (a consumer) and the publisher (which is also a consumer), and also between the publisher and swh.storage.listener (the listener is a publisher).
Migrated from D180 (view on Phabricator)
Some references in the commit message have been migrated:
- T424 is now #424 (closed)
- swh/journal/client.py 0 → 100644

```python
# Only accepted object types
ACCEPTED_OBJECT_TYPES = [
    'content',
    'revision',
    'release',
    'occurrence',
    'origin',
    'origin_visit'
]


class SWHJournalClient(SWHConfig, metaclass=ABCMeta):
    """A base client for the Software Heritage journal.

    The current implementation of the journal uses Apache Kafka
    brokers to publish messages under a given topic prefix, with each
```

The `topic` wording is an internal Kafka thing that doesn't really matter (except as an implementation detail) for swh.journal consumers. Proposed wording:

> A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. Clients subscribe to events specific to each object type by using the `object_types` configuration variable. Clients can be sharded by setting the `client_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same `client_id`. Messages are processed by the `process_objects` method in batches of maximum `max_messages`.
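To make the subscription and batching semantics concrete, here is a minimal sketch, with hypothetical helper names rather than the actual swh.journal code, of how per-object-type topics could be derived from `topic_prefix` and `object_types`, and how a message stream could be grouped into batches of at most `max_messages`:

```python
from itertools import islice


def subscription_topics(topic_prefix, object_types):
    # One Kafka topic per subscribed object type, under the common prefix.
    return ['%s.%s' % (topic_prefix, object_type)
            for object_type in object_types]


def batches(messages, max_messages):
    # Group an iterable of messages into lists of at most max_messages,
    # so they can be handed to a process_objects-style method.
    it = iter(messages)
    while True:
        batch = list(islice(it, max_messages))
        if not batch:
            return
        yield batch


# Example: topics for a client subscribing only to contents and origins
topics = subscription_topics('swh.journal.objects', ['content', 'origin'])
```

Sharding then falls out of Kafka's consumer-group mechanics: all instances using the same `client_id` (consumer group) split the partitions of these topics between them.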
- swh/journal/client.py 0 → 100644

```python
        # Prefix topic to receive notification from
        'topic_prefix': ('str', 'swh.journal.test_publisher'),
        # Consumer identifier
        'consumer_identifier': ('str', 'swh.journal.client.test'),
        # Object types to deal with (in a subscription manner)
        'object_types': ('list[str]', [
            'content', 'revision',
            'release', 'occurrence',
            'origin', 'origin_visit'
        ]),
        # Number of messages to batch process
        'max_messages': ('int', 100),
        'auto_offset_reset': ('str', 'earliest')
    }

    CONFIG_BASE_FILENAME = 'journal/client'
```

I agree with the general goal of consistency between the producer and consumer configuration options. However, I think we should clearly separate the two; making one a subclass of the other sounds like a recipe for confusion.

Furthermore, we will need two producers: the "live" producer subscribing to database events, as well as the "catchup" producer, which will have a feedback loop between the journal and the database. I think there's more value in providing a base class for both of those than for the producer and the consumer.
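The base-class suggestion could be sketched as an abstract class shared by the two producers. All names below are hypothetical illustrations, not the actual swh.journal API:

```python
from abc import ABCMeta, abstractmethod


class JournalProducerBase(metaclass=ABCMeta):
    """Hypothetical shared base for the 'live' and 'catchup' producers.

    Both subclasses obtain object ids from somewhere (database
    notifications, or a journal/database diff) and push them to the journal.
    """

    @abstractmethod
    def fetch_object_ids(self):
        """Yield the object ids that need to be (re)published."""
        ...

    def run_once(self, journal):
        """Publish one round of pending object ids to the journal."""
        for object_id in self.fetch_object_ids():
            journal.append(object_id)


class CatchupProducer(JournalProducerBase):
    """Republishes ids present in the database but missing from the journal."""

    def __init__(self, db_ids, journal_ids):
        self.db_ids = db_ids
        self.journal_ids = journal_ids

    def fetch_object_ids(self):
        # The 'diffing' feedback loop: db minus journal.
        return sorted(set(self.db_ids) - set(self.journal_ids))
```

Here `journal` is any object with an `append` method; in practice it would wrap a Kafka producer, and a `LiveProducer` subclass would implement `fetch_object_ids` on top of database notifications instead.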
> I agree with the general goal of consistency between the producer and consumer configuration options.

Nice.

> However, I think we should clearly separate both and making one a subclass of the other sounds like a recipe for confusion.

Ok, that sounds fair.

> Furthermore, we will need two producers: the "live" producer subscribing to database events, as well as the "catchup" producer which will have a feedback loop between the journal and the database.

I believe what you mention refers to:

- the live producer being the actual listener defined in swh.storage.listener (subscribed to db events and feeding new object events to the publisher's journal);
- the catchup producer being the checker mentioned in T494 (which will be 'diffing' the journal against the db and effectively providing the missing content ids back to the journal).

> I think there's more value in providing a base class for both of those rather than for producer and consumer.

Ok.
- swh/journal/client.py 0 → 100644

```python
    'origin',
    'origin_visit'
]


class SWHJournalClient(SWHConfig, metaclass=ABCMeta):
    """A base client for the Software Heritage journal.

    The current implementation of the journal uses Apache Kafka
    brokers to publish messages under a given topic prefix, with each
    object type using a specific topic under that prefix.

    Clients subscribe to events specific to each object type by using
    the `object_types` configuration variable.

    Clients can be sharded by setting the `client_id` to a common
```

- swh/journal/client.py 0 → 100644

```python
    DEFAULT_CONFIG = {
        # Broker to connect to
        'brokers': ('list[str]', ['localhost']),
        # Prefix topic to receive notification from
        'topic_prefix': ('str', 'swh.journal.test_publisher'),
        # Consumer identifier
        'consumer_identifier': ('str', 'swh.journal.client.test'),
        # Object types to deal with (in a subscription manner)
        'object_types': ('list[str]', [
            'content', 'revision',
            'release', 'occurrence',
            'origin', 'origin_visit'
        ]),
        # Number of messages to batch process
        'max_messages': ('int', 100),
        'auto_offset_reset': ('str', 'earliest')
```

mentioned in merge request swh-storage!852 (closed)
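For reference, a client configuration file matching these defaults might look as follows. The exact file location depends on how SWHConfig resolves `CONFIG_BASE_FILENAME = 'journal/client'`; the path and the idea that each key maps directly to a YAML entry are assumptions here, and the values shown are simply the defaults above:

```yaml
# e.g. journal/client.yml under the swh config directory (assumed path)
brokers:
  - localhost
topic_prefix: swh.journal.test_publisher
consumer_identifier: swh.journal.client.test
object_types:
  - content
  - revision
  - release
  - occurrence
  - origin
  - origin_visit
max_messages: 100
auto_offset_reset: earliest
```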