Skip to content
Snippets Groups Projects
Commit 7bddfcee authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

New upstream version 0.0.8

parents 82a486d1 d124e6e9
No related branches found
Tags debian/upstream/0.0.8
No related merge requests found
Metadata-Version: 1.0
Name: swh.model
Version: 0.0.7
Version: 0.0.8
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
......
Metadata-Version: 1.0
Name: swh.model
Version: 0.0.7
Version: 0.0.8
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
......
......@@ -186,8 +186,116 @@ def compute_tree_metadata(dirname, ls_hashes):
}
def default_validation_dir(dirpath):
"""Default validation function.
This is the equivalent of the identity function.
Args:
dirpath: Path to validate
Returns: True
"""
return True
def __walk(rootdir,
dir_ok_fn=default_validation_dir,
remove_empty_folder=False):
"""Walk the filesystem and yields a 3 tuples (dirpath, dirnames as set
of absolute paths, filenames as set of abslute paths)
Ignore files which won't pass the dir_ok_fn validation.
If remove_empty_folder is True, remove and ignore any
encountered empty folder.
Args:
- rootdir: starting walk root directory path
- dir_ok_fn: validation function. if folder encountered are
not ok, they are ignored. Default to default_validation_dir
which does nothing.
- remove_empty_folder: Flag to remove and ignore any
encountered empty folders.
Yields:
3 tuples dirpath, set of absolute children dirname paths, set
of absolute filename paths.
"""
def basic_gen_dir(rootdir):
for dp, dns, fns in os.walk(rootdir, topdown=False):
yield (dp,
set((os.path.join(dp, dn) for dn in dns)),
set((os.path.join(dp, fn) for fn in fns)))
if dir_ok_fn == default_validation_dir:
if not remove_empty_folder: # os.walk
yield from basic_gen_dir(rootdir)
else: # os.walk + empty dir cleanup
empty_folders = set()
for dp, dns, fns in basic_gen_dir(rootdir):
if not dns and not fns:
empty_folders.add(dp)
# need to remove it because folder of empty folder
# is an empty folder!!!
if os.path.islink(dp):
os.remove(dp)
else:
os.rmdir(dp)
parent = os.path.dirname(dp)
# edge case about parent containing one empty
# folder which become an empty one
while not os.listdir(parent):
empty_folders.add(parent)
if os.path.islink(parent):
os.remove(parent)
else:
os.rmdir(parent)
parent = os.path.dirname(parent)
continue
yield (dp, dns - empty_folders, fns)
else:
def filtfn(dirnames):
return set(filter(dir_ok_fn, dirnames))
gen_dir = ((dp, dns, fns) for dp, dns, fns
in basic_gen_dir(rootdir) if dir_ok_fn(dp))
if not remove_empty_folder: # os.walk + filtering
for dp, dns, fns in gen_dir:
yield (dp, filtfn(dns), fns)
else: # os.walk + filtering + empty dir cleanup
empty_folders = set()
for dp, dns, fns in gen_dir:
dps = filtfn(dns)
if not dps and not fns:
empty_folders.add(dp)
# need to remove it because folder of empty folder
# is an empty folder!!!
if os.path.islink(dp):
print('remove link to empty folder')
os.remove(dp)
else:
print('remove empty folder')
os.rmdir(dp)
parent = os.path.dirname(dp)
# edge case about parent containing one empty
# folder which become an empty one
while not os.listdir(parent):
empty_folders.add(parent)
if os.path.islink(parent):
os.remove(parent)
else:
os.rmdir(parent)
parent = os.path.dirname(parent)
continue
yield dp, dps - empty_folders, fns
def walk_and_compute_sha1_from_directory(rootdir,
dir_ok_fn=lambda dirpath: True,
dir_ok_fn=default_validation_dir,
with_root_tree=True,
remove_empty_folder=False):
"""Compute git sha1 from directory rootdir.
......@@ -228,53 +336,27 @@ def walk_and_compute_sha1_from_directory(rootdir,
if rootdir.endswith(b'/'):
rootdir = rootdir.rstrip(b'/')
def filtfn(dirpath, dirnames):
return list(filter(lambda dirname: dir_ok_fn(os.path.join(dirpath,
dirname)),
dirnames))
if remove_empty_folder: # round-trip to remove empty folders
gen_dir = ((dp, filtfn(dp, dns), fns) for (dp, dns, fns)
in os.walk(rootdir, topdown=False)
if dir_ok_fn(dp))
for dirpath, dirnames, filenames in gen_dir:
if dirnames == [] and filenames == []:
if os.path.islink(dirpath):
os.remove(dirpath)
else:
os.removedirs(dirpath)
gen_dir = ((dp, filtfn(dp, dns), fns) for (dp, dns, fns)
in os.walk(rootdir, topdown=False)
if dir_ok_fn(dp))
for dirpath, dirnames, filenames in gen_dir:
for dirpath, dirnames, filenames in __walk(
rootdir, dir_ok_fn, remove_empty_folder):
hashes = []
links = (os.path.join(dirpath, file)
for file in (filenames+dirnames)
if os.path.islink(os.path.join(dirpath, file)))
links = (file
for file in filenames.union(dirnames)
if os.path.islink(file))
for linkpath in links:
all_links.add(linkpath)
m_hashes = compute_link_metadata(linkpath)
hashes.append(m_hashes)
only_files = (os.path.join(dirpath, file)
for file in filenames
if os.path.join(dirpath, file) not in all_links)
for filepath in only_files:
for filepath in (file for file in filenames if file not in all_links):
m_hashes = compute_blob_metadata(filepath)
hashes.append(m_hashes)
ls_hashes[dirpath] = hashes
dir_hashes = []
subdirs = (os.path.join(dirpath, dir)
for dir in dirnames
if os.path.join(dirpath, dir)
not in all_links)
for fulldirname in subdirs:
for fulldirname in (dir for dir in dirnames if dir not in all_links):
tree_hash = compute_tree_metadata(fulldirname, ls_hashes)
dir_hashes.append(tree_hash)
......@@ -398,7 +480,7 @@ def commonpath(paths):
def __remove_paths_from_objects(objects, rootpaths,
dir_ok_fn=lambda dirpath: True):
dir_ok_fn=default_validation_dir):
"""Given top paths to remove, remove all paths and descendants from
objects.
......@@ -441,7 +523,7 @@ def __remove_paths_from_objects(objects, rootpaths,
def update_checksums_from(changed_paths, objects,
dir_ok_fn=lambda dirpath: True,
dir_ok_fn=default_validation_dir,
remove_empty_folder=False):
"""Given a list of changed paths, recompute the checksums only where
needed.
......
......@@ -149,17 +149,17 @@ class GitHashWalkArborescenceTree(unittest.TestCase):
self.maxDiff = None
start_path = os.path.dirname(__file__).encode('utf-8')
pkg_doc_linux_r11 = os.path.join(start_path,
b'../../../..',
b'swh-storage-testdata',
b'dir-folders',
b'sample-folder.tgz')
sample_folder = os.path.join(start_path,
b'../../../..',
b'swh-storage-testdata',
b'dir-folders',
b'sample-folder.tgz')
self.root_path = os.path.join(self.tmp_root_path, b'sample-folder')
# uncompress the sample folder
subprocess.check_output(
['tar', 'xvf', pkg_doc_linux_r11, '-C', self.tmp_root_path])
['tar', 'xvf', sample_folder, '-C', self.tmp_root_path])
def tearDown(self):
if os.path.exists(self.tmp_root_path):
......@@ -276,6 +276,97 @@ class GitHashUpdate(GitHashWalkArborescenceTree):
self.assertEquals(expected_dict, actual_dict)
@istest
def update_checksums_from_add_new_file_with_validation(self):
# make a temporary arborescence tree to hash without ignoring anything
# update the disk in some way (add a new file)
# update the actual git checksums from the deeper tree modified
# + Add some validation on some file to ignore
def dir_ok_fn(dirpath):
return b'empty-folder' not in dirpath
# when
objects = git.walk_and_compute_sha1_from_directory(
self.tmp_root_path, dir_ok_fn=dir_ok_fn)
# update the existing file
changed_path = os.path.join(self.tmp_root_path,
b'sample-folder/bar/barfoo/new')
with open(changed_path, 'wb') as f:
f.write(b'new line')
# walk1 (this will be our expectation)
expected_dict = git.walk_and_compute_sha1_from_directory(
self.tmp_root_path, dir_ok_fn=dir_ok_fn)
# then
actual_dict = git.update_checksums_from(
[{'path': changed_path, 'action': 'A'}],
objects)
self.assertEquals(expected_dict, actual_dict)
@istest
def update_checksums_from_add_new_file_remove_empty_folder(self):
# make a temporary arborescence tree to hash without ignoring anything
# update the disk in some way (add a new file)
# update the actual git checksums from the deeper tree modified
# + Add some validation on some file to ignore
# when
objects = git.walk_and_compute_sha1_from_directory(
self.tmp_root_path, remove_empty_folder=True)
# update the existing file
changed_path = os.path.join(self.tmp_root_path,
b'sample-folder/bar/barfoo/new')
with open(changed_path, 'wb') as f:
f.write(b'new line')
# walk1 (this will be our expectation)
expected_dict = git.walk_and_compute_sha1_from_directory(
self.tmp_root_path, remove_empty_folder=True)
# then
actual_dict = git.update_checksums_from(
[{'path': changed_path, 'action': 'A'}],
objects)
self.assertEquals(expected_dict, actual_dict)
@istest
def update_checksums_new_file_with_validation_and_ignore_empty_dir(self):
# make a temporary arborescence tree to hash without ignoring anything
# update the disk in some way (add a new file)
# update the actual git checksums from the deeper tree modified
# + Add some validation on some file to ignore
# + ignore empty folder
def dir_ok_fn(dirpath):
return b'some-binary' not in dirpath
# when
objects = git.walk_and_compute_sha1_from_directory(
self.tmp_root_path, dir_ok_fn=dir_ok_fn, remove_empty_folder=True)
# update the existing file
changed_path = os.path.join(self.tmp_root_path,
b'sample-folder/bar/barfoo/new')
with open(changed_path, 'wb') as f:
f.write(b'new line')
# walk1 (this will be our expectation)
expected_dict = git.walk_and_compute_sha1_from_directory(
self.tmp_root_path, dir_ok_fn=dir_ok_fn, remove_empty_folder=True)
# then
actual_dict = git.update_checksums_from(
[{'path': changed_path, 'action': 'A'}],
objects)
self.assertEquals(expected_dict, actual_dict)
@istest
def update_checksums_from_modify_existing_file(self):
# make a temporary arborescence tree to hash without ignoring anything
......
v0.0.7-0-g22b9fca
\ No newline at end of file
v0.0.8-0-gd124e6e
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment