# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import deque


class SimpleCache:
    """Bounded cache of unique hashable elements with batch FIFO eviction.

    Elements are remembered in insertion order. As soon as the number of
    cached elements reaches ``max_size``, the oldest
    ``nb_elements_to_purge`` elements are dropped in one go.

    Attributes:
        max_size: threshold at which an eviction is triggered.
        nb_elements_to_purge: number of oldest elements removed per eviction.
        s: set holding the cached elements (used for membership tests).
        stack: deque holding the same elements in insertion order.
        count: number of elements currently cached.

    """

    def __init__(self, max_size=10000, eviction_percent=0.2):
        """Initialize cache of max_size elements.

        Args:
            max_size: the max number of elements to cache. With 0, the
                cache never stores anything.
            eviction_percent: ratio, within [0, 1], of max_size to evict
                when max_size is reached. The eviction removes the oldest
                elements first. At least one element is always evicted so
                that the cache stays bounded even for a ratio of 0.

        Raises:
            AssertionError: if eviction_percent is outside [0, 1].

        """
        # Raised explicitly (instead of using an `assert` statement) so the
        # validation still runs when Python is started with -O.
        if not 0 <= eviction_percent <= 1:
            raise AssertionError(
                'eviction_percent must be within [0, 1], got %r'
                % (eviction_percent,))

        self.max_size = max_size

        nb_to_purge = int(max_size * eviction_percent)
        if max_size > 0:
            # Purging 0 elements would let the set grow forever while the
            # bounded deque silently drops its oldest entries.
            nb_to_purge = max(1, nb_to_purge)
        self.nb_elements_to_purge = nb_to_purge

        self.s = set()
        self.stack = deque([], maxlen=max_size)
        self.count = 0

    def __str__(self):
        return ('set: %s, stack: %s, count: %s, max-size: %s, nb-purge: %s' % (
            self.s,
            self.stack,
            self.count,
            self.max_size,
            self.nb_elements_to_purge))

    def __contains__(self, e):
        """Return True when e is currently cached (enables ``e in cache``)."""
        return e in self.s

    def _evict(self):
        """Remove the self.nb_elements_to_purge oldest elements from cache.

        Never tries to remove more elements than are actually stored.

        """
        nb_to_remove = min(self.nb_elements_to_purge, len(self.stack))
        for _ in range(nb_to_remove):
            self.s.discard(self.stack.popleft())
        self.count -= nb_to_remove

    def add(self, e):
        """Cache element e; elements already cached are ignored.

        Reaching max_size triggers the eviction of the oldest elements.

        """
        if self.max_size <= 0 or e in self.s:
            return

        self.s.add(e)
        self.stack.append(e)
        self.count += 1

        if self.count >= self.max_size:
            self._evict()

    def set(self):
        """Return the set of elements currently cached."""
        return self.s
""" - max_content_size = self.config['content_packet_size_bytes'] blobs_per_sha1 = {} shallow_blobs = [] - for key, blob in ((b['sha1'], b) for b in blobs - if b['sha1'] not in self.contents_seen): + for blob in blobs: + key = blob['sha1'] + if key in self.contents_seen.set(): + continue blobs_per_sha1[key] = blob shallow_blobs.append(converters.shallow_blob(blob)) self.contents_seen.add(key) for sha1 in self.storage.content_missing(shallow_blobs, key_hash='sha1'): - yield converters.blob_to_content(blobs_per_sha1[sha1], - max_content_size=max_content_size, - origin_id=self.origin_id) + yield converters.blob_to_content( + blobs_per_sha1[sha1], + max_content_size=self.max_content_size, + origin_id=self.origin_id) def bulk_send_blobs(self, blobs): """Format blobs as swh contents and send them to the database""" @@ -238,8 +243,11 @@ class SWHLoader(config.SWHConfig): """ trees_per_sha1 = {} shallow_trees = [] - for key, tree in ((t['sha1_git'], t) for t in trees - if t['sha1_git'] not in self.directories_seen): + for tree in trees: + key = tree['sha1_git'] + if key in self.directories_seen.set(): + continue + trees_per_sha1[key] = tree shallow_trees.append(converters.shallow_tree(tree)) self.directories_seen.add(key) @@ -261,8 +269,11 @@ class SWHLoader(config.SWHConfig): """ commits_per_sha1 = {} shallow_commits = [] - for key, commit in ((c['id'], c) for c in commits - if c['id'] not in self.revisions_seen): + for commit in commits: + key = commit['id'] + if key in self.revisions_seen.set(): + continue + commits_per_sha1[key] = commit shallow_commits.append(converters.shallow_commit(commit)) self.revisions_seen.add(key) @@ -287,8 +298,11 @@ class SWHLoader(config.SWHConfig): """ tags_per_sha1 = {} shallow_tags = [] - for key, tag in ((t['id'], t) for t in tags - if t['id'] not in self.releases_seen): + for tag in tags: + key = tag['id'] + if key in self.releases_seen.set(): + continue + tags_per_sha1[key] = tag shallow_tags.append(converters.shallow_tag(tag)) 
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import unittest

from nose.tools import istest

from swh.loader.vcs.cache import SimpleCache


class TestSimpleCache(unittest.TestCase):
    """Check SimpleCache argument validation and its eviction behavior."""

    @istest
    def simple_cache_behavior_fails_to_init(self):
        """An eviction_percent outside [0, 1] must be rejected."""
        # assertRaises fails the test when nothing is raised, which a bare
        # try/except would silently accept.
        with self.assertRaises(AssertionError):
            SimpleCache(max_size=6, eviction_percent=10)

    @istest
    def simple_cache_behavior(self):
        """Duplicates are ignored and reaching max_size evicts the oldest."""
        # given
        cache = SimpleCache(max_size=6, eviction_percent=0.5)

        # when
        cache.add(3)
        cache.add(2)
        cache.add(1)
        cache.add(1)  # duplicate elements are dismissed

        # then
        self.assertEqual(cache.set(), {1, 2, 3})
        self.assertEqual(cache.count, 3)

        cache.add(4)
        cache.add(5)

        self.assertEqual(cache.set(), {1, 2, 3, 4, 5})
        self.assertEqual(cache.count, 5)

        cache.add(6)  # we hit max-size, 50% of elements (here 3) are evicted

        self.assertEqual(cache.set(), {4, 5, 6})
        self.assertEqual(cache.count, 3)

        cache.add(7)
        cache.add(8)
        self.assertEqual(cache.set(), {4, 5, 6, 7, 8})
        self.assertEqual(cache.count, 5)