with task_with_delay as (
    select type, status, policy, current_interval,
           extract(epoch from (now() - next_run)) as delay
    from task
), task_with_bucket as (
    select (
        case
            when delay between -24 * 3600 and 24 * 3600
                then ceil(delay / 3600) * 3600
            else ceil(delay / (24 * 3600)) * 24 * 3600
        end
    ) as delay_bucket,
    type, status, policy, current_interval
    from task_with_delay
), grouped_tasks as (
    select type, status, policy, current_interval, delay_bucket, count(*)
    from task_with_bucket
    group by type, status, policy, current_interval, delay_bucket
    order by type, status, policy, current_interval, delay_bucket
)
select type, status, policy, current_interval, delay_bucket as lt,
       sum(count) over (partition by type, status, policy, current_interval
                        order by delay_bucket)
from grouped_tasks;
This generates a set of gauges that should be compatible with what Prometheus expects for histograms.
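To make that concrete, here is a toy illustration (made-up bucket counts, not real data) of how the "sum over partition" window function used at the end of the query turns per-bucket counts into the cumulative counts Prometheus expects: each bucket reports its own count plus everything below it.

select delay_bucket,
       count,
       sum(count) over (order by delay_bucket) as cumulative_count
from (values (3600, 5), (7200, 2), (10800, 0), (86400, 7))
       as t (delay_bucket, count);
-- 3600  -> 5
-- 7200  -> 7
-- 10800 -> 7
-- 86400 -> 14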
The tasks are bucketed by delay, with buckets that are 1 hour wide between -24 and +24 hours, and 24 hours wide outside that range.
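As a quick sanity check, here is the same CASE expression applied to a few made-up delays (the 3600.0 denominators force float division because these toy values are integers; in the real query the delay produced by extract(epoch ...) is already non-integer):

select delay,
       case
           when delay between -24 * 3600 and 24 * 3600
               then ceil(delay / 3600.0) * 3600
           else ceil(delay / (24 * 3600.0)) * 24 * 3600
       end as delay_bucket
from (values (-7000), (1800), (5400), (200000)) as t (delay);
-- -7000   ->  -3600   (hourly bucket, task is early)
-- 1800    ->   3600   (hourly bucket)
-- 5400    ->   7200   (hourly bucket)
-- 200000  -> 259200   (daily bucket, task is several days late)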
The last part of the query should probably make sure that every bucket gets a row in the result set: right now only non-empty buckets produce a row, which makes Prometheus believe the buckets are wider than they actually are and breaks its quantile-estimation heuristics.
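Here is a minimal, self-contained sketch of that idea (the bucketed_tasks relation is a stand-in with toy values, not the real schema): generate every possible hourly bucket up front, then left join the actual counts onto it, so empty buckets show up with a count of 0 instead of being absent.

with bucketed_tasks (delay_bucket) as (
    -- stand-in for the real per-task bucket computation; toy values only
    values (3600), (3600), (86400)
),
all_buckets as (
    -- every hourly bucket between -23h and +24h
    select generate_series(-23, 24) * 3600 as delay_bucket
),
counts as (
    select delay_bucket, count(*) as count
    from bucketed_tasks
    group by delay_bucket
)
select delay_bucket, coalesce(count, 0) as count
from all_buckets
left join counts using (delay_bucket)
order by delay_bucket;

The revised query below applies the same approach across all the task groups.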
with task_with_delay as (
    select "type", status, "policy", current_interval,
           extract(epoch from (now() - next_run)) as delay
    from task
),
delay_bounds as (
    select extract(epoch from min(now() - next_run)) as min,
           extract(epoch from max(now() - next_run)) as max
    from task
),
task_with_bucket as (
    select (
        case
            when delay between -24 * 3600 and 24 * 3600
                then ceil(delay / 3600) * 3600
            else ceil(delay / (24 * 3600)) * 24 * 3600
        end
    ) as delay_bucket,
    "type", status, "policy", current_interval
    from task_with_delay
),
task_count_by_bucket as (
    select "type", status, "policy", current_interval, delay_bucket, count(*)
    from task_with_bucket
    group by "type", status, "policy", current_interval, delay_bucket
    order by "type", status, "policy", current_interval, delay_bucket
),
delay_buckets as (
    select generate_series(-23, 23) * 3600 as delay_bucket
    union
    select generate_series(ceil(min / (24 * 3600))::bigint,
                           ceil(max / (24 * 3600))::bigint) * 24 * 3600 as delay_bucket
    from delay_bounds
),
task_buckets as (
    select *
    from (select distinct ("type") from task_count_by_bucket) as t
    cross join (select distinct (status) from task_count_by_bucket) as s
    cross join (select distinct ("policy") from task_count_by_bucket) as p
    cross join (select distinct (current_interval) from task_count_by_bucket) as i
    cross join (select distinct (delay_bucket) from delay_buckets) as db
),
task_count_for_all_buckets as (
    select "type", status, "policy", current_interval, delay_bucket,
           coalesce(count, 0) as count
    from task_buckets
    left join task_count_by_bucket
        using ("type", status, "policy", current_interval, delay_bucket)
),
cumulative_buckets as (
    select "type", status, "policy", current_interval, delay_bucket as lt,
           sum(count) over (partition by "type", status, "policy", current_interval
                            order by delay_bucket)
    from task_count_for_all_buckets
    union all
    select "type", status, "policy", current_interval, '+Inf' as lt, sum(count)
    from task_count_for_all_buckets
    group by "type", status, "policy", current_interval
    order by "type", status, "policy", current_interval, lt
)
select "type", status, policy, current_interval,
       (case when lt = '+Inf' then '+Inf' else lt::text end),
       sum
from cumulative_buckets;
with task_count_by_bucket as (
    -- get the count of tasks by delay bucket. Tasks are grouped by their
    -- characteristics (type, status, policy, priority, current interval),
    -- then by delay buckets that are 1 hour wide between -24 and +24 hours,
    -- and 1 day wide outside of this range.
    -- A positive delay means the task execution is late wrt scheduling.
    select "type", status, "policy", priority, current_interval,
           (
               -- select the bucket widths
               case
                   when delay between -24 * 3600 and 24 * 3600
                       then (ceil(delay / 3600)::bigint) * 3600
                   else (ceil(delay / (24 * 3600))::bigint) * 24 * 3600
               end
           ) as delay_bucket,
           count(*)
    from task
    join lateral (
        -- this is where the "positive = late" convention is set
        select extract(epoch from (now() - next_run)) as delay
    ) as d on true
    group by "type", status, "policy", priority, current_interval, delay_bucket
    order by "type", status, "policy", priority, current_interval, delay_bucket
),
delay_bounds as (
    -- get the minimum and maximum delay bucket for each task group. This will
    -- let us generate all the buckets, even the empty ones, in the next CTE.
    select "type", status, "policy", priority, current_interval,
           min(delay_bucket) as min,
           max(delay_bucket) as max
    from task_count_by_bucket
    group by "type", status, "policy", priority, current_interval
),
task_buckets as (
    -- Generate all time buckets for all categories.
    select "type", status, "policy", priority, current_interval, delay_bucket
    from delay_bounds
    join lateral (
        -- 1 hour buckets
        select generate_series(-23, 23) * 3600 as delay_bucket
        union
        -- 1 day buckets. The "- 1" is used to make sure we generate an empty
        -- bucket as lowest delay bucket, so prometheus quantile calculations
        -- stay accurate
        select generate_series(min / (24 * 3600) - 1, max / (24 * 3600)) * 24 * 3600
            as delay_bucket
    ) as buckets on true
),
task_count_for_all_buckets as (
    -- This join merges the non-empty buckets (task_count_by_bucket) with
    -- the full list of buckets (task_buckets).
    -- The join clause can't use the "using (x, y, z)" syntax, as it uses
    -- equality and priority and current_interval can be null. This also
    -- forces us to label all the fields in the select. Ugh.
    select task_buckets."type",
           task_buckets.status,
           task_buckets."policy",
           task_buckets.priority,
           task_buckets.current_interval,
           task_buckets.delay_bucket,
           coalesce(count, 0) as count -- make sure empty buckets have a 0 count instead of null
    from task_buckets
    left join task_count_by_bucket
        on task_count_by_bucket."type" = task_buckets."type"
        and task_count_by_bucket.status = task_buckets.status
        and task_count_by_bucket."policy" = task_buckets."policy"
        and task_count_by_bucket.priority is not distinct from task_buckets.priority
        and task_count_by_bucket.current_interval is not distinct from task_buckets.current_interval
        and task_count_by_bucket.delay_bucket = task_buckets.delay_bucket
),
cumulative_buckets as (
    -- Prometheus wants cumulative histograms: for each bucket, the value
    -- needs to be the total of all measurements below the given value (this
    -- allows downsampling by just throwing away some buckets). We use the
    -- "sum over partition" window function to compute this.
    -- Prometheus also expects a "+Inf" bucket for the total count. We
    -- generate it with a null lt value so we can sort it after the rest of
    -- the buckets.

    -- cumulative data
    select "type", status, "policy", priority, current_interval,
           delay_bucket as lt,
           sum(count) over (
               partition by "type", status, "policy", priority, current_interval
               order by delay_bucket
           )
    from task_count_for_all_buckets

    union all

    -- +Inf data
    select "type", status, "policy", priority, current_interval,
           null as lt,
           sum(count)
    from task_count_for_all_buckets
    group by "type", status, "policy", priority, current_interval

    -- sorting of all buckets
    order by "type", status, "policy", priority, current_interval,
        lt asc nulls last -- make sure +Inf ends up last
)
-- The final query, which at this point just has to make sure that all
-- labels are text (or the SQL exporter croaks)
select
    -- we retrieve the backend name here as that's what we have e.g. on the celery side
    (select backend_name from task_type
     where cumulative_buckets."type" = task_type."type") as task,
    status::text as status,
    policy::text as policy,
    coalesce(priority::text, '') as priority,
    coalesce(current_interval::text, '') as current_interval,
    coalesce(lt::text, '+Inf') as lt,
    sum
from cumulative_buckets
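One detail worth calling out in the join above: plain equality would silently drop the task groups where priority or current_interval is NULL, because NULL = NULL does not evaluate to true. A tiny demonstration of why "is not distinct from" is used instead:

select (null::int = null::int)                    as plain_equality, -- NULL, so the join condition fails
       (null::int is not distinct from null::int) as null_safe;      -- true, so NULL groups still match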
The bottom panel shows that we're lagging (not a surprise), but now we have a better idea of how much (a lot ;_;).
The performance of the quantile computations on tasks with a lot of spread (like git loader tasks) is pretty abysmal, given the (very) large number of buckets. We should be able to lower the resolution in extreme cases (e.g. more than a week of delay) to reduce the number of buckets in Prometheus, at the expense of some precision.
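One possible way to do that (an untested sketch, not what is currently implemented) would be to add a third tier to the bucketing expression, e.g. 1-week-wide buckets beyond a week of delay, shown here on toy integer delays:

select delay,
       case
           when delay between -24 * 3600 and 24 * 3600
               then (ceil(delay / 3600.0)::bigint) * 3600
           when delay between -7 * 24 * 3600 and 7 * 24 * 3600
               then (ceil(delay / (24 * 3600.0))::bigint) * 24 * 3600
           else (ceil(delay / (7 * 24 * 3600.0))::bigint) * 7 * 24 * 3600
       end as delay_bucket
from (values (5400), (300000), (2000000)) as t (delay);
-- 5400    ->    7200   (hourly bucket)
-- 300000  ->  345600   (daily bucket)
-- 2000000 -> 2419200   (weekly bucket)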