diff --git a/swh/objstorage/backends/winery/sharedbase.py b/swh/objstorage/backends/winery/sharedbase.py index a1583725c34a67e524ec7769d056c628025cc390..eb6c5c41daad27392a635696045a53475a4fbd2f 100644 --- a/swh/objstorage/backends/winery/sharedbase.py +++ b/swh/objstorage/backends/winery/sharedbase.py @@ -118,6 +118,19 @@ class TemporaryShardLocker: class SharedBase(Database): + """The main database for a Winery instance. + + This handles access to the following tables: + + * ``shards`` is the list of shards and their associated :py:class:`ShardState`. + * ``signature2shard`` is the mapping between object ids and the shard that + contains the associated object. + + This class is also used to lock a shard for exclusive use (by moving it to a + locked state, and setting a locker id). + + """ + current_version: int = 2 def __init__( @@ -136,6 +149,7 @@ class SharedBase(Database): @property def locked_shard(self) -> str: + """The name of the shard that is currently locked for writing by this SharedBase.""" self.set_locked_shard() assert self._locked_shard, "failed to lock a shard" @@ -143,12 +157,16 @@ class SharedBase(Database): @property def locked_shard_id(self) -> int: + """The numeric ID of the shard that is currently locked for writing by this + :py:class:`SharedBase`.""" self.set_locked_shard() assert self._locked_shard, "failed to lock a shard" return self._locked_shard[1] def set_locked_shard(self) -> None: + """Lock a shard in :py:const:`ShardState.STANDBY` for writing, creating a new + write shard (and the associated table) if none is currently available.""" if self._locked_shard is not None: return @@ -276,6 +294,18 @@ class SharedBase(Database): name: Optional[str] = None, db: Optional[psycopg.Connection] = None, ): + """Set the state of a given shard (or of the shard that is currently locked). + + Arguments: + new_state: the new :py:class:`ShardState` for the shard. 
+ set_locker: whether the shard should be marked as locked by the current + :py:class:`SharedBase`. + check_locker: whether the state change should only be accepted if the shard + is currently locked by us. + name: the name of the shard to change the state of (defaults to the + currently locked shard). + db: pass an existing psycopg connection to run this in an existing transaction. + """ if not name: if not self._locked_shard: raise ValueError("Can't set shard state, no shard specified or locked") @@ -325,6 +355,19 @@ class SharedBase(Database): self._locked_shard = None def create_shard(self, new_state: ShardState) -> Tuple[str, int]: + """Create a new write shard (locked by the current ``SharedBase``), with a + generated name. + + Arguments: + new_state: the :py:class:`ShardState` for the new shard. + + Returns: + the name and numeric id of the newly created shard. + + Raises: + RuntimeError: if the shard creation failed (for instance if a shard + with an identical name was created concurrently). + """ name = uuid.uuid4().hex # # ensure the first character is not a number so it can be used as a @@ -353,6 +396,7 @@ class SharedBase(Database): return res def shard_packing_starts(self, name: str): + """Record the named shard as being packed now.""" with self.pool.connection() as db, db.transaction(): with db.cursor() as c: c.execute( @@ -380,7 +424,8 @@ class SharedBase(Database): db=db, ) - def shard_packing_ends(self, name): + def shard_packing_ends(self, name: str): + """Record the completion of packing shard ``name``.""" with self.pool.connection() as db, db.transaction(): with db.cursor() as c: c.execute( @@ -407,6 +452,11 @@ class SharedBase(Database): ) def get_shard_info(self, id: int) -> Optional[Tuple[str, ShardState]]: + """Get the name and :py:class:`ShardState` of the shard with the given ``id``. + + Returns: + :py:const:`None` if the shard with the given ``id`` doesn't exist. 
+ """ with self.pool.connection() as db, db.cursor() as c: c.execute("SELECT name, state FROM shards WHERE id = %s", (id,)) row = c.fetchone() @@ -415,6 +465,11 @@ class SharedBase(Database): return (row[0], ShardState(row[1])) def get_shard_state(self, name: str) -> Optional[ShardState]: + """Get the :py:class:`ShardState` of the named shard. + + Returns: + :py:const:`None` if the shard with the given ``name`` doesn't exist. + """ with self.pool.connection() as db, db.cursor() as c: c.execute("SELECT state FROM shards WHERE name = %s", (name,)) row = c.fetchone() @@ -423,12 +478,25 @@ class SharedBase(Database): return ShardState(row[0]) def list_shards(self) -> Iterator[Tuple[str, ShardState]]: + """List all known shards and their current :py:class:`ShardState`.""" with self.pool.connection() as db, db.cursor() as c: c.execute("SELECT name, state FROM shards") for row in c: yield row[0], ShardState(row[1]) def count_objects(self, name: Optional[str] = None) -> Optional[int]: + """Count the known objects in a shard. + + Arguments: + name: the name of the shard in which objects should be counted + (defaults to the currently locked shard) + + Returns: + :py:const:`None` if no shard exists with the given ``name``. + + Raises: + ValueError: if no shard has been specified and no shard is currently locked. + """ if not name: if not self._locked_shard: raise ValueError("Can't count objects, no shard specified or locked") @@ -450,6 +518,11 @@ class SharedBase(Database): return row[1] def record_shard_mapped(self, host: str, name: str) -> Set[str]: + """Record that the shard ``name`` has been mapped on the given ``host``. + + This is used in the distributed winery mode to acknowledge shards that + have been seen by hosts, before the write shard is removed for cleanup. 
+ """ with self.pool.connection() as db, db.transaction(): with db.cursor() as c: c.execute( @@ -473,7 +546,9 @@ class SharedBase(Database): ) return hosts - def contains(self, obj_id) -> Optional[int]: + def contains(self, obj_id: bytes) -> Optional[int]: + """Return the id of the shard which contains ``obj_id``, or :py:const:`None` + if the object is not known (or deleted).""" with self.pool.connection() as db, db.cursor() as c: c.execute( "SELECT shard FROM signature2shard WHERE " @@ -486,12 +561,25 @@ class SharedBase(Database): return row[0] def get(self, obj_id) -> Optional[Tuple[str, ShardState]]: + """Return the name and :py:class:`ShardState` of the shard containing ``obj_id``, + or :py:const:`None` if the object is not known (or deleted).""" id = self.contains(obj_id) if id is None: return None return self.get_shard_info(id) - def record_new_obj_id(self, db, obj_id) -> Optional[int]: + def record_new_obj_id(self, db: psycopg.Connection, obj_id: bytes) -> Optional[int]: + """Try to record ``obj_id`` as present in the currently locked shard. + + Arguments: + db: a psycopg connection with an open transaction + obj_id: the id of the object being added + + Returns: + The numeric id of the shard in which the object is recorded as present + (which can differ from the currently locked shard, if the object was + added in another concurrent transaction). 
+ """ db.execute( "INSERT INTO signature2shard (signature, shard, state) " "VALUES (%s, %s, 'present') ON CONFLICT (signature) DO NOTHING", @@ -500,11 +588,15 @@ class SharedBase(Database): cur = db.execute( "SELECT shard FROM signature2shard WHERE signature = %s", (obj_id,) ) - return cur.fetchone()[0] + res = cur.fetchone() + if not res: + raise RuntimeError("Could not record the object in any shard?") + return res[0] def list_signatures( self, after_id: Optional[bytes] = None, limit: Optional[int] = None ) -> Iterator[bytes]: + """List ``limit`` known object ids after ``after_id``.""" with self.pool.connection() as db: cur = db.execute( """SELECT signature @@ -519,7 +611,8 @@ class SharedBase(Database): for row in cur: yield row[0] - def delete(self, obj_id): + def delete(self, obj_id: bytes): + """Mark ``obj_id`` for deletion.""" with self.pool.connection() as db: db.execute( "UPDATE signature2shard SET state = 'deleted' WHERE signature = %s", @@ -527,6 +620,12 @@ class SharedBase(Database): ) def deleted_objects(self) -> Iterator[Tuple[bytes, str, ShardState]]: + """List all objects marked for deletion, with the name and state of the + shard in which the object is stored. + + Returns: + an iterator over ``object_id``, shard name, :py:class:`ShardState` tuples + """ with self.pool.connection() as db: cur = db.execute( """SELECT signature, shards.name, shards.state @@ -539,5 +638,6 @@ class SharedBase(Database): yield bytes(signature), name, ShardState(state) def clean_deleted_object(self, obj_id) -> None: + """Remove the reference to the deleted object ``obj_id``.""" with self.pool.connection() as db: db.execute("DELETE FROM signature2shard WHERE signature = %s", (obj_id,))