Skip to content
Snippets Groups Projects

Add a journal client base class to process messages

4 unresolved threads

This is a first draft to implement a base client from our swh-journal.

This can be used for example to trigger the creation of new content in the archiver db.

Related #424 (closed) Related T494

Outside of that scope, i see some possible refactoring between the client class (a consumer) and the publisher (which is also a consumer). Also between the publisher and the swh.storage.listener (the listener is a publisher).


Migrated from D180 (view on Phabricator)

Merge request reports

Closed by Phabricator Migration userPhabricator Migration user 8 years ago (Mar 13, 2017 10:17am UTC)

Loading

Activity

Filter activity
  • Approvals
  • Assignees & reviewers
  • Comments (from bots)
  • Comments (from users)
  • Commits & branches
  • Edits
  • Labels
  • Lock status
  • Mentions
  • Merge request status
  • Tracking
19 # Only accepted object types
20 ACCEPTED_OBJECT_TYPES = [
21 'content',
22 'revision',
23 'release',
24 'occurrence',
25 'origin',
26 'origin_visit'
27 ]
28
29
30 class SWHJournalClient(SWHConfig, metaclass=ABCMeta):
31 """A base client for the Software Heritage journal.
32
33 The current implementation of the journal uses Apache Kafka
34 brokers to publish messages under a given topic prefix, with each
  • The topic wording is an internal kafka thing that doesn't really matter (except as an implementation detail) for swh.journal consumers.

    Proposed wording:

    A base client for the Software Heritage journal.
    
    The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix.
    
    Clients subscribe to events specific to each object type by using the `object_types` configuration variable.
    
    Clients can be sharded by setting the `client_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same client_id.
    
    Messages are processed by the `process_objects` method in batches of maximum `max_messages`.
  • Please register or sign in to reply
  • 51 # Prefix topic to receive notification from
    52 'topic_prefix': ('str', 'swh.journal.test_publisher'),
    53 # Consumer identifier
    54 'consumer_identifier': ('str', 'swh.journal.client.test'),
    55 # Object types to deal with (in a subscription manner)
    56 'object_types': ('list[str]', [
    57 'content', 'revision',
    58 'release', 'occurrence',
    59 'origin', 'origin_visit'
    60 ]),
    61 # Number of messages to batch process
    62 'max_messages': ('int', 100),
    63 'auto_offset_reset': ('str', 'earliest')
    64 }
    65
    66 CONFIG_BASE_FILENAME = 'journal/client'
  • I agree with the general goal of consistency between the producer and consumer configuration options.

    However, I think we should clearly separate both and making one a subclass of the other sounds like a recipe for confusion.

    Furthermore, we will need two producers: the "live" producer subscribing to database events, as well as the "catchup" producer which will have a feedback loop between the journal and the database. I think there's more value in providing a base class for both of those rather than for producer and consumer.

  • I agree with the general goal of consistency between the producer and consumer configuration options.

    Nice.

    However, I think we should clearly separate both and making one a subclass of the other sounds like a recipe for confusion.

    Ok, that sounds fair.

    Furthermore, we will need two producers: the "live" producer subscribing to database events, as well as the "catchup" producer which will have a feedback loop between the journal and the database.

    I believe what you mention refers to:

    • the live producer is the actual listener defined in swh.storage.listener (subscribed to db events and feeding new object events to the publisher's journal).
    • the catchup producer being the checker mentioned in T494 (which will be 'diffing' the journal against the db and effectively provide back the missing content ids to the journal).

    I think there's more value in providing a base class for both of those rather than for producer and consumer.

    Ok.

  • Adaptation according to proposed improvments

    • doc: Improve wording about SWHJournalClient class
    • Add option to allow consumer client to choose fetch policy
  • 25 'origin',
    26 'origin_visit'
    27 ]
    28
    29
    30 class SWHJournalClient(SWHConfig, metaclass=ABCMeta):
    31 """A base client for the Software Heritage journal.
    32
    33 The current implementation of the journal uses Apache Kafka
    34 brokers to publish messages under a given topic prefix, with each
    35 object type using a specific topic under that prefix.
    36
    37 Clients subscribe to events specific to each object type by using
    38 the `object_types` configuration variable.
    39
    40 Clients can be sharded by setting the `client_id` to a common
  • 48 DEFAULT_CONFIG = {
    49 # Broker to connect to
    50 'brokers': ('list[str]', ['localhost']),
    51 # Prefix topic to receive notification from
    52 'topic_prefix': ('str', 'swh.journal.test_publisher'),
    53 # Consumer identifier
    54 'consumer_identifier': ('str', 'swh.journal.client.test'),
    55 # Object types to deal with (in a subscription manner)
    56 'object_types': ('list[str]', [
    57 'content', 'revision',
    58 'release', 'occurrence',
    59 'origin', 'origin_visit'
    60 ]),
    61 # Number of messages to batch process
    62 'max_messages': ('int', 100),
    63 'auto_offset_reset': ('str', 'earliest')
    • Let's give a proper error message (with a ValueError).

      While we're checking the configuration, let's hard-code the list of supported object types and check that the configuration matches (again, raising ValueError on unknown values).

    • Please register or sign in to reply
  • A few more nits but this looks reasonable to build upon.

  • mentioned in merge request swh-storage!852 (closed)

  • Merge request was accepted

  • Nicolas Dandrimont approved this merge request

    approved this merge request

  • swh.journal.client: Ensure options are correctly set when starting

  • Merge request was merged

  • Please register or sign in to reply
    Loading