Commit ef7cd991 authored by Nicolas Dandrimont

Improve logging in the API client and the revision layer

parent 3edf3690
@@ -155,7 +155,7 @@ class MetaRabbitMQClient(type):
                         data=batch,
                     )
                 )
-            return self.wait_for_acks(acks_expected)
+            return self.wait_for_acks(meth_name, acks_expected)
         except BaseException as ex:
             self.request_termination(str(ex))
             return False
@@ -293,14 +293,14 @@ class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient):
         self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
 
     def open_channel(self) -> None:
-        LOGGER.info("Creating a new channel")
+        LOGGER.debug("Creating a new channel")
         assert self._connection is not None
         self._connection.channel(on_open_callback=self.on_channel_open)
 
     def on_channel_open(self, channel: pika.channel.Channel) -> None:
-        LOGGER.info("Channel opened")
+        LOGGER.debug("Channel opened")
         self._channel = channel
-        LOGGER.info("Adding channel close callback")
+        LOGGER.debug("Adding channel close callback")
         assert self._channel is not None
         self._channel.add_on_close_callback(callback=self.on_channel_closed)
         self.setup_queue()
@@ -312,14 +312,14 @@ class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient):
         self.close_connection()
 
     def setup_queue(self) -> None:
-        LOGGER.info("Declaring callback queue")
+        LOGGER.debug("Declaring callback queue")
         assert self._channel is not None
         self._channel.queue_declare(
             queue="", exclusive=True, callback=self.on_queue_declare_ok
         )
 
     def on_queue_declare_ok(self, frame: pika.frame.Method) -> None:
-        LOGGER.info("Binding queue to default exchanger")
+        LOGGER.debug("Binding queue to default exchanger")
         assert self._channel is not None
         self._callback_queue = frame.method.queue
         self._channel.basic_qos(
@@ -327,12 +327,12 @@ class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient):
         )
 
     def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None:
-        LOGGER.info("QOS set to: %d", self._prefetch_count)
+        LOGGER.debug("QOS set to: %d", self._prefetch_count)
         self.start_consuming()
 
     def start_consuming(self) -> None:
-        LOGGER.info("Issuing consumer related RPC commands")
-        LOGGER.info("Adding consumer cancellation callback")
+        LOGGER.debug("Issuing consumer related RPC commands")
+        LOGGER.debug("Adding consumer cancellation callback")
         assert self._channel is not None
         self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled)
         assert self._callback_queue is not None
@@ -342,7 +342,7 @@ class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient):
         self._consuming = True
 
     def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None:
-        LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame)
+        LOGGER.debug("Consumer was cancelled remotely, shutting down: %r", method_frame)
         if self._channel:
             self._channel.close()
@@ -370,16 +370,16 @@ class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient):
     def stop_consuming(self) -> None:
         if self._channel:
-            LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ")
+            LOGGER.debug("Sending a Basic.Cancel RPC command to RabbitMQ")
             self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok)
 
     def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None:
         self._consuming = False
-        LOGGER.info(
+        LOGGER.debug(
             "RabbitMQ acknowledged the cancellation of the consumer: %s",
             self._consumer_tag,
         )
-        LOGGER.info("Closing the channel")
+        LOGGER.debug("Closing the channel")
         assert self._channel is not None
         self._channel.close()
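All of these connection-lifecycle messages move from INFO to DEBUG, so they disappear at the usual default log level. A minimal sketch of how an operator could opt back in, assuming the client's LOGGER is created with logging.getLogger(__name__) so its name follows the module path (the exact logger name is an assumption; adjust it to the real module):

import logging

# Keep the rest of the application at INFO...
logging.basicConfig(level=logging.INFO)
# ...but re-enable the demoted lifecycle messages for the provenance client.
# Assumption: the logger name follows the swh.provenance module path.
logging.getLogger("swh.provenance").setLevel(logging.DEBUG)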
@@ -442,20 +442,27 @@ class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient):
             ),
         )
 
-    def wait_for_acks(self, acks_expected: int) -> bool:
+    def wait_for_acks(self, meth_name: str, acks_expected: int) -> bool:
         acks_received = 0
         timeout = max(
             (acks_expected / self._batch_size) * self._wait_per_batch,
             self._wait_min,
         )
+        start = time.monotonic()
+        end = start + timeout
         while acks_received < acks_expected:
+            local_timeout = end - time.monotonic()
+            if local_timeout < 1.0:
+                local_timeout = 1.0
             try:
-                acks_received += self.wait_for_response(timeout=timeout)
+                acks_received += self.wait_for_response(timeout=local_timeout)
             except ResponseTimeout:
                 LOGGER.warning(
-                    "Timed out waiting for acks, %s received, %s expected",
+                    "Timed out waiting for acks in %s, %s received, %s expected (in %ss)",
+                    meth_name,
                     acks_received,
                     acks_expected,
+                    time.monotonic() - start,
                 )
                 return False
         return acks_received == acks_expected
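The rework above replaces a flat per-call timeout with a deadline fixed up front: each iteration waits only for the time remaining until that deadline, floored at one second so a nearly-exhausted budget still yields a usable poll instead of a busy loop. A standalone sketch of the same pattern, with illustrative names rather than the client's actual API:

import time

def wait_with_deadline(poll, expected: int, budget: float) -> bool:
    # `poll(timeout)` stands in for wait_for_response(): it returns the number
    # of acks received within `timeout` seconds, or raises TimeoutError.
    received = 0
    end = time.monotonic() + budget
    while received < expected:
        # Wait only for what is left of the overall budget, but at least 1s.
        remaining = max(end - time.monotonic(), 1.0)
        try:
            received += poll(remaining)
        except TimeoutError:
            return False
    return received == expected

One consequence of the floor is that the total wait can overshoot the budget: once the deadline has passed, every remaining poll still gets a full second before a timeout can end the loop.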
@@ -123,7 +123,7 @@ class Provenance:
 
     def flush_if_necessary(self) -> bool:
         """Flush if the number of cached information reached a limit."""
-        LOGGER.info("Cache stats: %s", self._get_cache_stats())
+        LOGGER.debug("Cache stats: %s", self._get_cache_stats())
         if self._flush_limit_reached():
             self.flush()
             return True
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 from datetime import datetime, timezone
+import logging
 from typing import Generator, Iterable, Iterator, List, Optional, Tuple
 
 from swh.core.statsd import statsd
@@ -17,6 +18,8 @@ from .model import DirectoryEntry, RevisionEntry
 
 REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds"
 
+logger = logging.getLogger(__name__)
+
 
 class CSVRevisionIterator:
     """Iterator over revisions typically present in the given CSV file.
@@ -69,6 +72,13 @@ def revision_add(
     # Processed content starting from the revision's root directory.
     date = provenance.revision_get_date(revision)
     if date is None or revision.date < date:
+        logger.debug(
+            "Processing revision %s on %s (root %s)",
+            revision.id.hex(),
+            revision.date,
+            revision.root.hex(),
+        )
+        logger.debug("provenance date: %s, building isochrone graph", date)
         graph = build_isochrone_graph(
             provenance,
             archive,
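The %-style placeholders keep message formatting lazy (it only happens when a handler actually emits the record), but the hex() calls in the argument list run unconditionally. Should that ever show up in a profile, the standard-library guard would skip them entirely; a sketch of the idiom, not something this patch does:

if logger.isEnabledFor(logging.DEBUG):
    logger.debug(
        "Processing revision %s on %s (root %s)",
        revision.id.hex(),
        revision.date,
        revision.root.hex(),
    )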
@@ -76,6 +86,7 @@
             DirectoryEntry(revision.root),
             minsize=minsize,
         )
+        logger.debug("isochrone graph built, processing content")
         revision_process_content(
             provenance,
             archive,
@@ -88,6 +99,7 @@
             minsize=minsize,
         )
         if commit:
+            logger.debug("flushing batch")
             provenance.flush()