Skip to content
Snippets Groups Projects
Verified Commit 7e03cc68 authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

indexer.journal_client: Subscribe to OriginVisitStatus topic

... instead of OriginVisit.

OriginVisit model object no longer hold status information so the current
filtering happening on the journal client side could not work.

Related to T2814
Related to P882
parent d92c2419
No related branches found
Tags v0.6.0
No related merge requests found
......@@ -270,7 +270,7 @@ def journal_client(
brokers=brokers,
prefix=prefix,
group_id=group_id,
object_types=["origin_visit"],
object_types=["origin_visit_status"],
stop_after_objects=stop_after_objects,
)
......
......@@ -14,8 +14,8 @@ MAX_ORIGINS_PER_TASK = 100
def process_journal_objects(messages, *, scheduler, task_names):
"""Worker function for `JournalClient.process(worker_fn)`, after
currification of `scheduler` and `task_names`."""
assert set(messages) == {"origin_visit"}, set(messages)
process_origin_visits(messages["origin_visit"], scheduler, task_names)
assert set(messages) == {"origin_visit_status"}, set(messages)
process_origin_visits(messages["origin_visit_status"], scheduler, task_names)
def process_origin_visits(visits, scheduler, task_names):
......
......@@ -354,7 +354,7 @@ def test_journal_client(
STATUS = {"status": "full", "origin": {"url": "file://dev/0000",}}
producer.produce(
topic=kafka_prefix + ".origin_visit",
topic=f"{kafka_prefix}.origin_visit_status",
key=b"bogus",
value=value_to_kafka(STATUS),
)
......
# Copyright (C) 2019 The Software Heritage developers
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -10,10 +10,10 @@ from swh.indexer.journal_client import process_journal_objects
class JournalClientTest(unittest.TestCase):
def testOneOriginVisit(self):
def test_one_origin_visit_status(self):
mock_scheduler = Mock()
messages = {
"origin_visit": [{"status": "full", "origin": "file:///dev/zero",},]
"origin_visit_status": [{"status": "full", "origin": "file:///dev/zero",},]
}
process_journal_objects(
messages,
......@@ -39,10 +39,10 @@ class JournalClientTest(unittest.TestCase):
),
)
def testOriginVisitLegacy(self):
def test_origin_visit_legacy(self):
mock_scheduler = Mock()
messages = {
"origin_visit": [
"origin_visit_status": [
{"status": "full", "origin": {"url": "file:///dev/zero",}},
]
}
......@@ -70,10 +70,10 @@ class JournalClientTest(unittest.TestCase):
),
)
def testOneOriginVisitBatch(self):
def test_one_origin_visit_batch(self):
mock_scheduler = Mock()
messages = {
"origin_visit": [
"origin_visit_status": [
{"status": "full", "origin": "file:///dev/zero",},
{"status": "full", "origin": "file:///tmp/foobar",},
]
......@@ -106,10 +106,10 @@ class JournalClientTest(unittest.TestCase):
)
@patch("swh.indexer.journal_client.MAX_ORIGINS_PER_TASK", 2)
def testOriginVisitBatches(self):
def test_origin_visit_batches(self):
mock_scheduler = Mock()
messages = {
"origin_visit": [
"origin_visit_status": [
{"status": "full", "origin": "file:///dev/zero",},
{"status": "full", "origin": "file:///tmp/foobar",},
{"status": "full", "origin": "file:///tmp/spamegg",},
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment