Compare revisions
# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from dataclasses import dataclass
from typing import Iterable, List

from swh.model import discovery, model
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.model.tests.test_identifiers import directory_example

pytest_plugins = ["aiohttp.pytest_plugin"]

UNKNOWN_HASH = hash_to_bytes("17140cb6109f1e3296dc52e2b2cd29bcb40e86be")
KNOWN_CONTENT_HASH = hash_to_bytes("e8e4106de42e2d5d5efab6a9422b9a8677c993c8")
KNOWN_DIRECTORY_HASH = hash_to_bytes("d7ed3d2c31d608823be58b1cbe57605310615231")
KNOWN_DIRECTORY_HASH_2 = hash_to_bytes("c76724e9a0be4b60f4bf0cb48b261df8eda94b1d")


@dataclass
class FakeArchive:
    contents: List[model.Content]
    skipped_contents: List[model.SkippedContent]
    directories: List[model.Directory]

    def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
        """List content missing from the archive by sha1"""
        return []

    def skipped_content_missing(
        self, skipped_contents: List[Sha1Git]
    ) -> Iterable[Sha1Git]:
        """List skipped content missing from the archive by sha1"""
        return []

    def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
        """List directories missing from the archive by sha1"""
        return []


def test_filter_known_objects(monkeypatch):
    # Test with a smaller sample size to actually trigger the random sampling
    monkeypatch.setattr(discovery, "SAMPLE_SIZE", 1)

    base_directory = model.Directory.from_dict(directory_example)

    # Hardcoding another hash is enough since it's all that's being checked
    directory_data = directory_example.copy()
    directory_data["id"] = KNOWN_DIRECTORY_HASH_2
    other_directory = model.Directory.from_dict(directory_data)

    archive = FakeArchive(
        contents=[model.Content.from_data(b"blabla")],
        skipped_contents=[model.SkippedContent.from_data(b"blabla2", reason="reason")],
        directories=[
            base_directory,
            other_directory,
        ],
    )
    assert archive.contents[0].sha1_git == KNOWN_CONTENT_HASH
    assert archive.directories[0].id == KNOWN_DIRECTORY_HASH
    assert archive.directories[1].id == KNOWN_DIRECTORY_HASH_2

    # Every object is already known to the archive (the *_missing methods
    # report nothing missing), so everything should be filtered out
    (contents, skipped_contents, directories) = discovery.filter_known_objects(archive)
    assert len(contents) == 0
    assert len(skipped_contents) == 0
    assert len(directories) == 0
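For contrast, here is a hypothetical sketch of the opposite case, assuming only the duck-typed archive interface exercised above; EmptyArchive and test_filter_all_unknown are illustrative names, not part of the change. When every *_missing method reports its whole input as missing, filter_known_objects should return every object unfiltered:

@dataclass
class EmptyArchive:
    """Hypothetical archive that knows nothing: every object is missing."""

    contents: List[model.Content]
    skipped_contents: List[model.SkippedContent]
    directories: List[model.Directory]

    def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
        return contents  # nothing is known, so everything is missing

    def skipped_content_missing(
        self, skipped_contents: List[Sha1Git]
    ) -> Iterable[Sha1Git]:
        return skipped_contents

    def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
        return directories


def test_filter_all_unknown():
    archive = EmptyArchive(
        contents=[model.Content.from_data(b"blabla")],
        skipped_contents=[],
        directories=[],
    )
    (contents, skipped_contents, directories) = discovery.filter_known_objects(archive)
    # The unknown content should survive the filter untouched
    assert len(contents) == 1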
@@ -36,7 +36,6 @@ def blake2_hash_cache_reset():
 @pytest.fixture
 def hash_test_data():
     class HashTestData:
-
         data = b"1984\n"
         hex_checksums = {
             "sha1": "62be35bf00ff0c624f4a621e2ea5595a049e0731",
@@ -112,34 +111,44 @@ def test_multi_hash_file_bytehexdigest(hash_test_data):
     assert checksums == hash_test_data.bytehex_checksums
 
 
-def test_multi_hash_file_with_md5(hash_test_data):
+EXTRA_HASH_ALGOS = ["md5", "sha512"]
+
+
+@pytest.mark.parametrize("hash_algo", EXTRA_HASH_ALGOS)
+def test_multi_hash_file_with_extra_hash_algo(hash_test_data, hash_algo):
     fobj = io.BytesIO(hash_test_data.data)
 
     checksums = MultiHash.from_file(
-        fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=len(hash_test_data.data)
+        fobj,
+        hash_names=DEFAULT_ALGORITHMS | {hash_algo},
+        length=len(hash_test_data.data),
     ).digest()
-    md5sum = {"md5": hashlib.md5(hash_test_data.data).digest()}
-    assert checksums == {**hash_test_data.checksums, **md5sum}
+    checksum = {hash_algo: hashlib.new(hash_algo, hash_test_data.data).digest()}
+    assert checksums == {**hash_test_data.checksums, **checksum}
 
 
-def test_multi_hash_file_hexdigest_with_md5(hash_test_data):
+@pytest.mark.parametrize("hash_algo", EXTRA_HASH_ALGOS)
+def test_multi_hash_file_hexdigest_with_extra_hash_algo(hash_test_data, hash_algo):
     fobj = io.BytesIO(hash_test_data.data)
     length = len(hash_test_data.data)
     checksums = MultiHash.from_file(
-        fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=length
+        fobj, hash_names=DEFAULT_ALGORITHMS | {hash_algo}, length=length
     ).hexdigest()
-    md5sum = {"md5": hashlib.md5(hash_test_data.data).hexdigest()}
-    assert checksums == {**hash_test_data.hex_checksums, **md5sum}
+    checksum = {hash_algo: hashlib.new(hash_algo, hash_test_data.data).hexdigest()}
+    assert checksums == {**hash_test_data.hex_checksums, **checksum}
 
 
-def test_multi_hash_file_bytehexdigest_with_md5(hash_test_data):
+@pytest.mark.parametrize("hash_algo", EXTRA_HASH_ALGOS)
+def test_multi_hash_file_bytehexdigest_with_extra_algo(hash_test_data, hash_algo):
     fobj = io.BytesIO(hash_test_data.data)
     length = len(hash_test_data.data)
     checksums = MultiHash.from_file(
-        fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=length
+        fobj, hash_names=DEFAULT_ALGORITHMS | {hash_algo}, length=length
     ).bytehexdigest()
-    md5sum = {"md5": hash_to_bytehex(hashlib.md5(hash_test_data.data).digest())}
-    assert checksums == {**hash_test_data.bytehex_checksums, **md5sum}
+    checksum = {
+        hash_algo: hash_to_bytehex(hashlib.new(hash_algo, hash_test_data.data).digest())
+    }
+    assert checksums == {**hash_test_data.bytehex_checksums, **checksum}
 
 
 def test_multi_hash_file_missing_length(hash_test_data):
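The refactor above hinges on hashlib.new(), which looks up a hash constructor from its string name, so one parametrized test can cover any extra algorithm instead of a hand-written md5-only variant. A minimal standalone sketch of that pattern (illustrative only, not part of the diff; it reuses MultiHash.from_file and DEFAULT_ALGORITHMS exactly as the tests do):

import hashlib
import io

from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash

data = b"1984\n"
for hash_algo in ["md5", "sha512"]:
    checksums = MultiHash.from_file(
        io.BytesIO(data), hash_names=DEFAULT_ALGORITHMS | {hash_algo}, length=len(data)
    ).hexdigest()
    # hashlib.new(name, data) is equivalent to e.g. hashlib.md5(data),
    # except that the algorithm is chosen at runtime by name
    assert checksums[hash_algo] == hashlib.new(hash_algo, data).hexdigest()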