Skip to content
Snippets Groups Projects
Unverified Commit 593be3d8 authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

runner: Update task status only after sending the tasks to rabbitmq

Prior to this, the runner called the `grab_ready{_priority}_tasks` method.
Those method update the task's status to 'next_run_scheduled' at the listing
time. So it actually writes immediately to postgresql.

So, failing to write to rabbitmq would update the status anyway. So we change
the runner's calls to use the `peek_ready{_priority}_tasks` methods
instead. This now only gets the task list to schedule. And at the end of the
runner, there is a call of `mass_schedule_task_runs` method. This method is
now in charge to update the tasks' status to 'next_run_scheduled' within the
same transaction.

Refs. swh/infra/sysadm-environment#5512
parent b98f6f92
No related branches found
No related tags found
No related merge requests found
Pipeline #14006 failed
......@@ -982,6 +982,12 @@ class SchedulerBackend:
- backend_id
- scheduled
"""
# Mark the associated task as next_run_scheduled in the same transaction
query = """update task set status='next_run_scheduled'
where id in %s"""
cur.execute(query, (tuple([tr.task for tr in task_runs]),))
cur.execute("select swh_scheduler_mktemp_task_run()")
db.copy_to(
(task_run.to_dict() for task_run in task_runs),
......
# Copyright (C) 2015-2024 The Software Heritage developers
# Copyright (C) 2015-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -129,7 +129,7 @@ def run_ready_tasks(
if with_priority:
# grab max_queue_length (or 10) potential tasks with any priority for
# the same type (limit the result to avoid too long running queries)
grabbed_priority_tasks = backend.grab_ready_priority_tasks(
grabbed_priority_tasks = backend.peek_ready_priority_tasks(
task_type_name, num_tasks=max_queue_length or 10
)
if grabbed_priority_tasks:
......@@ -150,7 +150,7 @@ def run_ready_tasks(
# full), to help postgresql use properly indexed queries.
if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
# Only grab num_tasks tasks with no priority
grabbed_tasks = backend.grab_ready_tasks(
grabbed_tasks = backend.peek_ready_tasks(
task_type_name, num_tasks=num_tasks
)
if grabbed_tasks:
......
# Copyright (C) 2021-2024 The Software Heritage developers
# Copyright (C) 2021-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -30,7 +30,7 @@ def scheduler_runner_process(
remaining = queue.slots_remaining()
if remaining < min_batch_size:
continue
next_tasks = env.scheduler.grab_ready_tasks(
next_tasks = env.scheduler.peek_ready_tasks(
f"load-{visit_type}", num_tasks=remaining, timestamp=env.time
)
logger.debug(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment