Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • anlambert/swh-model
  • lunar/swh-model
  • franckbret/swh-model
  • douardda/swh-model
  • olasd/swh-model
  • swh/devel/swh-model
  • Alphare/swh-model
  • samplet/swh-model
  • marmoute/swh-model
  • rboyer/swh-model
10 results
Show changes
# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass
from typing import Iterable, List
from swh.model import discovery, model
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.model.tests.test_identifiers import directory_example
# Ask pytest to load the aiohttp plugin for this test module.
# NOTE(review): no aiohttp fixture is used in the visible test - confirm this is still needed.
pytest_plugins = ["aiohttp.pytest_plugin"]
# Not compared against any object in the visible test.
# NOTE(review): presumably the id of an object absent from the archive - confirm.
UNKNOWN_HASH = hash_to_bytes("17140cb6109f1e3296dc52e2b2cd29bcb40e86be")
# Expected `sha1_git` of `model.Content.from_data(b"blabla")`.
KNOWN_CONTENT_HASH = hash_to_bytes("e8e4106de42e2d5d5efab6a9422b9a8677c993c8")
# Expected `id` of the directory built from `directory_example`.
KNOWN_DIRECTORY_HASH = hash_to_bytes("d7ed3d2c31d608823be58b1cbe57605310615231")
# Hard-coded id given to a second directory that reuses the `directory_example` data.
KNOWN_DIRECTORY_HASH_2 = hash_to_bytes("c76724e9a0be4b60f4bf0cb48b261df8eda94b1d")
@dataclass
class FakeArchive:
    """Test double for an archive in which no queried object is ever missing.

    The three lists carry the objects handed to the code under test. Every
    ``*_missing`` query ignores its argument and answers with an empty list.
    """

    contents: List[model.Content]
    skipped_contents: List[model.SkippedContent]
    directories: List[model.Directory]

    def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
        """Return the ids among ``contents`` that are missing: always none."""
        nothing_missing: List[Sha1Git] = []
        return nothing_missing

    def skipped_content_missing(
        self, skipped_contents: List[Sha1Git]
    ) -> Iterable[Sha1Git]:
        """Return the ids among ``skipped_contents`` that are missing: always none."""
        nothing_missing: List[Sha1Git] = []
        return nothing_missing

    def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
        """Return the ids among ``directories`` that are missing: always none."""
        nothing_missing: List[Sha1Git] = []
        return nothing_missing
