From 465353894cf1e047d17121106bb17202360d96da Mon Sep 17 00:00:00 2001
From: David Douard <david.douard@sdfa3.org>
Date: Tue, 23 Apr 2024 12:34:55 +0200
Subject: [PATCH 1/2] Add an origin blocking proxy

This proxy prevents registered origins from being visited again. If an
origin URL matches a blocking rule, any attempt to add an Origin,
OriginVisit or OriginVisitStatus object targeting this URL will be
blocked, raising a BlockedOriginException.

This is implemented in a similar fashion to the MaskingProxy, sharing
the same management logic as the latter.

The URL matching rules are, given a checked URL:

- check for an exact match in the blocking rules on:
  1. the given URL
  2. the trimmed URL (if it has a trailing /)
  3. the extension-less URL if it ends with a known suffix (e.g. '.git')
- if no exact match is found, look for the best prefix match on split
  sub-path URLs (i.e. the longest URL match in the blocking rules for
  which the checked URL starts with the match, splitting on '/')
---
 pyproject.toml                                |   2 +
 swh/storage/exc.py                            |  22 +
 swh/storage/proxies/blocking/__init__.py      | 125 ++++
 swh/storage/proxies/blocking/cli.py           | 342 ++++++++++
 swh/storage/proxies/blocking/db.py            | 430 +++++++++++++
 .../blocking/sql/10-superuser-init.sql        |   1 +
 .../proxies/blocking/sql/30-schema.sql        |  40 ++
 .../proxies/blocking/sql/60-indexes.sql       |   5 +
 swh/storage/tests/blocking/__init__.py        |   0
 swh/storage/tests/blocking/conftest.py        |  37 ++
 swh/storage/tests/blocking/test_cli.py        | 585 ++++++++++++++++++
 swh/storage/tests/blocking/test_db.py         | 280 +++++++++
 swh/storage/tests/blocking/test_proxy.py      | 368 +++++++++++
 .../tests/blocking/test_proxy_blocking.py     |  87 +++
 .../tests/blocking/test_proxy_no_blocking.py  |  36 ++
 15 files changed, 2360 insertions(+)
 create mode 100644 swh/storage/proxies/blocking/__init__.py
 create mode 100644 swh/storage/proxies/blocking/cli.py
 create mode 100644 swh/storage/proxies/blocking/db.py
 create mode 100644 swh/storage/proxies/blocking/sql/10-superuser-init.sql
 create mode 100644 swh/storage/proxies/blocking/sql/30-schema.sql
 create mode 100644 swh/storage/proxies/blocking/sql/60-indexes.sql
 create mode 100644 swh/storage/tests/blocking/__init__.py
 create mode 100644 swh/storage/tests/blocking/conftest.py
 create mode 100644 swh/storage/tests/blocking/test_cli.py
 create mode 100644 swh/storage/tests/blocking/test_db.py
 create mode 100644 swh/storage/tests/blocking/test_proxy.py
 create mode 100644 swh/storage/tests/blocking/test_proxy_blocking.py
 create mode 100644 swh/storage/tests/blocking/test_proxy_no_blocking.py

diff --git a/pyproject.toml b/pyproject.toml
index 079c6b791..87e42f85a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,10 +42,12 @@ journal = {file = ["requirements-swh-journal.txt"]}
 "validate" = "swh.storage.proxies.validate:ValidatingProxyStorage"
 "record_references" = "swh.storage.proxies.record_references:RecordReferencesProxyStorage"
 "masking" = "swh.storage.proxies.masking:MaskingProxyStorage"
+"blocking" = "swh.storage.proxies.blocking:BlockingProxyStorage"

 [project.entry-points."swh.cli.subcommands"]
 "swh.storage" = "swh.storage.cli"
 "swh.storage.proxies.masking" = "swh.storage.proxies.masking.cli"
+"swh.storage.proxies.blocking" = "swh.storage.proxies.blocking.cli"

 [project.urls]
 "Homepage" = "https://gitlab.softwareheritage.org/swh/devel/swh-storage"
diff --git a/swh/storage/exc.py b/swh/storage/exc.py
index db7730070..a41b740d4 100644
--- a/swh/storage/exc.py
+++ b/swh/storage/exc.py
@@ -10,6 +10,7 @@ from swh.storage.utils import content_bytes_hashes,
     content_hex_hashes

 if TYPE_CHECKING:
     from swh.model.swhids import ExtendedSWHID
+    from swh.storage.proxies.blocking.db import BlockingStatus
     from swh.storage.proxies.masking.db import MaskedStatus

@@ -61,6 +62,27 @@ class UnknownMetadataFetcher(StorageArgumentException):
     pass


+class BlockedOriginException(NonRetryableException):
+    """Raised when the blocking proxy prevents the insertion of a blocked origin"""
+
+    def __init__(self, blocked: "Dict[str, BlockingStatus]"):
+        blocked = {
+            url: status
+            for url, status in blocked.items()
+            if status.state.name != "NON_BLOCKED"
+        }
+        if not blocked:
+            raise ValueError(
+                "Can't raise a BlockedOriginException if no origin is actually blocked"
+            )
+
+        self.blocked = blocked
+        super().__init__(blocked)
+
+    def __str__(self):
+        return "Some origins are blocked: %s" % ", ".join(self.blocked)
+
+
 class MaskedObjectException(NonRetryableException):
     """Raised when the masking proxy attempts to return a masked object"""

