Commit b496be4e authored by Antoine Pietri

seirl: add swh-dedup

parent d43f2eaf
import logging
import sys

from multiprocessing import Process, Queue

from swh.model.hashutil import hash_to_bytes
from swh.objstorage.exc import ObjNotFoundError

from deduper.deduper import Deduper

NUM_WORKERS = 1


def dedup_worker(task_queue, result_queue):
    while True:
        content_id = task_queue.get()
        if content_id is None:  # no more tasks
            break

        try:
            Deduper().dedup(hash_to_bytes(content_id))
            result_queue.put((content_id, True))
        except ObjNotFoundError:
            logging.warning('cannot find object "%s", skipping' % content_id)
            result_queue.put((content_id, False))


def progress_monitor(result_queue):
    obj_count = 0
    while True:
        (content_id, _success) = result_queue.get()
        obj_count += 1
        if obj_count % 1000 == 0:
            logging.info('processed %d objects, currently at %s' %
                         (obj_count, content_id))


def main():
    task_queue = Queue()
    result_queue = Queue()

    workers = []
    for i in range(0, NUM_WORKERS):
        p = Process(target=dedup_worker,
                    args=(task_queue, result_queue))
        workers.append(p)
        p.start()

    monitor = Process(target=progress_monitor, args=(result_queue,))
    monitor.start()

    for line in sys.stdin:  # schedule tasks
        content_id = line.rstrip()
        task_queue.put(content_id)
    for i in range(0, NUM_WORKERS):  # tell workers we're done
        task_queue.put(None)

    for p in workers:  # wait for completion
        p.join()
    monitor.terminate()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()
import borg.chunker
import io
import math
import rabin
import zlib

from hashlib import sha1


def buzhash_chunk(content, params):
    args = {
        'seed': params.get('seed', 0),
        'chunk_min_exp': int(math.log2(params.get('min_block_size'))),
        'chunk_max_exp': int(math.log2(params.get('max_block_size'))),
        'hash_mask_bits': int(math.log2(params.get('average_block_size'))),
        'hash_window_size': params.get('window_size')
    }
    chunker = borg.chunker.Chunker(**args)

    # content is a bytes object, so it can be fed to the chunker directly
    pos = 0
    for chunk in chunker.chunkify(io.BytesIO(content)):
        yield pos, len(chunk)
        pos += len(chunk)
def rabin_chunk(content, params):
    if 'prime' in params:
        rabin.set_prime(params['prime'])
    if 'window_size' in params:
        rabin.set_window_size(params['window_size'])
    if 'min_block_size' in params:
        rabin.set_min_block_size(params['min_block_size'])
    if 'average_block_size' in params:
        rabin.set_average_block_size(params['average_block_size'])
    if 'max_block_size' in params:
        rabin.set_max_block_size(params['max_block_size'])

    r = rabin.Rabin()
    r.update(content)  # TODO avoid loading the entire content in memory

    if content:  # r.fingerprints() invoked on empty objects segfaults :-(
        for position, length, _fpr in r.fingerprints():
            yield position, length
    r.clear()
ALGOS = {
    'rabin': rabin_chunk,
    'buzhash': buzhash_chunk,
}


def chunk(algo, params, content):
    """chunk content with the given algorithm, yielding
    (chunk_id, position, length, compressed_size) tuples
    """
    f = ALGOS[algo]
    for position, length in f(content, params):
        data = content[position:(position + length)]
        chunk_id = sha1(data).digest()
        compressed_size = len(zlib.compress(data))
        yield chunk_id, position, length, compressed_size
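For reference, a minimal smoke test of the chunk() generator above. It is only an illustrative sketch, not part of the commit: the parameter values and sample file are arbitrary placeholders (the real settings live in the chunking_method table), and it assumes the dependencies listed further down (borgbackup, pyrabin) are installed.

from deduper.chunkers import chunk

params = {
    'min_block_size': 4 * 1024,       # arbitrary example values,
    'average_block_size': 16 * 1024,  # not the evaluated configurations
    'max_block_size': 64 * 1024,
    'window_size': 48,
}

with open('/bin/ls', 'rb') as f:  # any local file serves as sample content
    content = f.read()

for chunk_id, position, length, compressed_size in chunk('buzhash', params, content):
    print(chunk_id.hex(), position, length, compressed_size)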
#!/usr/bin/python3
"""compute Rabin fingerprints of Software Heritage content objects
Read a list of Software Heritage content object IDs on standard input, fetch
each of them from a (local) object storage and apply Rabin fingerprinting to
its content. Store in a (postgres) DB the mapping between content objects and
(Rabin-delimited) chunks.
"""
import magic
import psycopg2
import zlib

from psycopg2.extras import execute_values, RealDictCursor

from swh.objstorage import PathSlicingObjStorage as ObjStorage

from deduper.chunkers import chunk

OBJS_ROOT = '/srv/softwareheritage/objects'
OBJS_SLICING = '0:2/2:4'
DB_SERVICE = 'swh-dedup'  # postgres service name


class Deduper:

    def __init__(self):
        self.db_conn = psycopg2.connect('service=%s' % DB_SERVICE)
        self.obj_storage = ObjStorage(OBJS_ROOT, OBJS_SLICING)
    def dedup(self, content_id):
        content = self.obj_storage.get(content_id)

        self._insert_content(content_id, content)

        # get the list of chunking methods not yet applied to this content
        with self.db_conn.cursor(cursor_factory=RealDictCursor) as c:
            c.execute("""SELECT id, algo, min_block_size, average_block_size,
                                max_block_size, window_size
                         FROM chunking_method
                         LEFT JOIN chunked_content
                           ON method_id = chunking_method.id
                          AND content_id = %s
                         WHERE method_id IS NULL""",
                      (content_id,))
            methods = c.fetchall()

        for method in methods:
            method_id = method['id']
            algo = method['algo']
            params = {
                'min_block_size': method['min_block_size'],
                'average_block_size': method['average_block_size'],
                'max_block_size': method['max_block_size'],
                'window_size': method['window_size'],
            }
            chunks = list(chunk(algo, params, content))
            self._insert_chunks(content_id, method_id, chunks)

        self.db_conn.commit()  # persist the content row and its chunk mappings
    def _insert_content(self, content_id, content):
        size = len(content)
        compressed_size = len(zlib.compress(content))
        ftype = magic.from_buffer(content)

        with self.db_conn.cursor() as cur:
            cur.execute("""INSERT INTO content
                           (id, length, compressed_length, file_type)
                           VALUES (%s, %s, %s, %s)
                           ON CONFLICT DO NOTHING""",
                        (content_id, size, compressed_size, ftype))

    def _insert_chunks(self, content_id, method_id, chunks):
        chunk_values = []
        chunked_content_values = []
        for (chunk_id, position, length, compressed_length) in chunks:
            chunk_values.append((chunk_id, length, compressed_length))
            chunked_content_values.append((content_id, chunk_id, method_id,
                                           position))

        with self.db_conn.cursor() as cur:
            execute_values(cur, """INSERT INTO chunk
                                   (id, length, compressed_length)
                                   VALUES %s
                                   ON CONFLICT DO NOTHING""",
                           chunk_values)
            execute_values(cur, """INSERT INTO chunked_content
                                   (content_id, chunk_id, method_id, position)
                                   VALUES %s""",
                           chunked_content_values)
cython
python3-dev
libacl1-dev
borgbackup
pyrabin
python-magic
#!/bin/bash
db_name=swh-dedup2
db_service=$db_name
sudo -u postgres dropdb -p 5433 $db_name
sudo -u postgres createdb -p 5433 -O swhdev $db_name
psql service=$db_service -f swh-dedup-blocks.sql
time \
find /srv/softwareheritage/objects -type f \
| sort \
| cut -f 7 -d/ \
| ./swh-dedup-blocks.py
CREATE DOMAIN sha1 AS bytea CHECK (length(value) = 20);

CREATE TABLE content (
    id sha1 PRIMARY KEY,  -- SHA1 checksum
    length integer,
    compressed_length integer,
    file_type text
);

CREATE TABLE chunk (
    id sha1 PRIMARY KEY,  -- SHA1 checksum
    length integer,
    compressed_length integer
);

CREATE TYPE chunking_algo AS ENUM ('rabin', 'buzhash');

CREATE TABLE chunking_method (
    id serial PRIMARY KEY,
    algo chunking_algo,
    min_block_size integer,
    average_block_size integer,
    max_block_size integer,
    window_size integer
);

CREATE TABLE chunked_content (
    content_id sha1 REFERENCES content(id),
    chunk_id sha1 REFERENCES chunk(id),
    method_id integer REFERENCES chunking_method(id),
    position integer
);

CREATE INDEX ON chunked_content (content_id);
CREATE INDEX ON chunked_content (method_id);
CREATE UNIQUE INDEX ON chunked_content (content_id, chunk_id, method_id, position);
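The length and compressed_length columns make it possible to measure how much space each chunking method would save. A hypothetical follow-up (not part of this commit) is to compare the total size of all ingested contents with the size of the distinct chunks each method produced, for example:

import psycopg2

db = psycopg2.connect('service=swh-dedup')  # service name used by Deduper above
with db.cursor() as cur:
    # total size of all contents, before deduplication
    cur.execute("SELECT sum(length) FROM content")
    (raw_length,) = cur.fetchone()

    # size of the distinct chunks referenced by each chunking method
    cur.execute("""SELECT method_id, sum(length)
                     FROM (SELECT DISTINCT method_id, chunk_id
                             FROM chunked_content) AS cc
                     JOIN chunk ON chunk.id = cc.chunk_id
                    GROUP BY method_id""")
    for method_id, dedup_length in cur.fetchall():
        print('method', method_id, ':', raw_length, '->', dedup_length, 'bytes')
db.close()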