def test_filter_known_objects(monkeypatch):
    """Check that filtering leaves nothing when the archive reports no object missing."""
    # A sample size of 1 is small enough for the random sampling to actually run
    monkeypatch.setattr(discovery, "SAMPLE_SIZE", 1)

    first_dir = model.Directory.from_dict(directory_example)
    # Only the id is checked, so the same data under another hard-coded hash
    # is enough to stand for a second directory
    second_dir = model.Directory.from_dict(
        {**directory_example, "id": KNOWN_DIRECTORY_HASH_2}
    )
    known_content = model.Content.from_data(b"blabla")
    known_skipped = model.SkippedContent.from_data(b"blabla2", reason="reason")
    archive = FakeArchive(
        contents=[known_content],
        skipped_contents=[known_skipped],
        directories=[first_dir, second_dir],
    )

    # Sanity-check the fixture against the hard-coded hashes
    assert archive.contents[0].sha1_git == KNOWN_CONTENT_HASH
    listed_first, listed_second = archive.directories
    assert listed_first.id == KNOWN_DIRECTORY_HASH
    assert listed_second.id == KNOWN_DIRECTORY_HASH_2

    contents, skipped_contents, directories = discovery.filter_known_objects(archive)
    for leftover in (contents, skipped_contents, directories):
        assert len(leftover) == 0
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# Copyright (C) 2017-2020 The Software Heritage developers
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -15,11 +15,10 @@ class MerkleTestNode(merkle.MerkleNode):
super().__init__(data)
self.compute_hash_called = 0
def compute_hash(self):
def compute_hash(self) -> bytes:
self.compute_hash_called += 1
child_data = [child + b"=" + self[child].hash for child in sorted(self)]
return b"hash(" + b", ".join([self.data["value"]] + child_data) + b")"
return b"hash(" + b", ".join([self.data.get("value", b"")] + child_data) + b")"
class MerkleTestLeaf(merkle.MerkleLeaf):
......@@ -31,7 +30,7 @@ class MerkleTestLeaf(merkle.MerkleLeaf):
def compute_hash(self):
self.compute_hash_called += 1
return b"hash(" + self.data["value"] + b")"
return b"hash(" + self.data.get("value", b"") + b")"
class TestMerkleLeaf(unittest.TestCase):
......@@ -62,14 +61,10 @@ class TestMerkleLeaf(unittest.TestCase):
collected = self.instance.collect()
self.assertEqual(
collected,
{
self.instance.object_type: {
self.instance.hash: self.instance.get_data(),
},
},
{self.instance},
)
collected2 = self.instance.collect()
self.assertEqual(collected2, {})
self.assertEqual(collected2, set())
self.instance.reset_collect()
collected3 = self.instance.collect()
self.assertEqual(collected, collected3)
......@@ -96,32 +91,44 @@ class TestMerkleNode(unittest.TestCase):
self.nodes = {b"root": self.root}
for i in (b"a", b"b", b"c"):
value = b"root/" + i
node = MerkleTestNode({"value": value,})
node = MerkleTestNode(
{
"value": value,
}
)
self.root[i] = node
self.nodes[value] = node
for j in (b"a", b"b", b"c"):
value2 = value + b"/" + j
node2 = MerkleTestNode({"value": value2,})
node2 = MerkleTestNode(
{
"value": value2,
}
)
node[j] = node2
self.nodes[value2] = node2
for k in (b"a", b"b", b"c"):
value3 = value2 + b"/" + j
node3 = MerkleTestNode({"value": value3,})
node3 = MerkleTestNode(
{
"value": value3,
}
)
node2[j] = node3
self.nodes[value3] = node3
def test_equality(self):
node1 = merkle.MerkleNode({"foo": b"bar"})
node2 = merkle.MerkleNode({"foo": b"bar"})
node3 = merkle.MerkleNode({})
node1 = MerkleTestNode({"value": b"bar"})
node2 = MerkleTestNode({"value": b"bar"})
node3 = MerkleTestNode({})
self.assertEqual(node1, node2)
self.assertNotEqual(node1, node3, node1 == node3)
node1["foo"] = node3
node1[b"a"] = node3
self.assertNotEqual(node1, node2)
node2["foo"] = node3
node2[b"a"] = node3
self.assertEqual(node1, node2)
def test_hash(self):
......@@ -166,16 +173,24 @@ class TestMerkleNode(unittest.TestCase):
def test_collect(self):
collected = self.root.collect()
self.assertEqual(len(collected[self.root.object_type]), len(self.nodes))
self.assertEqual(collected, set(self.nodes.values()))
for node in self.nodes.values():
self.assertTrue(node.collected)
collected2 = self.root.collect()
self.assertEqual(collected2, {})
self.assertEqual(collected2, set())
def test_iter_tree(self):
def test_iter_tree_with_deduplication(self):
nodes = list(self.root.iter_tree())
self.assertCountEqual(nodes, self.nodes.values())
def test_iter_tree_without_deduplication(self):
    """Check that ``iter_tree(dedup=False)`` yields one node more than the default call."""
    # duplicate existing hash in merkle tree
    self.root[b"d"] = MerkleTestNode({"value": b"root/c/c/c"})
    nodes_dedup = list(self.root.iter_tree())
    nodes = list(self.root.iter_tree(dedup=False))
    # Use TestCase assertions like the sibling tests: bare asserts are
    # stripped under `python -O` and give poorer failure output
    self.assertNotEqual(nodes, nodes_dedup)
    self.assertEqual(len(nodes), len(nodes_dedup) + 1)
def test_get(self):
for key in (b"a", b"b", b"c"):
self.assertEqual(self.root[key], self.nodes[b"root/" + key])
......@@ -232,16 +247,16 @@ class TestMerkleNode(unittest.TestCase):
# Ensure we collected root, root/b, and both new children
collected_after_update = self.root.collect()
self.assertCountEqual(
collected_after_update[MerkleTestNode.object_type],
[
self.nodes[b"root"].hash,
self.nodes[b"root/b"].hash,
new_children[b"c"].hash,
new_children[b"d"].hash,
],
self.assertEqual(
collected_after_update,
{
self.nodes[b"root"],
self.nodes[b"root/b"],
new_children[b"c"],
new_children[b"d"],
},
)
# test that noop updates don't invalidate anything
self.root[b"a"][b"b"].update({})
self.assertEqual(self.root.collect(), {})
self.assertEqual(self.root.collect(), set())
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.