diff --git a/swh/storage/proxies/blocking/__init__.py b/swh/storage/proxies/blocking/__init__.py
new file mode 100644
index 000000000..6a08f6a87
--- /dev/null
+++ b/swh/storage/proxies/blocking/__init__.py
@@ -0,0 +1,125 @@
+# Copyright (C) 2024 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from contextlib import contextmanager
+from typing import Dict, Iterable, Iterator, List, Union
+
+import psycopg2.pool
+
+from swh.model.model import Origin, OriginVisit, OriginVisitStatus
+from swh.storage import get_storage
+from swh.storage.exc import BlockedOriginException
+from swh.storage.interface import StorageInterface
+from swh.storage.metrics import DifferentialTimer
+from swh.storage.proxies.blocking.db import BlockingState
+
+from .db import BlockingQuery
+
+BLOCKING_OVERHEAD_METRIC = "swh_storage_blocking_overhead_seconds"
+
+
+def get_datastore(cls, db):
+    assert cls == "postgresql"
+    from .db import BlockingAdmin
+
+    return BlockingAdmin.connect(db)
+
+
+def blocking_overhead_timer(method_name: str) -> DifferentialTimer:
+    """Return a properly set up DifferentialTimer for ``method_name`` of the storage"""
+    return DifferentialTimer(BLOCKING_OVERHEAD_METRIC, tags={"endpoint": method_name})
+
+
+class BlockingProxyStorage:
+    """Blocking storage proxy
+
+    This proxy prevents visits to a known list of origins from being performed at all.
+
+    It uses a specific PostgreSQL database (which for now is colocated with the
+    swh.storage PostgreSQL database), the access to which is implemented in the
+    :mod:`.db` submodule.
+
+    Sample configuration
+
+    ..
code-block: yaml + + storage: + cls: blocking + blocking_db: 'dbname=swh-blocking-proxy' + max_pool_conns: 10 + storage: + - cls: remote + url: http://storage.internal.staging.swh.network:5002/ + + """ + + def __init__( + self, + blocking_db: str, + storage: Union[Dict, StorageInterface], + min_pool_conns: int = 1, + max_pool_conns: int = 5, + ): + self.storage: StorageInterface = ( + get_storage(**storage) if isinstance(storage, dict) else storage + ) + + self._blocking_pool = psycopg2.pool.ThreadedConnectionPool( + min_pool_conns, max_pool_conns, blocking_db + ) + + def origin_visit_status_add( + self, visit_statuses: List[OriginVisitStatus] + ) -> Dict[str, int]: + with self._blocking_query() as q: + statuses = q.origins_are_blocked([v.origin for v in visit_statuses]) + if statuses and any( + status.state != BlockingState.NON_BLOCKED + for status in statuses.values() + ): + raise BlockedOriginException(statuses) + return self.storage.origin_visit_status_add(visit_statuses) + + def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: + with self._blocking_query() as q: + statuses = q.origins_are_blocked([v.origin for v in visits]) + if statuses and any( + status.state != BlockingState.NON_BLOCKED + for status in statuses.values() + ): + raise BlockedOriginException(statuses) + return self.storage.origin_visit_add(visits) + + def origin_add(self, origins: List[Origin]) -> Dict[str, int]: + with self._blocking_query() as q: + statuses = {} + for origin in origins: + status = q.origin_is_blocked(origin.url) + if status and status.state != BlockingState.NON_BLOCKED: + statuses[origin.url] = status + if statuses: + raise BlockedOriginException(statuses) + return self.storage.origin_add(origins) + + @contextmanager + def _blocking_query(self) -> Iterator[BlockingQuery]: + ret = None + try: + ret = BlockingQuery.from_pool(self._blocking_pool) + yield ret + finally: + if ret: + ret.put_conn() + + def __getattr__(self, key): + method = getattr(self.storage, key) + if method: + # Avoid going through __getattr__ again next time + setattr(self, key, method) + return method + + # Raise a NotImplementedError to make sure we don't forget to add + # masking to any new storage functions + raise NotImplementedError(key) diff --git a/swh/storage/proxies/blocking/cli.py b/swh/storage/proxies/blocking/cli.py new file mode 100644 index 000000000..317e727e3 --- /dev/null +++ b/swh/storage/proxies/blocking/cli.py @@ -0,0 +1,342 @@ +# Copyright (C) 2024 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import TYPE_CHECKING, Iterator, List, Optional, TextIO + +import click + +from swh.storage.cli import storage as storage_cli_group +from swh.storage.proxies.masking.cli import EditAborted, edit_message, output_message + +if TYPE_CHECKING: + from .db import BlockingRequest, BlockingState + + +@storage_cli_group.group(name="blocking") +@click.pass_context +def blocking_cli_group(ctx: click.Context) -> click.Context: + """Configure blocking of origins, preventing them from being archived + + These tools require read/write access to the blocking database. 
+ An entry must be added to the configuration file as follow:: + + \b + storage: + … + \b + blocking_admin: + blocking_db: "service=swh-blocking-admin" + """ + + if ( + "blocking_admin" not in ctx.obj["config"] + or "blocking_db" not in ctx.obj["config"]["blocking_admin"] + ): + ctx.fail( + "You must have a blocking_admin section, with a blocking_db entry, " + "configured in your config file." + ) + + from psycopg2 import OperationalError + + from .db import BlockingAdmin + + try: + ctx.obj["blocking_admin"] = BlockingAdmin.connect( + ctx.obj["config"]["blocking_admin"]["blocking_db"] + ) + except OperationalError as ex: + raise click.ClickException(str(ex)) + + return ctx + + +class RequestType(click.ParamType): + name = "request slug or uuid" + + def convert(self, value, param, ctx) -> "BlockingRequest": + # Try from UUID first + from uuid import UUID + + try: + uuid = UUID(value) + except ValueError: + pass + else: + request = ctx.obj["blocking_admin"].find_request_by_id(uuid) + if not request: + raise click.ClickException(f"Request “{uuid}†not found from id.") + return request + + # Try from slug + request = ctx.obj["blocking_admin"].find_request(value) + if not request: + raise click.ClickException(f"Request “{value}†not found.") + return request + + +@blocking_cli_group.command(name="new-request") +@click.option( + "-m", "--message", "reason", metavar="REASON", help="why the request was made" +) +@click.argument("slug") +@click.pass_context +def new_request(ctx: click.Context, slug: str, reason: Optional[str] = None) -> None: + """Create a new request to block objects + + SLUG is a human-readable unique identifier for the request. It is an + internal identifier that will be used in subsequent commands to address this + newly recorded request. + + A reason for the request must be specified, either using the `-m` option or + via the provided editor. 
+ """ + + from .db import DuplicateRequest + + if reason is None or reason == "": + try: + reason = edit_message(f"Please enter a reason for the request “{slug}â€.") + except EditAborted: + raise click.ClickException("Aborting due to an empty reason.") + + try: + request = ctx.obj["blocking_admin"].create_request(slug, reason) + except DuplicateRequest: + raise click.ClickException(f"Request “{slug}†already exists.") + click.echo( + f"New request “{click.style(slug, fg='green', bold=True)}†" + f"recorded with identifier {request.id}" + ) + + +@blocking_cli_group.command(name="list-requests") +@click.option( + "-a", + "--include-cleared-requests/--exclude-cleared-requests", + default=False, + help="Show requests without any blocking state", +) +@click.pass_context +def list_requests(ctx: click.Context, include_cleared_requests: bool) -> None: + """List blocking requests""" + + def list_output() -> Iterator[str]: + for request, block_count in ctx.obj["blocking_admin"].get_requests( + include_cleared_requests=include_cleared_requests + ): + yield f"📄 {click.style(request.slug, bold=True)}\n" + yield f"📊 {block_count} object{'s' if block_count != 1 else ''}\n" + yield f"🪪 {request.id}\n" + # XXX: humanize would be nice here as well + yield f"📅 {request.date.strftime('%c %z')}\n" + yield from output_message(request.reason) + yield "\n" + + click.echo_via_pager(list_output()) + + +class BlockedStateType(click.Choice): + def __init__(self): + from .db import BlockingState + + super().__init__( + [format_blocked_state(state) for state in BlockingState], + case_sensitive=False, + ) + + +def parse_blocked_state(str: str) -> "BlockingState": + from .db import BlockingState + + return BlockingState[str.replace("-", "_").upper()] + + +def format_blocked_state(state: "BlockingState") -> str: + return state.name.lower().replace("_", "-") + + +def read_origins(file: TextIO) -> List[str]: + import re + + filter_re = re.compile(r"^(#|$)") + return [ + line.strip() for line in file.read().split("\n") if not filter_re.match(line) + ] + + +@blocking_cli_group.command(name="update-objects") +@click.option("-m", "--message", help="an explanation for this change") +@click.option("-f", "--file", type=click.File(), help="a file with one Origin per line") +@click.argument("request", metavar="SLUG", type=RequestType()) +@click.argument("new-state", metavar="NEW_STATE", type=BlockedStateType()) +@click.pass_context +def update_objects( + ctx: click.Context, + request: "BlockingRequest", + new_state: str, + message: Optional[str] = None, + file: Optional[TextIO] = None, +) -> None: + """Update the blocking state of given objects + + The blocked state of the provided Origins will be updated to NEW_STATE for the + request SLUG. + + NEW_STATE must be one of “blockedâ€, “decision-pending†or “non_blockedâ€. + + origins must be provided one per line, either via the standard input or a + file specified via the `-f` option. `-` is synonymous for the standard + input. + + An explanation for this change must be added to the request history. It can + either be specified by the `-m` option or via the provided editor. + """ + + import sys + + from .db import RequestNotFound + + if file is None: + file = sys.stdin + + blocked_state = parse_blocked_state(new_state) + origins = read_origins(file) + + if len(origins) == 0: + raise click.ClickException("No origin given!") + + if message is None or message == "": + try: + message = edit_message( + f"Please enter an explanation for this update on request “{request.slug}â€." 
+ ) + except EditAborted: + raise click.ClickException("Aborting due to an empty explanation.") + + try: + ctx.obj["blocking_admin"].set_origins_state(request.id, blocked_state, origins) + ctx.obj["blocking_admin"].record_history(request.id, message) + except RequestNotFound: + raise click.ClickException(f"Request with id “{request.id}†not found.") + + click.echo( + f"Updated blocking state for {len(origins)} origins in request “{request.slug}â€." + ) + + +@blocking_cli_group.command(name="status") +@click.argument("request", metavar="SLUG", type=RequestType()) +@click.pass_context +def status_cmd(ctx: click.Context, request: "BlockingRequest") -> None: + """Get the blocking states defined by a request""" + + states = ctx.obj["blocking_admin"].get_states_for_request(request.id) + for swhid, state in states.items(): + click.echo(f"{swhid} {format_blocked_state(state)}") + + +@blocking_cli_group.command(name="history") +@click.argument("request", metavar="SLUG", type=RequestType()) +@click.pass_context +def history_cmd(ctx: click.Context, request: "BlockingRequest") -> None: + """Get the history for a request""" + + def history_output() -> Iterator[str]: + # XXX: we have no way to tie a record to a set of swhids or their state + # the only thing we can do is display the messages + yield f"History for request “{request.slug}†({request.id}) \n" + yield from output_message(request.reason) + yield "\n" + for record in ctx.obj["blocking_admin"].get_history(request.id): + # XXX: if we agree to bring humanize in the dependencies + # relative_time = humanize.naturaltime( + # dt.datetime.now(tz=dt.timezone.utc) - record.date + # ) + yield f"📅 {record.date.strftime('%c %z')}\n" + for i, line in enumerate(record.message.splitlines()): + yield from output_message(record.message) + + click.echo_via_pager(history_output()) + + +@blocking_cli_group.command(name="origin-state") +@click.argument("origins", metavar="ORIGIN", nargs=-1, type=str) +@click.pass_context +def object_state(ctx: click.Context, origins: List[str]) -> None: + """Get the blocking state for a set of Origins + + If an object given in the arguments is not listed in the output, + it means no blocking state is set in any requests. 
+ """ + + import itertools + + from .db import BlockingState + + STATE_TO_COLORS = { + BlockingState.NON_BLOCKED: "bright_green", + BlockingState.DECISION_PENDING: "bright_yellow", + BlockingState.BLOCKED: "bright_red", + } + STATE_TO_LABEL = { + BlockingState.NON_BLOCKED: "allowed", + BlockingState.DECISION_PENDING: "decision-pending", + BlockingState.BLOCKED: "blocked", + } + + def state_output() -> Iterator[str]: + # find_blocks() will group blocks for the same Origin + for origin, blocks_iter in itertools.groupby( + ctx.obj["blocking_admin"].find_blocking_states(origins), + key=lambda block: block.url_pattern, + ): + blocks = list(blocks_iter) + blocked = any(block.state != BlockingState.NON_BLOCKED for block in blocks) + yield ( + f"{'blocked ' if blocked else 'visible'} " + f"{click.style(str(blocks[0].url_pattern), bold=blocked)}\n" + ) + for block in blocks: + yield click.style( + f" {block.request_slug}: {STATE_TO_LABEL[block.state]}\n", + fg=STATE_TO_COLORS[block.state], + ) + + click.echo_via_pager(state_output()) + + +@blocking_cli_group.command(name="clear-request") +@click.option("-m", "--message", help="an explanation for this change") +@click.argument("request", metavar="SLUG", type=RequestType()) +@click.pass_context +def clear_request( + ctx: click.Context, request: "BlockingRequest", message: Optional[str] +) -> None: + """Remove all blocking states for the given request""" + + from .db import RequestNotFound + + if message is None or message == "": + states = ctx.obj["blocking_admin"].get_states_for_request(request.id) + extra_lines = ["Associated object states:", ""] + for url, state in states.items(): + extra_lines.append(f"{url} {format_blocked_state(state)}") + try: + message = edit_message( + "Please enter an explanation for clearing blocks" + f" from request “{request.slug}â€.", + extra_lines=extra_lines, + ) + except EditAborted: + raise click.ClickException("Aborting due to an empty explanation.") + + try: + ctx.obj["blocking_admin"].delete_blocking_states(request.id) + ctx.obj["blocking_admin"].record_history(request.id, message) + except RequestNotFound: + raise click.ClickException(f"Request with id “{request.id}†not found.") + + click.echo(f"Blockings cleared for request “{request.slug}â€.") diff --git a/swh/storage/proxies/blocking/db.py b/swh/storage/proxies/blocking/db.py new file mode 100644 index 000000000..69488035d --- /dev/null +++ b/swh/storage/proxies/blocking/db.py @@ -0,0 +1,430 @@ +# Copyright (C) 2024 The Software Heritage developers + +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import enum +import logging +from typing import Dict, List, Optional, Tuple +from urllib.parse import urlparse +from uuid import UUID + +import attr +import psycopg2.errors +from psycopg2.extras import execute_values + +from swh.core.db import BaseDb +from swh.core.statsd import statsd +from swh.storage.proxies.masking.db import DuplicateRequest, RequestNotFound + +METRIC_QUERY_TOTAL = "swh_storage_blocking_queried_total" +METRIC_BLOCKED_TOTAL = "swh_storage_blocking_blocked_total" +KNOWN_SUFFIXES = ("git", "svn", "hg", "cvs", "CVS") + +logger = logging.getLogger(__name__) + + +class BlockingState(enum.Enum): + """Value recording "how much" an url associated to a blocking request is blocked""" + + NON_BLOCKED = enum.auto() + """The origin url can be ingested/updated""" + DECISION_PENDING = enum.auto() + 
"""Ingestion from origin url is temporarily blocked until the request is reviewed""" + BLOCKED = enum.auto() + """Ingestion from origin url is permanently blocked""" + + +@attr.s +class BlockingStatus: + """Return value when requesting if an origin url ingestion is blocked""" + + state = attr.ib(type=BlockingState) + request = attr.ib(type=UUID) + + +@attr.s +class BlockingRequest: + """A request for blocking a set of origins from being ingested""" + + id = attr.ib(type=UUID) + """Unique id for the request (will be returned to requesting clients)""" + slug = attr.ib(type=str) + """Unique, human-readable id for the request (for administrative interactions)""" + date = attr.ib(type=datetime.datetime) + """Date the request was received""" + reason = attr.ib(type=str) + """Why the request was made""" # TODO: should this be stored here? + + +@attr.s +class RequestHistory: + request = attr.ib(type=UUID) + """id of the blocking request""" + date = attr.ib(type=datetime.datetime) + """Date the history entry has been added""" + message = attr.ib(type=str) + """Free-form history information (e.g. "policy decision made")""" + + +@attr.s +class BlockedOrigin: + request_slug = attr.ib(type=str) + url_pattern = attr.ib(type=str) + state = attr.ib(type=BlockingState) + + +class BlockingDb(BaseDb): + current_version = 1 + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.conn.autocommit = True + + +class BlockingAdmin(BlockingDb): + def create_request(self, slug: str, reason: str) -> BlockingRequest: + """Record a new blocking request + + Arguments: + slug: human-readable unique identifier for the request + reason: free-form text recording why the request was made + + Raises: + DuplicateRequest when the slug already exists + """ + cur = self.cursor() + + try: + cur.execute( + """ + INSERT INTO blocking_request (slug, reason) + VALUES (%s, %s) RETURNING id, date + """, + (slug, reason), + ) + except psycopg2.errors.UniqueViolation: + raise DuplicateRequest(slug) + + res = cur.fetchone() + assert res is not None, "PostgreSQL returned an inconsistent result" + id, date = res + return BlockingRequest(id=id, date=date, slug=slug, reason=reason) + + def find_request(self, slug: str) -> Optional[BlockingRequest]: + """Find a blocking request using its slug + + Returns: :const:`None` if a request with the given slug doesn't exist + """ + cur = self.cursor() + cur.execute( + """ + SELECT id, slug, date, reason + FROM blocking_request + WHERE slug = %s + """, + (slug,), + ) + + res = cur.fetchone() + if not res: + return None + id, slug, date, reason = res + return BlockingRequest(id=id, date=date, slug=slug, reason=reason) + + def find_request_by_id(self, id: UUID) -> Optional[BlockingRequest]: + """Find a blocking request using its id + + Returns: :const:`None` if a request with the given request doesn't exist + """ + cur = self.cursor() + cur.execute( + """ + SELECT id, slug, date, reason + FROM blocking_request + WHERE id = %s + """, + (id,), + ) + + res = cur.fetchone() + if not res: + return None + id, slug, date, reason = res + return BlockingRequest(id=id, date=date, slug=slug, reason=reason) + + def get_requests( + self, include_cleared_requests: bool = False + ) -> List[Tuple[BlockingRequest, int]]: + """Get known requests + + Args: + include_cleared_requests: also include requests with no associated + blocking states + """ + cur = self.cursor() + + query = """SELECT id, slug, date, reason, COUNT(url_match) AS blocking_count + FROM blocking_request + LEFT JOIN 
blocked_origin ON (request = id) + GROUP BY id""" + if not include_cleared_requests: + query += " HAVING COUNT(url_match) > 0" + query += " ORDER BY date DESC" + cur.execute(query) + result = [] + for id, slug, date, reason, block_count in cur: + result.append( + ( + BlockingRequest(id=id, slug=slug, date=date, reason=reason), + block_count, + ) + ) + return result + + def set_origins_state( + self, request_id: UUID, new_state: BlockingState, urls: List[str] + ): + """Within the request with the given id, record the state of the given + objects as ``new_state``. + + This creates entries or updates them as appropriate. + + Raises: :exc:`RequestNotFound` if the request is not found. + """ + cur = self.cursor() + + cur.execute("SELECT 1 FROM blocking_request WHERE id = %s", (request_id,)) + if cur.fetchone() is None: + raise RequestNotFound(request_id) + + execute_values( + cur, + """ + INSERT INTO blocked_origin (url_match, request, state) + VALUES %s + ON CONFLICT (url_match, request) + DO UPDATE SET state = EXCLUDED.state + """, + ( + ( + url, + request_id, + new_state.name.lower(), + ) + for url in urls + ), + ) + + def get_states_for_request(self, request_id: UUID) -> Dict[str, BlockingState]: + """Get the state of urls associated with the given request. + + Raises :exc:`RequestNotFound` if the request is not found. + """ + + cur = self.cursor() + + cur.execute("SELECT 1 FROM blocking_request WHERE id = %s", (request_id,)) + if cur.fetchone() is None: + raise RequestNotFound(request_id) + + result = {} + cur.execute( + """SELECT url_match, state + FROM blocked_origin + WHERE request = %s""", + (request_id,), + ) + for url, state in cur: + result[url] = BlockingState[state.upper()] + return result + + def find_blocking_states(self, urls: List[str]) -> List[BlockedOrigin]: + """Lookup the blocking state and associated requests for the given urls + (exact match).""" + + cur = self.cursor() + result = [] + for ( + url, + request_slug, + state, + ) in psycopg2.extras.execute_values( + cur, + """SELECT url_match, slug, state + FROM blocked_origin + INNER JOIN (VALUES %s) v(url_match) + USING (url_match) + LEFT JOIN blocking_request ON (blocking_request.id = request) + ORDER BY url_match, blocking_request.date DESC + """, + ((url,) for url in urls), + fetch=True, + ): + result.append( + BlockedOrigin( + request_slug=request_slug, + url_pattern=url, + state=BlockingState[state.upper()], + ) + ) + return result + + def delete_blocking_states(self, request_id: UUID) -> None: + """Remove all blocking states for the given request. + + Raises: :exc:`RequestNotFound` if the request is not found. + """ + + cur = self.cursor() + cur.execute("SELECT 1 FROM blocking_request WHERE id = %s", (request_id,)) + if cur.fetchone() is None: + raise RequestNotFound(request_id) + + cur.execute("DELETE FROM blocked_origin WHERE request = %s", (request_id,)) + + def record_history(self, request_id: UUID, message: str) -> RequestHistory: + """Add an entry to the history of the given request. + + Raises: :exc:`RequestNotFound` if the request is not found. 
+ """ + cur = self.cursor() + try: + cur.execute( + """ + INSERT INTO blocking_request_history (request, message) + VALUES (%s, %s) RETURNING date + """, + (request_id, message), + ) + except psycopg2.errors.ForeignKeyViolation: + raise RequestNotFound(request_id) + + res = cur.fetchone() + assert res is not None, "PostgreSQL returned an inconsistent result" + + return RequestHistory(request=request_id, date=res[0], message=message) + + def get_history(self, request_id: UUID) -> List[RequestHistory]: + """Get the history of a given request. + + Raises: :exc:`RequestNotFound` if the request if not found. + """ + cur = self.cursor() + + cur.execute("SELECT 1 FROM blocking_request WHERE id = %s", (request_id,)) + if cur.fetchone() is None: + raise RequestNotFound(request_id) + + cur.execute( + """SELECT date, message + FROM blocking_request_history + WHERE request = %s + ORDER BY date DESC""", + (request_id,), + ) + records = [] + for date, message in cur: + records.append( + RequestHistory(request=request_id, date=date, message=message) + ) + return records + + +class BlockingQuery(BlockingDb): + def origins_are_blocked( + self, urls: List[str], all_statuses=False + ) -> Dict[str, BlockingStatus]: + """Return the blocking status for eeach origin url given in urls + + If all_statuses is False, do not return urls whose blocking status is + defined as NON_BLOCKING (so only return actually blocked urls). + Otherwise, return all matching blocking status. + + """ + ret = {} + for url in urls: + status = self.origin_is_blocked(url) + if status and (all_statuses or status.state != BlockingState.NON_BLOCKED): + ret[url] = status + return ret + + def origin_is_blocked(self, url: str) -> Optional[BlockingStatus]: + """Checks if the origin URL should be blocked. + + If the given url matches a set of registered blocking rules, return the + most appropriate one. Otherwise, return None. + """ + logging.debug("url: %s", url) + cur = self.cursor() + statsd.increment(METRIC_QUERY_TOTAL, 1) + + # first look for an exact match on + # 1. the given URL + # 2. the trimmed URL (if trailing /) + # 3. the extension-less URL (if any) + checked_urls = [url] + if url.endswith("/"): + trimmed_url = url.rstrip("/") + checked_urls.insert(0, trimmed_url) + else: + trimmed_url = url + if trimmed_url.rsplit(".", 1)[-1] in KNOWN_SUFFIXES: + checked_urls.append(trimmed_url.rsplit(".", 1)[0]) + logger.debug("Checked urls for exact match: %s", checked_urls) + + psycopg2.extras.execute_values( + cur, + """ + SELECT url_match, request, state + FROM blocked_origin + INNER JOIN (VALUES %s) v(url_match) + USING (url_match) + ORDER BY char_length(url_match) DESC LIMIT 1 + """, + ((_url,) for _url in checked_urls), + ) + row = cur.fetchone() + if not row: + # no exact match found, check for prefix matches on sub-path urls + checked_urls = [] + parsed_url = urlparse(trimmed_url) + path = parsed_url.path + baseurl = f"{parsed_url.scheme}://{parsed_url.netloc}" + checked_urls.append(baseurl) + for path_element in path.split("/"): + if path_element: + checked_urls.append(f"{checked_urls[-1]}/{path_element}") + + logger.debug("Checked urls for prefix match: %s", checked_urls) + # the request below is a bit tricky; joining on something like + # ON v.url LIKE blocked_origin.url_match || '/%%' + # works but prevent pg from using the btree index. + # This below should allow pg to use index. This trick (suggested by + # vlorentz) is to know that '0' is the next character after '/'. 
+ psycopg2.extras.execute_values( + cur, + """ + SELECT url_match, request, state + FROM blocked_origin + INNER JOIN (VALUES %s) v(url) + ON v.url > blocked_origin.url_match || '/' + AND v.url < blocked_origin.url_match || '0' + ORDER BY char_length(url_match) DESC LIMIT 1 + """, + ((_url,) for _url in checked_urls), + ) + row = cur.fetchone() + + if row: + url_match, request_id, state = row + status = BlockingStatus( + state=BlockingState[state.upper()], request=request_id + ) + logger.debug("Matching status for %s: %s", url_match, status) + statsd.increment(METRIC_BLOCKED_TOTAL, 1) + return status + return None diff --git a/swh/storage/proxies/blocking/sql/10-superuser-init.sql b/swh/storage/proxies/blocking/sql/10-superuser-init.sql new file mode 100644 index 000000000..d7b82ed3b --- /dev/null +++ b/swh/storage/proxies/blocking/sql/10-superuser-init.sql @@ -0,0 +1 @@ +create extension if not exists "uuid-ossp"; -- for masking proxy diff --git a/swh/storage/proxies/blocking/sql/30-schema.sql b/swh/storage/proxies/blocking/sql/30-schema.sql new file mode 100644 index 000000000..6a25a0971 --- /dev/null +++ b/swh/storage/proxies/blocking/sql/30-schema.sql @@ -0,0 +1,40 @@ + +create type blocked_state as enum ('non_blocked', 'decision_pending', 'blocked'); +comment on type blocked_state is 'The degree to which an origin url is blocked'; + +create table if not exists blocking_request ( + id uuid primary key default uuid_generate_v4(), + slug text not null, + date timestamptz not null default now(), + reason text not null +); + +comment on table blocking_request is 'A recorded request for blocking certain objects'; +comment on column blocking_request.id is 'Opaque id of the request'; +comment on column blocking_request.slug is 'Human-readable id of the request'; +comment on column blocking_request.date is 'Date when the request was recorded'; +comment on column blocking_request.reason is 'Free-form description of the request'; + +create table if not exists blocking_request_history ( + request uuid references blocking_request(id), + date timestamptz not null default now(), + message text not null, + primary key (request, date) +); + +comment on table blocking_request_history is 'History of a blocking request'; +comment on column blocking_request_history.request is 'Opaque id of the request'; +comment on column blocking_request_history.date is 'Date at which the message was recorded'; +comment on column blocking_request_history.message is 'History message'; + +create table if not exists blocked_origin ( + url_match text not null, + request uuid references blocking_request(id) not null, + state blocked_state not null, + primary key (url_match, request) +); + +comment on table blocked_origin is 'All the origin known to be affected by a specific request'; +comment on column blocked_origin.url_match is 'The url matching scheme to be blocked from being ingested'; +comment on column blocked_origin.request is 'Reference to the affecting request'; +comment on column blocked_origin.state is 'The degree to which the origin is blocked as a result of the request'; diff --git a/swh/storage/proxies/blocking/sql/60-indexes.sql b/swh/storage/proxies/blocking/sql/60-indexes.sql new file mode 100644 index 000000000..30eefdec9 --- /dev/null +++ b/swh/storage/proxies/blocking/sql/60-indexes.sql @@ -0,0 +1,5 @@ + +create unique index if not exists blocking_request_slug_idx on blocking_request using btree(slug); + +create index if not exists blocked_origin_request_idx on blocked_origin using btree(request, 
url_match); +comment on index blocked_origin_request_idx is 'Allow listing all the objects associated by request, ordered by url'; diff --git a/swh/storage/tests/blocking/__init__.py b/swh/storage/tests/blocking/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/swh/storage/tests/blocking/conftest.py b/swh/storage/tests/blocking/conftest.py new file mode 100644 index 000000000..8e6a3836d --- /dev/null +++ b/swh/storage/tests/blocking/conftest.py @@ -0,0 +1,37 @@ +# Copyright (C) 2024 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import partial + +import pytest +from pytest_postgresql import factories + +from swh.core.db.db_utils import initialize_database_for_module +from swh.storage.proxies.blocking.db import BlockingAdmin, BlockingQuery + +blocking_db_postgresql_proc = factories.postgresql_proc( + load=[ + partial( + initialize_database_for_module, + modname="storage.proxies.blocking", + version=BlockingAdmin.current_version, + ), + ], +) + + +blocking_db_postgresql = factories.postgresql( + "blocking_db_postgresql_proc", +) + + +@pytest.fixture +def blocking_admin(blocking_db_postgresql) -> BlockingAdmin: + return BlockingAdmin.connect(blocking_db_postgresql.info.dsn) + + +@pytest.fixture +def blocking_query(blocking_db_postgresql) -> BlockingQuery: + return BlockingQuery.connect(blocking_db_postgresql.info.dsn) diff --git a/swh/storage/tests/blocking/test_cli.py b/swh/storage/tests/blocking/test_cli.py new file mode 100644 index 000000000..c90c7c87a --- /dev/null +++ b/swh/storage/tests/blocking/test_cli.py @@ -0,0 +1,585 @@ +# Copyright (C) 2024 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from contextlib import closing +import datetime as dt +from io import StringIO +import socket +import textwrap + +from click.testing import CliRunner +import pytest + +from swh.core.cli.db import db as swhdb +from swh.core.db.db_utils import get_database_info +from swh.storage.tests.storage_data import StorageData as data + +from ...proxies.blocking.cli import ( + EditAborted, + blocking_cli_group, + clear_request, + edit_message, + history_cmd, + list_requests, + new_request, + object_state, + read_origins, + status_cmd, + update_objects, +) +from ...proxies.blocking.db import ( + BlockedOrigin, + BlockingAdmin, + BlockingRequest, + BlockingState, + RequestHistory, + RequestNotFound, +) + + +def test_cli_db_create(postgresql): + """Create a db then initializing it should be ok""" + module_name = "storage.proxies.blocking" + + db_params = postgresql.info + dbname = "blocking-db" + conninfo = ( + f"postgresql://{db_params.user}@{db_params.host}:{db_params.port}/{dbname}" + ) + + # This creates the db and installs the necessary admin extensions + result = CliRunner().invoke(swhdb, ["create", module_name, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + # This initializes the schema and data + result = CliRunner().invoke(swhdb, ["init", module_name, "--dbname", conninfo]) + + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + dbmodule, dbversion, dbflavor = get_database_info(conninfo) + assert dbmodule == "storage.proxies.blocking" + 
assert dbversion == BlockingAdmin.current_version + assert dbflavor is None + + +@pytest.fixture +def blocking_admin_config(blocking_db_postgresql): + return {"blocking_admin": {"blocking_db": blocking_db_postgresql.info.dsn}} + + +def test_blocking_admin_not_defined(): + runner = CliRunner(mix_stderr=False) + result = runner.invoke( + blocking_cli_group, + ["list-requests"], + obj={"config": {}}, + ) + assert result.exit_code == 2 + assert "blocking_admin" in result.stderr + + +def find_free_port(): + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("localhost", 0)) + # We explicitly not set SO_REUSEADDR because we want the port + # to stay free so we can get our connection refused. + return s.getsockname()[1] + + +def test_blocking_admin_unreachable(): + erroneous_postgresql_port = find_free_port() + runner = CliRunner(mix_stderr=False) + result = runner.invoke( + blocking_cli_group, + ["list-requests"], + obj={ + "config": { + "blocking_admin": { + "blocking_db": ( + "postgresql://localhost/postgres?" + f"port={erroneous_postgresql_port}" + ) + } + } + }, + ) + assert result.exit_code == 1 + assert "failed: Connection refused" in result.stderr + + +def test_edit_message_success(mocker): + mocked_click_edit = mocker.patch( + "swh.storage.proxies.blocking.cli.click.edit", + return_value="Sometimes I'm round", + ) + assert ( + edit_message("Hello!", extra_lines=["Sometimes I'm alone", "Sometimes I'm not"]) + == "Sometimes I'm round" + ) + mocked_click_edit.assert_called_once_with( + textwrap.dedent( + """ + + # Hello! + # Lines starting with “#†will be ignored. An empty message will abort the operation. + # + # Sometimes I'm alone + # Sometimes I'm not""" + ) + ) + + +def test_edit_message_removes_comment(mocker): + mocker.patch( + "swh.storage.proxies.blocking.cli.click.edit", + return_value=textwrap.dedent( + """\ + Hello! + + # This is a comment""" + ), + ) + assert edit_message("RANDOM PROMPT") == "Hello!" 
+ + +def test_edit_message_empty_message_aborts(mocker): + mocker.patch("swh.storage.proxies.blocking.cli.click.edit", return_value="") + with pytest.raises(EditAborted): + edit_message("RANDOM PROMPT") + + +@pytest.fixture +def request01(): + return BlockingRequest( + id="da785a27-7e59-4a35-b82a-a5ae3714407c", + slug="request-01", + date=dt.datetime.now(dt.timezone.utc), + reason="one", + ) + + +@pytest.fixture +def request02(): + return BlockingRequest( + id="4fd42e35-2b6c-4536-8447-bc213cd0118b", + slug="request-02", + date=dt.datetime.now(dt.timezone.utc), + reason="two", + ) + + +@pytest.fixture +def blocked_origin(request01): + return BlockedOrigin( + request_slug=request01.slug, + url_pattern=data.origins[0].url, + state=BlockingState.BLOCKED, + ) + + +@pytest.fixture +def blocked_origin2(request01): + return BlockedOrigin( + request_slug=request01.slug, + url_pattern=data.origins[1].url, + state=BlockingState.DECISION_PENDING, + ) + + +def blocked_origin3(request02): + return BlockedOrigin( + request_slug=request02.slug, + url_pattern=data.origins[2].url, + state=BlockingState.NOT_BLOCKED, + ) + + +@pytest.fixture +def history01(request01): + return RequestHistory( + request=request01.id, + date=dt.datetime.now(dt.timezone.utc), + message="record one", + ) + + +@pytest.fixture +def history02(request01): + return RequestHistory( + request=request01.id, + date=dt.datetime.now(dt.timezone.utc), + message="record two", + ) + + +@pytest.fixture +def mocked_blocking_admin( + mocker, request01, request02, blocked_origin, blocked_origin2, history01, history02 +): + def mock_find_request(slug): + if slug == "request-01": + return request01 + elif slug == "request-02": + return request02 + else: + return None + + def mock_find_request_by_id(id): + if str(id) == "da785a27-7e59-4a35-b82a-a5ae3714407c": + return request01 + return None + + def mock_get_states_for_request(request_id): + if request_id == request01.id: + return { + blocked_origin.url_pattern: blocked_origin.state, + blocked_origin2.url_pattern: blocked_origin2.state, + } + return RequestNotFound(request_id) + + def mock_get_history(request_id): + if request_id == request01.id: + return [history02, history01] + return RequestNotFound(request_id) + + mock = mocker.Mock(spec=BlockingAdmin) + mock.find_request.side_effect = mock_find_request + mock.find_request_by_id.side_effect = mock_find_request_by_id + mock.get_states_for_request.side_effect = mock_get_states_for_request + mock.get_history.side_effect = mock_get_history + mock.get_requests.return_value = [(request02, 2), (request01, 1)] + return mock + + +def test_new_request(mocker, mocked_blocking_admin, request01): + mocked_blocking_admin.create_request.return_value = request01 + mocker.patch("swh.storage.proxies.blocking.cli.click.edit", return_value="one") + runner = CliRunner() + result = runner.invoke( + new_request, ["request-01"], obj={"blocking_admin": mocked_blocking_admin} + ) + assert result.exit_code == 0 + mocked_blocking_admin.create_request.assert_called_once_with("request-01", "one") + assert "da785a27-7e59-4a35-b82a-a5ae3714407c" in result.output + assert "request-01" in result.output + + +def test_new_request_with_message(mocker, mocked_blocking_admin, request01): + mocked_blocking_admin.create_request.return_value = request01 + mocked_click_edit = mocker.patch("swh.storage.proxies.blocking.cli.click.edit") + runner = CliRunner() + result = runner.invoke( + new_request, + ["--message=one", "request-01"], + obj={"blocking_admin": mocked_blocking_admin}, + ) + 
assert result.exit_code == 0 + mocked_click_edit.assert_not_called() + mocked_blocking_admin.create_request.assert_called_once_with("request-01", "one") + assert "da785a27-7e59-4a35-b82a-a5ae3714407c" in result.output + assert "request-01" in result.output + + +def test_new_request_empty_message_aborts(mocker, mocked_blocking_admin): + mocked_blocking_admin.create_request.return_value = request01 + mocker.patch("swh.storage.proxies.blocking.cli.click.edit", side_effect=EditAborted) + runner = CliRunner() + result = runner.invoke( + new_request, + ["request-01"], + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 1 + assert "Aborting" in result.output + mocked_blocking_admin.create_request.assert_not_called() + + +def test_list_requests_default(mocked_blocking_admin): + runner = CliRunner() + result = runner.invoke( + list_requests, [], obj={"blocking_admin": mocked_blocking_admin} + ) + assert result.exit_code == 0 + mocked_blocking_admin.get_requests.assert_called_once_with( + include_cleared_requests=False + ) + assert "request-01" in result.output + assert "da785a27-7e59-4a35-b82a-a5ae3714407c" in result.output + assert "request-02" in result.output + assert "4fd42e35-2b6c-4536-8447-bc213cd0118b" in result.output + + +def test_list_requests_include_cleared_requests(mocked_blocking_admin): + runner = CliRunner() + result = runner.invoke( + list_requests, + ["--include-cleared-requests"], + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 0 + mocked_blocking_admin.get_requests.assert_called_once_with( + include_cleared_requests=True + ) + + +ORIGINS_INPUT = f"""\ +# origin1 +{data.origin.url} + +# origin2 +{data.origin2.url} +""" + +ORIGINS_INPUT_PARSED = [ + data.origin.url, + data.origin2.url, +] + + +def test_read_origins(): + file = StringIO(ORIGINS_INPUT) + assert read_origins(file) == ORIGINS_INPUT_PARSED + + +def test_update_objects(mocker, mocked_blocking_admin, request01): + mocked_click_edit = mocker.patch( + "swh.storage.proxies.blocking.cli.click.edit", return_value="action made" + ) + runner = CliRunner() + result = runner.invoke( + update_objects, + ["request-01", "decision-pending"], + input=ORIGINS_INPUT, + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 0 + assert "2 origins" in result.output + assert "request-01" in result.output + mocked_click_edit.assert_called_once() + mocked_blocking_admin.set_origins_state.assert_called_once_with( + request01.id, BlockingState.DECISION_PENDING, ORIGINS_INPUT_PARSED + ) + mocked_blocking_admin.record_history.assert_called_once_with( + request01.id, "action made" + ) + + +def test_update_objects_with_message(mocker, mocked_blocking_admin, request01): + mocked_click_edit = mocker.patch( + "swh.storage.proxies.blocking.cli.click.edit", return_value="action made" + ) + runner = CliRunner() + result = runner.invoke( + update_objects, + ["request-01", "decision-pending", "--message=action made"], + input=ORIGINS_INPUT, + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 0 + mocked_click_edit.assert_not_called() + mocked_blocking_admin.set_origins_state.assert_called_once() + mocked_blocking_admin.record_history.assert_called_once_with( + request01.id, "action made" + ) + + +def test_update_objects_empty_message_aborts(mocker, mocked_blocking_admin): + mocker.patch("swh.storage.proxies.blocking.cli.click.edit", side_effect=EditAborted) + runner = CliRunner() + result = runner.invoke( + update_objects, + ["request-01", 
"decision-pending"], + input=ORIGINS_INPUT, + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 1 + mocked_blocking_admin.record_history.assert_not_called() + + +def test_update_objects_with_file(tmp_path, mocker, mocked_blocking_admin, request01): + mocker.patch( + "swh.storage.proxies.blocking.cli.click.edit", return_value="action made" + ) + origins_file = tmp_path / "origins" + origins_file.write_text(ORIGINS_INPUT) + runner = CliRunner() + result = runner.invoke( + update_objects, + ["request-01", "decision-pending", f"--file={str(origins_file)}"], + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 0 + assert "2 origins" in result.output + mocked_blocking_admin.set_origins_state.assert_called_once_with( + request01.id, BlockingState.DECISION_PENDING, ORIGINS_INPUT_PARSED + ) + + +def test_update_objects_no_origins(mocked_blocking_admin): + runner = CliRunner() + result = runner.invoke( + update_objects, + ["request-01", "decision-pending", "--message=message"], + input="", + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 1 + assert "No origin given!" in result.output + mocked_blocking_admin.set_origins_state.assert_not_called() + mocked_blocking_admin.record_history.assert_not_called() + + +def test_status(mocked_blocking_admin, request01): + runner = CliRunner() + result = runner.invoke( + status_cmd, ["request-01"], obj={"blocking_admin": mocked_blocking_admin} + ) + assert result.exit_code == 0 + mocked_blocking_admin.get_states_for_request.assert_called_once_with(request01.id) + assert result.output == textwrap.dedent( + """\ + https://github.com/user1/repo1 blocked + https://github.com/user2/repo1 decision-pending + """ + ) + + +def test_status_request_not_found(mocked_blocking_admin): + runner = CliRunner() + result = runner.invoke( + status_cmd, ["garbage"], obj={"blocking_admin": mocked_blocking_admin} + ) + assert result.exit_code == 1 + assert "Error: Request “garbage†not found" in result.output + + +def test_status_request_via_uuid(mocked_blocking_admin, request01): + runner = CliRunner() + result = runner.invoke( + status_cmd, + ["da785a27-7e59-4a35-b82a-a5ae3714407c"], + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 0 + mocked_blocking_admin.get_states_for_request.assert_called_once_with(request01.id) + + +def test_history(mocked_blocking_admin, request01): + runner = CliRunner() + result = runner.invoke( + history_cmd, ["request-01"], obj={"blocking_admin": mocked_blocking_admin} + ) + assert result.exit_code == 0 + mocked_blocking_admin.get_history.assert_called_once_with(request01.id) + assert ( + "History for request “request-01†(da785a27-7e59-4a35-b82a-a5ae3714407c)" + in result.output + ) + assert "record two" in result.output + assert "record one" in result.output + + +def test_history_not_found(mocked_blocking_admin): + runner = CliRunner() + result = runner.invoke( + history_cmd, ["garbage"], obj={"blocking_admin": mocked_blocking_admin} + ) + assert result.exit_code == 1 + assert "Error: Request “garbage†not found" in result.output + + +def test_object_state(mocked_blocking_admin, blocked_origin, blocked_origin2): + blocked_origin_in_request02 = BlockedOrigin( + request_slug="request-02", + url_pattern=blocked_origin.url_pattern, + state=BlockingState.NON_BLOCKED, + ) + mocked_blocking_admin.find_blocking_states.return_value = [ + blocked_origin_in_request02, + blocked_origin, + blocked_origin2, + ] + runner = CliRunner() + result = 
runner.invoke( + object_state, + [ + data.origins[0].url, + data.origins[1].url, + data.origins[2].url, + ], + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 0 + assert result.output == textwrap.dedent( + """\ + blocked https://github.com/user1/repo1 + request-02: allowed + request-01: blocked + blocked https://github.com/user2/repo1 + request-01: decision-pending + + """ + ) + + +def test_clear_request(mocker, mocked_blocking_admin, request01): + mocked_click_edit = mocker.patch( + "swh.storage.proxies.blocking.cli.click.edit", return_value="request cleared" + ) + runner = CliRunner() + result = runner.invoke( + clear_request, + ["request-01"], + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 0 + assert "Blockings cleared for request “request-01â€." in result.output + mocked_click_edit.assert_called_once() + mocked_blocking_admin.delete_blocking_states.assert_called_once_with(request01.id) + mocked_blocking_admin.record_history.assert_called_once_with( + request01.id, "request cleared" + ) + + +def test_clear_request_with_message(mocker, mocked_blocking_admin, request01): + mocked_click_edit = mocker.patch("swh.storage.proxies.blocking.cli.click.edit") + runner = CliRunner() + result = runner.invoke( + clear_request, + ["--message=request cleared", "request-01"], + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 0 + assert "Blockings cleared for request “request-01â€." in result.output + mocked_click_edit.assert_not_called() + mocked_blocking_admin.delete_blocking_states.assert_called_once_with(request01.id) + mocked_blocking_admin.record_history.assert_called_once_with( + request01.id, "request cleared" + ) + + +def test_clear_request_empty_message_aborts(mocker, mocked_blocking_admin): + mocker.patch("swh.storage.proxies.blocking.cli.click.edit", side_effect=EditAborted) + runner = CliRunner() + result = runner.invoke( + clear_request, + ["request-01"], + obj={"blocking_admin": mocked_blocking_admin}, + ) + assert result.exit_code == 1 + assert "Aborting" in result.output + mocked_blocking_admin.delete_blocking_states.assert_not_called() + mocked_blocking_admin.record_history.assert_not_called() + + +def test_clear_request_not_found(mocked_blocking_admin): + runner = CliRunner() + result = runner.invoke( + clear_request, ["garbage"], obj={"blocking_admin": mocked_blocking_admin} + ) + assert result.exit_code == 1 + assert "Error: Request “garbage†not found" in result.output + mocked_blocking_admin.delete_blocking_states.assert_not_called() + mocked_blocking_admin.record_history.assert_not_called() diff --git a/swh/storage/tests/blocking/test_db.py b/swh/storage/tests/blocking/test_db.py new file mode 100644 index 000000000..3cb287097 --- /dev/null +++ b/swh/storage/tests/blocking/test_db.py @@ -0,0 +1,280 @@ +# Copyright (C) 2024 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from unittest.mock import call +import uuid + +import pytest + +from swh.core.db.db_utils import get_database_info +from swh.storage.proxies.blocking.db import ( + BlockingAdmin, + BlockingQuery, + BlockingState, + BlockingStatus, + DuplicateRequest, + RequestNotFound, +) +from swh.storage.tests.storage_data import StorageData + + +def test_db_version(blocking_admin: BlockingAdmin): + dbmodule, dbversion, dbflavor = 
get_database_info(blocking_admin.conn.dsn) + assert dbmodule == "storage.proxies.blocking" + assert dbversion == BlockingAdmin.current_version + assert dbflavor is None + + +def test_create_find_request(blocking_admin: BlockingAdmin): + created = blocking_admin.create_request(slug="foo", reason="bar") + + assert created.slug == "foo" + assert created.reason == "bar" + assert created.id is not None + assert created.date is not None + + assert blocking_admin.find_request("foo") == created + + +def test_create_request_conflict(blocking_admin: BlockingAdmin): + blocking_admin.create_request(slug="foo", reason="bar") + + with pytest.raises(DuplicateRequest) as exc_info: + blocking_admin.create_request(slug="foo", reason="quux") + + assert exc_info.value.args == ("foo",) + + +def test_find_request_not_found(blocking_admin): + slug = "notfound" + assert blocking_admin.find_request(slug=slug) is None + + +def test_find_request_by_id(blocking_admin: BlockingAdmin): + created = blocking_admin.create_request(slug="foo", reason="bar") + + assert blocking_admin.find_request_by_id(created.id) == created + + +NON_EXISTING_UUID: uuid.UUID = uuid.UUID("da785a27-acab-4a35-b82a-a5ae3714407c") + + +def test_find_request_by_id_not_found(blocking_admin: BlockingAdmin): + assert blocking_admin.find_request_by_id(NON_EXISTING_UUID) is None + + +@pytest.fixture +def populated_blocking_admin(blocking_admin): + pending_request1 = blocking_admin.create_request(slug="pending1", reason="one") + blocking_admin.set_origins_state( + request_id=pending_request1.id, + new_state=BlockingState.DECISION_PENDING, + urls=[StorageData.origin.url], + ) + pending_request2 = blocking_admin.create_request(slug="pending2", reason="two") + blocking_admin.set_origins_state( + request_id=pending_request2.id, + new_state=BlockingState.NON_BLOCKED, + urls=[StorageData.origin.url], + ) + blocking_admin.set_origins_state( + request_id=pending_request2.id, + new_state=BlockingState.DECISION_PENDING, + urls=[StorageData.origin2.url], + ) + blocking_admin.create_request(slug="cleared", reason="handled") + # We add no blocks to this last one + return blocking_admin + + +def test_get_requests_excluding_cleared_requests(populated_blocking_admin): + assert [ + (request.slug, count) + for request, count in populated_blocking_admin.get_requests( + include_cleared_requests=False + ) + ] == [("pending2", 2), ("pending1", 1)] + + +def test_get_requests_including_cleared_requests(populated_blocking_admin): + assert [ + (request.slug, count) + for request, count in populated_blocking_admin.get_requests( + include_cleared_requests=True + ) + ] == [("cleared", 0), ("pending2", 2), ("pending1", 1)] + + +def test_get_states_for_request(populated_blocking_admin): + request = populated_blocking_admin.find_request("pending2") + states = populated_blocking_admin.get_states_for_request(request.id) + assert states == { + StorageData.origin.url: BlockingState.NON_BLOCKED, + StorageData.origin2.url: BlockingState.DECISION_PENDING, + } + + +def test_get_states_for_request_not_found(blocking_admin: BlockingAdmin): + with pytest.raises(RequestNotFound) as exc_info: + blocking_admin.get_states_for_request(NON_EXISTING_UUID) + assert exc_info.value.args == (NON_EXISTING_UUID,) + + +def test_find_blocking_states(populated_blocking_admin: BlockingAdmin): + urls = [ + StorageData.origin.url, + StorageData.origin2.url, + # This one does not exist in the blocking db + StorageData.origins[3].url, + ] + blocked_origins = populated_blocking_admin.find_blocking_states(urls) + # 
The order in the output should be grouped by URL
+    assert [
+        (blocked.url_pattern, blocked.state, blocked.request_slug)
+        for blocked in blocked_origins
+    ] == [
+        (StorageData.origin.url, BlockingState.NON_BLOCKED, "pending2"),
+        (
+            StorageData.origin.url,
+            BlockingState.DECISION_PENDING,
+            "pending1",
+        ),
+        (
+            StorageData.origin2.url,
+            BlockingState.DECISION_PENDING,
+            "pending2",
+        ),
+    ]
+
+
+def test_delete_blocking_states(populated_blocking_admin):
+    request = populated_blocking_admin.find_request("pending2")
+    populated_blocking_admin.delete_blocking_states(request.id)
+
+    assert populated_blocking_admin.get_states_for_request(request.id) == {}
+
+
+def test_delete_blocking_states_not_found(blocking_admin: BlockingAdmin):
+    with pytest.raises(RequestNotFound) as exc_info:
+        blocking_admin.delete_blocking_states(NON_EXISTING_UUID)
+    assert exc_info.value.args == (NON_EXISTING_UUID,)
+
+
+def test_record_history(blocking_admin: BlockingAdmin):
+    messages = [f"message {i}" for i in range(3)]
+
+    request = blocking_admin.create_request(slug="foo", reason="bar")
+
+    for message in messages:
+        msg = blocking_admin.record_history(request.id, message)
+        assert msg.date is not None
+
+
+def test_record_history_not_found(blocking_admin: BlockingAdmin):
+    with pytest.raises(RequestNotFound) as exc_info:
+        blocking_admin.record_history(NON_EXISTING_UUID, "kaboom")
+    assert exc_info.value.args == (NON_EXISTING_UUID,)
+
+
+def test_get_history(blocking_admin: BlockingAdmin):
+    request = blocking_admin.create_request(slug="foo", reason="bar")
+    blocking_admin.record_history(request.id, "one")
+    blocking_admin.record_history(request.id, "two")
+    blocking_admin.record_history(request.id, "three")
+
+    history = blocking_admin.get_history(request.id)
+    assert all(record.request == request.id for record in history)
+    assert ["three", "two", "one"] == [record.message for record in history]
+
+
+def test_get_history_not_found(blocking_admin: BlockingAdmin):
+    with pytest.raises(RequestNotFound) as exc_info:
+        blocking_admin.get_history(NON_EXISTING_UUID)
+    assert exc_info.value.args == (NON_EXISTING_UUID,)
+
+
+def test_swhid_lifecycle(blocking_admin: BlockingAdmin, blocking_query: BlockingQuery):
+    # Create a request
+    request = blocking_admin.create_request(slug="foo", reason="bar")
+
+    all_origins = [origin.url for origin in StorageData.origins]
+    blocked_origins = all_origins[::2]
+
+    blocking_admin.set_origins_state(
+        request_id=request.id,
+        new_state=BlockingState.DECISION_PENDING,
+        urls=blocked_origins,
+    )
+
+    expected = {
+        url: BlockingStatus(state=BlockingState.DECISION_PENDING, request=request.id)
+        for url in blocked_origins
+    }
+
+    assert blocking_query.origins_are_blocked(all_origins) == expected
+
+    restricted = blocked_origins[0:2]
+
+    blocking_admin.set_origins_state(
+        request_id=request.id, new_state=BlockingState.BLOCKED, urls=restricted
+    )
+
+    for url in restricted:
+        expected[url] = BlockingStatus(state=BlockingState.BLOCKED, request=request.id)
+
+    assert blocking_query.origins_are_blocked(all_origins) == expected
+
+    visible = blocked_origins[2:4]
+
+    blocking_admin.set_origins_state(
+        request_id=request.id, new_state=BlockingState.NON_BLOCKED, urls=visible
+    )
+
+    for url in visible:
+        del expected[url]
+
+    assert blocking_query.origins_are_blocked(all_origins) == expected
+
+    for url in visible:
+        expected[url] = BlockingStatus(
+            state=BlockingState.NON_BLOCKED, request=request.id
+        )
+
+    assert (
+        blocking_query.origins_are_blocked(all_origins,
all_statuses=True) == expected + ) + + +def test_query_metrics( + blocking_admin: BlockingAdmin, blocking_query: BlockingQuery, mocker +): + increment = mocker.patch("swh.core.statsd.statsd.increment") + + # Create a request + request = blocking_admin.create_request(slug="foo", reason="bar") + + all_origins = [origin.url for origin in StorageData.origins] + blocked_origins = all_origins[::2] + + assert blocking_query.origins_are_blocked(all_origins) == {} + assert increment.call_count == len(all_origins) + increment.assert_has_calls( + [call("swh_storage_blocking_queried_total", 1)] * len(all_origins) + ) + + increment.reset_mock() + blocking_admin.set_origins_state( + request_id=request.id, + new_state=BlockingState.DECISION_PENDING, + urls=blocked_origins, + ) + + assert len(blocking_query.origins_are_blocked(all_origins)) == len(blocked_origins) + assert increment.call_args_list.count( + call("swh_storage_blocking_queried_total", 1) + ) == len(all_origins) + assert increment.call_args_list.count( + call("swh_storage_blocking_blocked_total", 1) + ) == len(blocked_origins) diff --git a/swh/storage/tests/blocking/test_proxy.py b/swh/storage/tests/blocking/test_proxy.py new file mode 100644 index 000000000..da91e6dcd --- /dev/null +++ b/swh/storage/tests/blocking/test_proxy.py @@ -0,0 +1,368 @@ +# Copyright (C) 2024 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +from itertools import chain +from typing import Iterable + +import pytest + +from swh.model.model import Origin, OriginVisit, OriginVisitStatus +from swh.storage.exc import BlockedOriginException +from swh.storage.proxies.blocking import BlockingProxyStorage +from swh.storage.proxies.blocking.db import BlockingState + + +def now() -> datetime.datetime: + return datetime.datetime.now(tz=datetime.timezone.utc) + + +@pytest.fixture +def swh_storage_backend_config(): + return { + "cls": "memory", + "journal_writer": { + "cls": "memory", + }, + } + + +@pytest.fixture +def swh_storage(blocking_db_postgresql, swh_storage_backend): + return BlockingProxyStorage( + blocking_db=blocking_db_postgresql.info.dsn, storage=swh_storage_backend + ) + + +def set_origin_visibility(blocking_admin, slug="foo", reason="bar"): + # Create a request + request = blocking_admin.create_request(slug=slug, reason=reason) + + def set_visibility(urls: Iterable[str], new_state: BlockingState): + blocking_admin.set_origins_state( + request_id=request.id, + new_state=new_state, + urls=list(urls), + ) + + return request, set_visibility + + +def mk_origin_visit(origin: Origin): + return OriginVisit(origin=origin.url, date=now(), type="git") + + +def mk_origin_visit_status(origin: Origin): + return OriginVisitStatus( + origin=origin.url, visit=1, date=now(), status="created", snapshot=None + ) + + +OBJ_FACTORY = { + "origin_add": lambda x: x, + "origin_visit_add": mk_origin_visit, + "origin_visit_status_add": mk_origin_visit_status, +} + + +@pytest.mark.parametrize( + "endpoint", ["origin_add", "origin_visit_add", "origin_visit_status_add"] +) +def test_blocking_prefix_match_simple(swh_storage, blocking_admin, endpoint): + request, set_visibility = set_origin_visibility(blocking_admin) + method = getattr(swh_storage, endpoint) + obj_factory = OBJ_FACTORY[endpoint] + backend_storage = swh_storage.storage + + # beware of the mix of example.com vs example.org URLs below + 
url_patterns = { + BlockingState.DECISION_PENDING: ["https://example.com/user1/repo1"], + BlockingState.BLOCKED: ["https://example.org/user1"], + } + for decision, urls in url_patterns.items(): + set_visibility(urls, decision) + + origins = { + BlockingState.DECISION_PENDING: [ + "https://example.com/user1/repo1", + "https://example.com/user1/repo1/", + "https://example.com/user1/repo1.git", + "https://example.com/user1/repo1.git/", + "https://example.com/user1/repo1/subrepo1", + "https://example.com/user1/repo1/subrepo1.git", + ], + BlockingState.BLOCKED: [ + "https://example.org/user1", + "https://example.org/user1/repo1", + ], + BlockingState.NON_BLOCKED: [ + "https://example.com/user1", + "https://example.com/user1/repo2", + "https://example.com/user1/repo2/", + "https://example.com/user1/repo2.git", + "https://example.com/user1/repo2.git/", + "https://example.com/user1/repo2/subrepo1", + "https://example.com/user1/repo2/subrepo1.git", + "https://example.com/user2/repo1", + "https://example.com/user2/repo1", + "https://example.com/user11", + "https://example.com/user11/repo1", + "https://example.org/user11", + "https://example.org/user11/repo1", + ], + } + all_origins = [Origin(url) for url in chain(*(origins.values()))] + + if endpoint != "origin_add": + # if the endpoint is not 'origin_add', actually insert the origins; no + # visit or visit status should be able to be inserted nonetheless + assert backend_storage.origin_add(all_origins) + if endpoint == "origin_visit_status_add": + # if the endpoint is 'origin_visit_status_add', actually insert the + # origin visits in the backend; but no visit status should be able to + # be inserted via the proxy + assert backend_storage.origin_visit_add( + [mk_origin_visit(origin) for origin in all_origins] + ) + + for state, urls in origins.items(): + if state != BlockingState.NON_BLOCKED: + for url in urls: + origin = Origin(url=url) + with pytest.raises(BlockedOriginException) as exc_info: + method([obj_factory(origin)]) + print("URL should not be inserted", origin.url) + assert { + url: status.state for url, status in exc_info.value.blocked.items() + } == {origin.url: state} + else: + # this should not be blocked + for url in urls: + origin = Origin(url=url) + assert method([obj_factory(origin)]) + + +@pytest.mark.parametrize( + "endpoint", ["origin_add", "origin_visit_add", "origin_visit_status_add"] +) +def test_blocking_prefix_match_with_overload(swh_storage, blocking_admin, endpoint): + request, set_visibility = set_origin_visibility(blocking_admin) + method = getattr(swh_storage, endpoint) + obj_factory = OBJ_FACTORY[endpoint] + backend_storage = swh_storage.storage + + url_patterns = { + BlockingState.DECISION_PENDING: [ + "https://example.com/user1/repo1", + "https://example.com/user2", + ], + BlockingState.NON_BLOCKED: [ + "https://example.com/user2/repo1", + ], + BlockingState.BLOCKED: [ + "https://example.com/user2/repo1/subrepo1", + ], + } + + for decision, urls in url_patterns.items(): + set_visibility(urls, decision) + + origins = { + BlockingState.DECISION_PENDING: [ + "https://example.com/user1/repo1", + "https://example.com/user1/repo1/", + "https://example.com/user1/repo1.git", + "https://example.com/user1/repo1.git/", + "https://example.com/user1/repo1/subrepo1", + "https://example.com/user1/repo1/subrepo1.git", + # everything under https://example.com/user2 should be blocked (but + # https://example.com/user2/repo1) + "https://example.com/user2", + "https://example.com/user2.git", + "https://example.com/user2.git/", + 
"https://example.com/user2/", + "https://example.com/user2/repo2", + "https://example.com/user2/repo2/", + "https://example.com/user2/repo2.git", + "https://example.com/user2/repo2.git/", + ], + BlockingState.BLOCKED: [ + # "https://example.com/user2/repo1/subrepo1 is explicitly blocked + "https://example.com/user2/repo1/subrepo1", + "https://example.com/user2/repo1/subrepo1.git", + "https://example.com/user2/repo1/subrepo1/subsubrepo", + ], + BlockingState.NON_BLOCKED: [ + "https://example.com/user1", + "https://example.com/user1/repo2", + # https://example.com/user2/repo1 is explicitly enabled + "https://example.com/user2/repo1", + "https://example.com/user2/repo1/", + "https://example.com/user2/repo1.git", + "https://example.com/user2/repo1/subrepo2", + ], + } + all_origins = [Origin(url) for url in chain(*(origins.values()))] + + if endpoint != "origin_add": + # if the endpoint is not 'origin_add', actually insert the origins; no + # visit or visit status should be able to be inserted nonetheless + assert backend_storage.origin_add(all_origins) + if endpoint == "origin_visit_status_add": + # if the endpoint is 'origin_visit_status_add', actually insert the + # origin visits in the backend; but no visit status should be able to + # be inserted via the proxy + assert backend_storage.origin_visit_add( + [mk_origin_visit(origin) for origin in all_origins] + ) + + for state, urls in origins.items(): + if state != BlockingState.NON_BLOCKED: + for url in urls: + origin = Origin(url=url) + with pytest.raises(BlockedOriginException) as exc_info: + method([obj_factory(origin)]) + print("URL should not be inserted", origin.url) + assert { + url: status.state for url, status in exc_info.value.blocked.items() + } == {origin.url: state} + else: + # this should not be blocked + for url in urls: + origin = Origin(url=url) + assert method([obj_factory(origin)]) + + +@pytest.mark.parametrize( + "endpoint", ["origin_add", "origin_visit_add", "origin_visit_status_add"] +) +def test_blocking_prefix_match_add_multi(swh_storage, blocking_admin, endpoint): + request, set_visibility = set_origin_visibility(blocking_admin) + method = getattr(swh_storage, endpoint) + obj_factory = OBJ_FACTORY[endpoint] + backend_storage = swh_storage.storage + + url_patterns = { + BlockingState.DECISION_PENDING: [ + "https://example.com/user1/repo1", + "https://example.com/user2", + ], + BlockingState.NON_BLOCKED: [ + "https://example.com/user2/repo1", + ], + BlockingState.BLOCKED: [ + "https://example.com/user2/repo1/subrepo1", + ], + } + + requests = {} + set_vs = {} + for decision, urls in url_patterns.items(): + request, set_visibility = set_origin_visibility( + blocking_admin, slug=f"foo_{decision.name}" + ) + set_visibility(urls, decision) + requests[decision] = request + set_vs[decision] = set_visibility + + origins = { + BlockingState.DECISION_PENDING: [ + "https://example.com/user1/repo1", + "https://example.com/user1/repo1/", + "https://example.com/user1/repo1.git", + "https://example.com/user1/repo1.git/", + "https://example.com/user1/repo1/subrepo1", + "https://example.com/user1/repo1/subrepo1.git", + # everything under https://example.com/user2 should be blocked (but + # https://example.com/user2/repo1) + "https://example.com/user2", + "https://example.com/user2.git", + "https://example.com/user2.git/", + "https://example.com/user2/", + "https://example.com/user2/repo2", + "https://example.com/user2/repo2/", + "https://example.com/user2/repo2.git", + "https://example.com/user2/repo2.git/", + ], + 
BlockingState.NON_BLOCKED: [ + # these should work OK by themselves + "https://example.com/user1", + "https://example.com/user1/repo2", + # https://example.com/user2/repo1 is explicitly enabled + "https://example.com/user2/repo1", + "https://example.com/user2/repo1/", + "https://example.com/user2/repo1.git", + "https://example.com/user2/repo1/subrepo2", + ], + BlockingState.BLOCKED: [ + # "https://example.com/user2/repo1/subrepo1 is explicitly blocked + "https://example.com/user2/repo1/subrepo1", + "https://example.com/user2/repo1/subrepo1.git", + "https://example.com/user2/repo1/subrepo1/subsubrepo", + ], + } + + # insert them all at once, none should be inserted and the error give + # reasons for rejected urls + all_origins = [Origin(url) for url in chain(*(origins.values()))] + + if endpoint != "origin_add": + # if the endpoint is not 'origin_add', insert the origins; no + # visit or visit status should be able to be inserted nonetheless + assert backend_storage.origin_add(all_origins) + if endpoint == "origin_visit_status_add": + # if the endpoint is 'origin_visit_status_add', insert the + # origin visits in the backend; but no visit status should be able to + # be inserted via the proxy + assert backend_storage.origin_visit_add( + [mk_origin_visit(origin) for origin in all_origins] + ) + # the insertion of all the objects at once fails because there are blocked + # elements in the batch + with pytest.raises(BlockedOriginException) as exc_info: + method([obj_factory(origin) for origin in all_origins]) + # check the reported list of actually blocked origins is ok + assert { + url + for url, status in exc_info.value.blocked.items() + if status.state == BlockingState.DECISION_PENDING + } == set(origins[BlockingState.DECISION_PENDING]) + assert { + url + for url, status in exc_info.value.blocked.items() + if status.state == BlockingState.BLOCKED + } == set(origins[BlockingState.BLOCKED]) + + # now, set the decision pending pattern to non_blocked and rerun the whole insertion thing + # use the set_visibility helper coming the the request that created decision pending rules + set_visibility = set_vs[BlockingState.DECISION_PENDING] + set_visibility( + url_patterns[BlockingState.DECISION_PENDING], BlockingState.NON_BLOCKED + ) + + with pytest.raises(BlockedOriginException) as exc_info: + method([obj_factory(origin) for origin in all_origins]) + # urls from the (formerly) decision pending batch should now not be reported + # as blocked in the exception + assert not { + url + for url, status in exc_info.value.blocked.items() + if status.state == BlockingState.DECISION_PENDING + } + assert { + url + for url, status in exc_info.value.blocked.items() + if status.state == BlockingState.BLOCKED + } == set(origins[BlockingState.BLOCKED]) + + # now if we ingest only non blocked urls, it should be ok (aka all but the + # urls in origins[BLOCKED]) + assert method( + [ + obj_factory(origin) + for origin in all_origins + if origin.url not in origins[BlockingState.BLOCKED] + ] + ) diff --git a/swh/storage/tests/blocking/test_proxy_blocking.py b/swh/storage/tests/blocking/test_proxy_blocking.py new file mode 100644 index 000000000..a597e47d4 --- /dev/null +++ b/swh/storage/tests/blocking/test_proxy_blocking.py @@ -0,0 +1,87 @@ +# Copyright (C) 2024 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import functools + +import pytest 
+
+from swh.storage.exc import BlockedOriginException
+from swh.storage.proxies.blocking import BlockingProxyStorage
+from swh.storage.proxies.blocking.db import BlockingState
+from swh.storage.tests.storage_data import StorageData
+from swh.storage.tests.test_in_memory import TestInMemoryStorage as _TestStorage
+
+
+@pytest.fixture
+def swh_storage_backend_config():
+    yield {
+        "cls": "memory",
+        "journal_writer": {
+            "cls": "memory",
+        },
+    }
+
+
+# This simply tests that, with the last 2 origins found in StorageData.origins
+# marked as blocked, most standard tests are OK, but a few of them fail (the
+# ones actually inserting these 2 now-blocked origins). These few tests need
+# to be overloaded accordingly.
+
+BLOCKED_ORIGINS = set(origin.url for origin in StorageData.origins[-2:])
+
+
+@pytest.fixture
+def swh_storage(blocking_db_postgresql, blocking_admin, swh_storage_backend):
+    # Create a request
+    request = blocking_admin.create_request(slug="foo", reason="bar")
+
+    blocking_admin.set_origins_state(
+        request_id=request.id,
+        new_state=BlockingState.DECISION_PENDING,
+        urls=list(BLOCKED_ORIGINS),
+    )
+
+    return BlockingProxyStorage(
+        blocking_db=blocking_db_postgresql.info.dsn, storage=swh_storage_backend
+    )
+
+
+class TestStorage(_TestStorage):
+    @pytest.mark.xfail(reason="typing.Protocol instance check is annoying")
+    def test_types(self, *args, **kwargs):
+        super().test_types(*args, **kwargs)
+
+
+EXPECTED_BLOCKED_EXCEPTIONS = {
+    "test_origin_add",
+    "test_origin_visit_status_get_random_nothing_found",
+}
+
+for method_name in dir(TestStorage):
+    if not method_name.startswith("test_"):
+        continue
+
+    method = getattr(TestStorage, method_name)
+
+    @functools.wraps(method)
+    def wrapped_test(*args, method=method, **kwargs):
+        try:
+            method(*args, **kwargs)
+        except BlockedOriginException as exc:
+            assert (
+                method.__name__ in EXPECTED_BLOCKED_EXCEPTIONS
+            ), f"{method.__name__} shouldn't raise a BlockedOriginException"
+
+            assert exc.blocked, "The exception was raised with no blocked origin"
+
+            assert (
+                set(exc.blocked.keys()) <= BLOCKED_ORIGINS
+            ), "Found an unexpectedly blocked origin"
+        else:
+            assert (
+                method.__name__ not in EXPECTED_BLOCKED_EXCEPTIONS
+            ), f"{method.__name__} should raise a BlockedOriginException"
+
+    setattr(TestStorage, method_name, wrapped_test)
diff --git a/swh/storage/tests/blocking/test_proxy_no_blocking.py b/swh/storage/tests/blocking/test_proxy_no_blocking.py
new file mode 100644
index 000000000..950ca4d86
--- /dev/null
+++ b/swh/storage/tests/blocking/test_proxy_no_blocking.py
@@ -0,0 +1,36 @@
+# Copyright (C) 2024 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import pytest
+
+from swh.storage.proxies.blocking import BlockingProxyStorage
+from swh.storage.tests.test_in_memory import TestInMemoryStorage as _TestStorage
+
+# This simply tests that, without any blocked URL defined in the blocking proxy,
+# all standard tests are OK
+
+
+@pytest.fixture
+def swh_storage_backend_config():
+    yield {
+        "cls": "memory",
+        "journal_writer": {
+            "cls": "memory",
+        },
+    }
+
+
+@pytest.fixture
+def swh_storage(blocking_db_postgresql, swh_storage_backend):
+    return BlockingProxyStorage(
+        blocking_db=blocking_db_postgresql.info.dsn, storage=swh_storage_backend
+    )
+
+
+class TestStorage(_TestStorage):
+    @pytest.mark.xfail(reason="typing.Protocol instance check is annoying")
+    def
test_types(self, *args, **kwargs):
+        super().test_types(*args, **kwargs)
--
GitLab


From 5251f367c1caf95137a1fc62e3dfc23b8f99279b Mon Sep 17 00:00:00 2001
From: David Douard <david.douard@sdfa3.org>
Date: Fri, 3 May 2024 13:42:44 +0200
Subject: [PATCH 2/2] Add a blocking event logging facility for the blocking proxy

This adds a 'blocked_origin_log' table in the database, in which each
filtering event is logged, be it a deny or an explicit accept event.
---
 swh/storage/proxies/blocking/db.py            |  60 ++++++
 .../proxies/blocking/sql/30-schema.sql        |  15 ++
 swh/storage/tests/blocking/test_proxy.py      | 173 ++++++++++++++++++
 3 files changed, 248 insertions(+)

diff --git a/swh/storage/proxies/blocking/db.py b/swh/storage/proxies/blocking/db.py
index 69488035d..f3bf9022e 100644
--- a/swh/storage/proxies/blocking/db.py
+++ b/swh/storage/proxies/blocking/db.py
@@ -69,6 +69,20 @@ class RequestHistory:
     """Free-form history information (e.g. "policy decision made")"""
+
+@attr.s
+class BlockingLogEntry:
+    url = attr.ib(type=str)
+    """origin url that has been blocked"""
+    url_match = attr.ib(type=str)
+    """url matching pattern that caused the blocking of the origin url"""
+    request = attr.ib(type=UUID)
+    """id of the blocking request"""
+    date = attr.ib(type=datetime.datetime)
+    """Date the blocking event occurred"""
+    state = attr.ib(type=BlockingState)
+    """Blocking state responsible for the blocking event"""
+
+
 @attr.s
 class BlockedOrigin:
     request_slug = attr.ib(type=str)
@@ -333,6 +347,42 @@ class BlockingAdmin(BlockingDb):
             )
         return records

+    def get_log(
+        self, request_id: Optional[UUID] = None, url: Optional[str] = None
+    ) -> List[BlockingLogEntry]:
+        cur = self.cursor()
+        where = []
+        args = []
+        if request_id:
+            where.append("request = %s")
+            args.append(str(request_id))
+        if url:
+            where.append("url = %s")
+            args.append(url)
+        if where:
+            condition = "WHERE " + " AND ".join(where)
+        else:
+            condition = ""
+        cur.execute(
+            f"""SELECT date, url, url_match, request, state
+                FROM blocked_origin_log
+                {condition}
+                ORDER BY date DESC""",
+            args,
+        )
+        records = []
+        for db_date, db_url, db_url_match, db_request_id, db_state in cur:
+            records.append(
+                BlockingLogEntry(
+                    url=db_url,
+                    url_match=db_url_match,
+                    request=db_request_id,
+                    date=db_date,
+                    state=BlockingState[db_state.upper()],
+                )
+            )
+        return records
+

 class BlockingQuery(BlockingDb):
     def origins_are_blocked(
@@ -357,6 +407,8 @@ class BlockingQuery(BlockingDb):
         If the given url matches a set of registered blocking rules, return
         the most appropriate one. Otherwise, return None.
+
+        Log the blocking event in the database (only matching events are logged).
""" logging.debug("url: %s", url) cur = self.cursor() @@ -426,5 +478,13 @@ class BlockingQuery(BlockingDb): ) logger.debug("Matching status for %s: %s", url_match, status) statsd.increment(METRIC_BLOCKED_TOTAL, 1) + # log the event; even a NON_BLOCKED decision is logged + cur.execute( + """ + INSERT INTO blocked_origin_log (url, url_match, request, state) + VALUES (%s, %s, %s, %s) + """, + (url, url_match, status.request, status.state.name.lower()), + ) return status return None diff --git a/swh/storage/proxies/blocking/sql/30-schema.sql b/swh/storage/proxies/blocking/sql/30-schema.sql index 6a25a0971..42d6e2d0d 100644 --- a/swh/storage/proxies/blocking/sql/30-schema.sql +++ b/swh/storage/proxies/blocking/sql/30-schema.sql @@ -38,3 +38,18 @@ comment on table blocked_origin is 'All the origin known to be affected by a spe comment on column blocked_origin.url_match is 'The url matching scheme to be blocked from being ingested'; comment on column blocked_origin.request is 'Reference to the affecting request'; comment on column blocked_origin.state is 'The degree to which the origin is blocked as a result of the request'; + +create table if not exists blocked_origin_log ( + url text not null, + url_match text not null, + request uuid references blocking_request(id) not null, + state blocked_state not null, + date timestamptz not null default now(), + primary key (url, date) +); +comment on table blocked_origin_log is 'Log origins that got blocked by the blocking proxy'; +comment on column blocked_origin_log.url is 'The url of the origin'; +comment on column blocked_origin_log.url_match is 'The url pattern matching the origin'; +comment on column blocked_origin_log.request is 'Reference to the request which caused the blocking'; +comment on column blocked_origin_log.state is 'The degree to which the origin has been blocked'; +comment on column blocked_origin_log.date is 'The date the origin got blocked'; diff --git a/swh/storage/tests/blocking/test_proxy.py b/swh/storage/tests/blocking/test_proxy.py index da91e6dcd..294660991 100644 --- a/swh/storage/tests/blocking/test_proxy.py +++ b/swh/storage/tests/blocking/test_proxy.py @@ -366,3 +366,176 @@ def test_blocking_prefix_match_add_multi(swh_storage, blocking_admin, endpoint): if origin.url not in origins[BlockingState.BLOCKED] ] ) + + +@pytest.mark.parametrize( + "endpoint", ["origin_add", "origin_visit_add", "origin_visit_status_add"] +) +def test_blocking_log(swh_storage, blocking_admin, endpoint): + method = getattr(swh_storage, endpoint) + obj_factory = OBJ_FACTORY[endpoint] + backend_storage = swh_storage.storage + + url_patterns = { + BlockingState.DECISION_PENDING: [ + "https://example.com/user1/repo1", + "https://example.com/user2", + ], + BlockingState.NON_BLOCKED: [ + "https://example.com/user2/repo1", + ], + BlockingState.BLOCKED: [ + "https://example.com/user2/repo1/subrepo1", + ], + } + requests = {} + set_vs = {} + for decision, urls in url_patterns.items(): + request, set_visibility = set_origin_visibility( + blocking_admin, slug=f"foo_{decision.name}" + ) + set_visibility(urls, decision) + requests[decision] = request + set_vs[decision] = set_visibility + + origins = { + BlockingState.DECISION_PENDING: [ + "https://example.com/user1/repo1", + "https://example.com/user1/repo1/", + "https://example.com/user1/repo1.git", + "https://example.com/user1/repo1.git/", + "https://example.com/user1/repo1/subrepo1", + "https://example.com/user1/repo1/subrepo1.git", + # everything under https://example.com/user2 should be blocked (but + # 
https://example.com/user2/repo1) + "https://example.com/user2", + "https://example.com/user2.git", + "https://example.com/user2.git/", + "https://example.com/user2/", + "https://example.com/user2/repo2", + "https://example.com/user2/repo2/", + "https://example.com/user2/repo2.git", + "https://example.com/user2/repo2.git/", + ], + None: [ + # these should work OK by themselves, not matching any rule + "https://example.com/user1", + "https://example.com/user1/repo2", + "https://example.org/user2/repo2", + "https://example.org/user3/repo2/", + ], + BlockingState.NON_BLOCKED: [ + # https://example.com/user2/repo1 is explicitly enabled + "https://example.com/user2/repo1", + "https://example.com/user2/repo1/", + "https://example.com/user2/repo1.git", + "https://example.com/user2/repo1/subrepo2", + ], + BlockingState.BLOCKED: [ + # "https://example.com/user2/repo1/subrepo1 is explicitly blocked + "https://example.com/user2/repo1/subrepo1", + "https://example.com/user2/repo1/subrepo1.git", + "https://example.com/user2/repo1/subrepo1/subsubrepo", + ], + } + + # insert them all at once, none should be inserted and the error give + # reasons for rejected urls + all_origins = [Origin(url) for url in chain(*(origins.values()))] + + ts_before = now() + if endpoint != "origin_add": + # if the endpoint is not 'origin_add', insert the origins; no + # visit or visit status should be able to be inserted nonetheless + assert backend_storage.origin_add(all_origins) + if endpoint == "origin_visit_status_add": + # if the endpoint is 'origin_visit_status_add', insert the + # origin visits in the backend; but no visit status should be able to + # be inserted via the proxy + assert backend_storage.origin_visit_add( + [mk_origin_visit(origin) for origin in all_origins] + ) + + with pytest.raises(BlockedOriginException): + method([obj_factory(origin) for origin in all_origins]) + ts_after = now() + + # Check blocking journal log + log = blocking_admin.get_log() + assert set(origins[BlockingState.DECISION_PENDING]) == set( + x.url for x in log if x.state == BlockingState.DECISION_PENDING + ) + assert set(origins[BlockingState.BLOCKED]) == set( + x.url for x in log if x.state == BlockingState.BLOCKED + ) + assert set(origins[BlockingState.NON_BLOCKED]) == set( + x.url for x in log if x.state == BlockingState.NON_BLOCKED + ) + urls_with_event = {x.url for x in log} + for url in origins[None]: + assert url not in urls_with_event + + for logentry in log: + assert logentry.request == requests[logentry.state].id + assert ts_before < logentry.date < ts_after + + # check the get_log(request_id=xx) method call + for state, request in requests.items(): + log = blocking_admin.get_log(request_id=request.id) + assert set(origins[state]) == set(x.url for x in log) + + # check the get_log(url=xx) method call + for origin in all_origins: + url = origin.url + log = blocking_admin.get_log(url=url) + if url in origins[None]: + assert not log + else: + assert len(log) == 1 + logentry = log[0] + assert logentry.url == url + assert logentry.request == requests[logentry.state].id + + # now, set the decision pending pattern to non_blocked and rerun the whole + # insertion thing; use the set_visibility helper coming with the request + # that created decision pending rules + set_visibility = set_vs[BlockingState.DECISION_PENDING] + set_visibility( + url_patterns[BlockingState.DECISION_PENDING], BlockingState.NON_BLOCKED + ) + + with pytest.raises(BlockedOriginException): + method([obj_factory(origin) for origin in all_origins]) + + # for each 
url in origins[BLOCKED], we should have 2 'blocked' log entries
+    for url in origins[BlockingState.BLOCKED]:
+        log = blocking_admin.get_log(url=url)
+        assert len(log) == 2
+        assert {entry.state for entry in log} == {BlockingState.BLOCKED}
+
+    # for each url in origins[NON_BLOCKED], we should have 2 'non-blocked' log entries
+    for url in origins[BlockingState.NON_BLOCKED]:
+        log = blocking_admin.get_log(url=url)
+        assert len(log) == 2
+        assert {entry.state for entry in log} == {BlockingState.NON_BLOCKED}
+
+    # for each url in origins[DECISION_PENDING], we should have 1 'decision-pending' and
+    # 1 'non-blocked' log entry
+    for url in origins[BlockingState.DECISION_PENDING]:
+        log = blocking_admin.get_log(url=url)
+        assert len(log) == 2
+        assert {entry.state for entry in log} == {
+            BlockingState.NON_BLOCKED,
+            BlockingState.DECISION_PENDING,
+        }
+
+    # for each url in origins[None], we should have no log entry
+    for url in origins[None]:
+        log = blocking_admin.get_log(url=url)
+        assert len(log) == 0
--
GitLab