From d3d5b797718698646e409167180b94347f1dde52 Mon Sep 17 00:00:00 2001
From: "Antoine R. Dumont (@ardumont)" <antoine.romain.dumont@gmail.com>
Date: Fri, 13 Mar 2020 15:44:41 +0100
Subject: [PATCH] Migrate to latest origin_visit_upsert/add api changes

Related to D2813
Related to D2820
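
The new storage API deals in model objects rather than plain dicts:
origin_visit_add now takes the origin URL positionally and returns an
object whose visit id is read as an attribute (visit.visit), and
origin_visit_upsert now expects a list of OriginVisit instances. Below
is a minimal sketch of the replay-side call sequence this patch
switches to; the in-memory storage config and the example visit values
are illustrative only, assuming the swh.storage >= 0.0.178 semantics
from D2813/D2820:

    from datetime import datetime, timezone

    from swh.model.model import Origin, OriginVisit
    from swh.storage import get_storage

    storage = get_storage(cls='memory', journal_writer={'cls': 'memory'})

    # Legacy journal dicts are first normalized into OriginVisit objects
    # (see _fix_origin_visit in swh/journal/replay.py), then handed to
    # the storage as model objects instead of dicts.
    visit = OriginVisit.from_dict({
        'origin': 'http://foo',
        'date': datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc),
        'type': 'git',
        'status': 'ongoing',
        'snapshot': None,
        'metadata': None,
    })
    storage.origin_add([Origin(url=visit.origin)])
    storage.origin_visit_upsert([visit])  # OriginVisit objects, not dicts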
---
 requirements-swh.txt                   |  2 +-
 swh/journal/replay.py                  | 85 +++++++++++++++-----------
 swh/journal/tests/test_kafka_writer.py |  5 +-
 swh/journal/tests/test_write_replay.py | 22 ++++---
 4 files changed, 65 insertions(+), 49 deletions(-)

diff --git a/requirements-swh.txt b/requirements-swh.txt
index 0b5f629..932ebfd 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
 swh.core[db,http] >= 0.0.60
 swh.model >= 0.0.60
-swh.storage >= 0.0.177
+swh.storage >= 0.0.178
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
index 37bf4fa..83a3227 100644
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -26,7 +26,7 @@ from swh.model.identifiers import normalize_timestamp
 from swh.model.hashutil import hash_to_hex
 
 from swh.model.model import (
-    BaseContent, BaseModel, Content, Directory, Origin, Revision,
+    BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision,
     SHA1_SIZE, SkippedContent, Snapshot, Release
 )
 from swh.objstorage.objstorage import (
@@ -46,6 +46,7 @@ CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds"
 
 object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
     'origin': Origin.from_dict,
+    'origin_visit': OriginVisit.from_dict,
     'snapshot': Snapshot.from_dict,
     'revision': Revision.from_dict,
     'release': Release.from_dict,
@@ -205,45 +206,61 @@ def _fix_revisions(revisions: List[Dict]) -> List[Dict]:
     return good_revisions
 
 
-def _fix_origin_visits(visits: List[Dict]) -> List[Dict]:
-    """Adapt origin visits into a list of current storage compatible dicts.
+def _fix_origin_visit(visit: Dict) -> OriginVisit:
+    """Adapt origin visits into a list of current storage compatible
+       OriginVisits.
 
     `visit['origin']` is a dict instead of a URL:
 
+    >>> from datetime import datetime, timezone
     >>> from pprint import pprint
-    >>> pprint(_fix_origin_visits([{
+    >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc)
+    >>> pprint(_fix_origin_visit({
     ...     'origin': {'url': 'http://foo'},
+    ...     'date': date,
     ...     'type': 'git',
-    ... }]))
-    [{'metadata': None, 'origin': 'http://foo', 'type': 'git'}]
+    ...     'status': 'ongoing',
+    ...     'snapshot': None,
+    ... }).to_dict())
+    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
+     'metadata': None,
+     'origin': 'http://foo',
+     'snapshot': None,
+     'status': 'ongoing',
+     'type': 'git'}
 
     `visit['type']` is missing, but `visit['origin']['type']` exists:
 
-    >>> pprint(_fix_origin_visits([
-    ...     {'origin': {'type': 'hg', 'url': 'http://foo'}
-    ... }]))
-    [{'metadata': None, 'origin': 'http://foo', 'type': 'hg'}]
-
-    """
-    good_visits = []
-    for visit in visits:
-        visit = visit.copy()
-        if 'type' not in visit:
-            if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
-                # Very old version of the schema: visits did not have a type,
-                # but their 'origin' field was a dict with a 'type' key.
-                visit['type'] = visit['origin']['type']
-            else:
-                # Very very old version of the schema: 'type' is missing,
-                # so there is nothing we can do to fix it.
-                raise ValueError('Got an origin_visit too old to be replayed.')
-        if isinstance(visit['origin'], dict):
-            # Old version of the schema: visit['origin'] was a dict.
-            visit['origin'] = visit['origin']['url']
-        if 'metadata' not in visit:
-            visit['metadata'] = None
-        good_visits.append(visit)
-    return good_visits
+    >>> pprint(_fix_origin_visit(
+    ...     {'origin': {'type': 'hg', 'url': 'http://foo'},
+    ...     'date': date,
+    ...     'status': 'ongoing',
+    ...     'snapshot': None,
+    ... }).to_dict())
+    {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc),
+     'metadata': None,
+     'origin': 'http://foo',
+     'snapshot': None,
+     'status': 'ongoing',
+     'type': 'hg'}
+
+    """  # noqa
+    visit = visit.copy()
+    if 'type' not in visit:
+        if isinstance(visit['origin'], dict) and 'type' in visit['origin']:
+            # Very old version of the schema: visits did not have a type,
+            # but their 'origin' field was a dict with a 'type' key.
+            visit['type'] = visit['origin']['type']
+        else:
+            # Very very old version of the schema: 'type' is missing,
+            # so there is nothing we can do to fix it.
+            raise ValueError('Got an origin_visit too old to be replayed.')
+    if isinstance(visit['origin'], dict):
+        # Old version of the schema: visit['origin'] was a dict.
+        visit['origin'] = visit['origin']['url']
+    if 'metadata' not in visit:
+        visit['metadata'] = None
+    return OriginVisit.from_dict(visit)
 
 
 def collision_aware_content_add(
@@ -308,10 +325,8 @@ def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
             Revision.from_dict(r) for r in _fix_revisions(objects)
         )
     elif object_type == 'origin_visit':
-        visits = _fix_origin_visits(objects)
-        storage.origin_add(Origin(url=v['origin']) for v in visits)
-        # FIXME: Should be List[OriginVisit], working on fixing
-        # swh.storage.origin_visit_upsert (D2813)
+        visits = [_fix_origin_visit(v) for v in objects]
+        storage.origin_add(Origin(url=v.origin) for v in visits)
         storage.origin_visit_upsert(visits)
     elif object_type in ('directory', 'release', 'snapshot', 'origin'):
         method = getattr(storage, object_type + '_add')
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index 6a8e402..90861a2 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -150,11 +150,10 @@ def test_storage_direct_writer(
                 object_ = object_.copy()
                 origin_url = object_.pop('origin')
                 storage.origin_add_one(Origin(url=origin_url))
-                visit = method(origin=origin_url, date=object_.pop('date'),
+                visit = method(origin_url, date=object_.pop('date'),
                                type=object_.pop('type'))
                 expected_messages += 1
-                visit_id = visit['visit']
-                storage.origin_visit_update(origin_url, visit_id, **object_)
+                storage.origin_visit_update(origin_url, visit.visit, **object_)
                 expected_messages += 1
         else:
             assert False, object_type
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
index a2b06c4..d096da6 100644
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -10,7 +10,9 @@ import attr
 from hypothesis import given, settings, HealthCheck
 from hypothesis.strategies import lists
 
-from swh.model.hypothesis_strategies import object_dicts, present_contents
+from swh.model.hypothesis_strategies import (
+    object_dicts, present_contents
+)
 from swh.model.model import Origin
 from swh.storage import get_storage, HashCollision
 
@@ -22,10 +24,8 @@ from .utils import MockedJournalClient, MockedKafkaWriter
 
 
 storage_config = {
-    'cls': 'pipeline',
-    'steps': [
-        {'cls': 'memory', 'journal_writer': {'cls': 'memory'}},
-    ]
+    'cls': 'memory',
+    'journal_writer': {'cls': 'memory'},
 }
 
 
@@ -67,16 +67,18 @@ def test_write_replay_same_order_batches(objects):
 
     # Write objects to storage1
     for (obj_type, obj) in objects:
-        obj = obj.copy()
+        if obj_type == 'content' and obj.get('status') == 'absent':
+            obj_type = 'skipped_content'
+
+        obj = object_converter_fn[obj_type](obj)
+
         if obj_type == 'origin_visit':
-            storage1.origin_add_one(Origin(url=obj['origin']))
+            storage1.origin_add_one(Origin(url=obj.origin))
             storage1.origin_visit_upsert([obj])
         else:
-            if obj_type == 'content' and obj.get('status') == 'absent':
-                obj_type = 'skipped_content'
             method = getattr(storage1, obj_type + '_add')
             try:
-                method([object_converter_fn[obj_type](obj)])
+                method([obj])
             except HashCollision:
                 pass
 
-- 
GitLab