There is going to be an issue with the content replayer in steady state: content objects (without the data) are written to Kafka before the data is written to the objstorage.
So if content replayers are fast enough (and they probably will be), they'll try to access the data in the objstorage before it's there.
(And if we wrote to Kafka after writing to the objstorage, then there would be a risk of Kafka missing some content in case of failure, which is worse.)
A possible solution is to have the content replayer retry reading objects, until they are available.
There is however the issue of missing objects (swh/devel/experiments/swh-db-audit#1817 (moved)), so it can't retry forever for all objects or it will get stuck. We see two possible solutions:
a retry timeout, but it means that some objects might be skipped when they shouldn't be (e.g. if the object takes a long time to become available in the objstorage); a retry wrapper along these lines is sketched right after this list
"hardcoding" a list of missing objects in the configuration, but it could possibly grow large with time (hopefully it won't)
My gut on this task tells me that what we need (what we really, really need) is a third solution:
Have a client-specific FIFO queue into which failed content ids are pushed. Then a garbage-collection process is in charge of pulling these content ids from the queue and attempting to copy them again a number of times (giving up after that if they still fail).
This would prevent the main 'steady-state' content replayers from being slowed down by temporary failures of some sort.
This needs to be bounded to ensure we do not overflow this queue (i.e. if a majority of object copies fail, we do not want to fill this queue as fast as kafka messages are pushed on the content topic).
A simple solution would be to limit the capacity of this queue and make the content replayer stop/crash if it needs to push an object id into this garbage-collector queue but cannot do so for some reason (e.g. the queue is full).
This queue can be stored as a kafka topic, but this is an implementation detail, so it could be done by any FIFO-capable provider. In fact, since it's purely a client-side concern, we do not want to provide any such 'service' as allowing a journal consumer (client) to create new topics in the main kafka.
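To make the division of labour concrete, a rough sketch of what the garbage-collection process could look like (the queue interface, the `copy_content` callable and the retry limit are all placeholders):

```python
import logging

logger = logging.getLogger(__name__)

MAX_ATTEMPTS = 5  # made-up retry policy


def garbage_collect(failed_queue, copy_content):
    """Pull (obj_id, attempts) pairs from the client-side FIFO queue,
    retry the objstorage copy, and give up after MAX_ATTEMPTS failures."""
    while True:
        obj_id, attempts = failed_queue.get()  # blocks until an entry arrives
        try:
            copy_content(obj_id)               # retry the copy to the destination objstorage
        except Exception:
            if attempts + 1 >= MAX_ATTEMPTS:
                logger.error("giving up on %r after %d attempts", obj_id, attempts + 1)
            else:
                failed_queue.put((obj_id, attempts + 1))
```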
I agree with @douardda that the "failed content" queue + separate processor approach would be the most sensible.
I also agree that the depth of this queue must be limited, and that reaching the queue depth should prevent further reads from being committed to kafka.
We can't really know the size of a kafka topic, so I don't think this would be an appropriate solution. The queue needs to be resilient across restarts of the client infrastructure, so memcached is out; it also needs to be shared across workers (ideally), so we should probably look at something rabbitmq- (ugh) or redis-based.
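For instance, a redis-backed version of the bounded queue could look like this (key name, depth limit and broker address are made up, and the capacity check is deliberately naive):

```python
import redis

MAX_QUEUE_DEPTH = 10_000
QUEUE_KEY = "content_replayer:failed"  # hypothetical key name

r = redis.Redis(host="localhost")


def push_failed(obj_id: bytes) -> None:
    """Queue a failed content id for later retry; crash the replayer if the
    queue is full, so no further reads get committed to kafka."""
    if r.llen(QUEUE_KEY) >= MAX_QUEUE_DEPTH:  # not atomic, but good enough for a sketch
        raise RuntimeError("failed-content queue is full, stopping the replayer")
    r.lpush(QUEUE_KEY, obj_id)


def pop_failed(timeout: int = 30):
    """Blocking pop used by the retry process; returns None on timeout."""
    item = r.brpop(QUEUE_KEY, timeout=timeout)
    return item[1] if item else None
```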
So, now that swh-dataset#1914 is stuck, I'm giving this a harder think, and I'm wondering whether we shouldn't have a generic buffering/filtering component in the journal instead:
journal writing and "archive integrity" checking are currently concurrent processes, and the main archive storage transaction can fail while the messages are still written to the journal (for any and all object types). We don't have a (reasonably performant) way of interlocking the main storage and journal transactions.
We would like our mirrors to be as close to the main archive as possible (for instance, in terms of object counts, it'd be weird for a mirror to have more objects and be "ahead" of the original)...
distributing the "retry getting this object because maybe it hasn't arrived yet" logic among clients feels pretty brittle to me
we're already running a copy of the kafka topics from one cluster (the rocquencourt "buffer") to another (the azure "full copy")
I'm therefore tempted to just write a new swh.journal.filter (swh.journal.mirror_maker to replicate the kafka terminology?) component, which would read from a set of raw journal topics, check that the objects exist in a given storage + objstorage backend (or even on the readonly mirror), and if so push them to "cleaned" kafka topics.
This component would centralize the "has this object already appeared?" logic, as well as the queueing+retry logic, and would replace the current kafka mirror component.
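A rough sketch of that filter loop, assuming a confluent-kafka client and leaving the existence check and the retry queue as injected callables (topic prefixes, group id and broker address are made up):

```python
from confluent_kafka import Consumer, Producer

RAW_PREFIX = "swh.journal.objects."            # hypothetical "raw" topic prefix
CLEAN_PREFIX = "swh.journal.objects_cleaned."  # hypothetical "cleaned" topic prefix


def run_filter(raw_topics, object_exists, enqueue_retry):
    """Forward messages whose object exists in the archive to the cleaned
    topics; hand the others over to the retry/buffering machinery."""
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "journal-filter",
        "enable.auto.commit": False,
    })
    producer = Producer({"bootstrap.servers": "localhost:9092"})
    consumer.subscribe(raw_topics)

    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        if object_exists(msg.topic(), msg.key()):
            clean_topic = msg.topic().replace(RAW_PREFIX, CLEAN_PREFIX)
            producer.produce(clean_topic, key=msg.key(), value=msg.value())
        else:
            enqueue_retry(msg.topic(), msg.key(), msg.value())
        consumer.commit(message=msg)
        producer.poll(0)  # serve delivery callbacks
```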
When re-submitting a buffered object, the requeuing process would increment num_attempts, set latest_attempt to now(), and set in_flight to true;
if the retry policy (num_attempts or retry delay) is exceeded, it sets given_up = true.
A housekeeping routine would clear old enough given_up entries.
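Assuming the buffer is a postgres table, its per-object state and the bookkeeping above could look roughly like this (the column names follow the description above, first_seen is an extra column added for the retry-delay check, and the limits are invented):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

MAX_ATTEMPTS = 3                    # invented retry policy
RETRY_WINDOW = timedelta(hours=24)  # invented retry policy


@dataclass
class BufferEntry:
    """One row of the (hypothetical) buffering table used by the filter."""
    obj_id: bytes
    topic: str
    first_seen: datetime
    latest_attempt: datetime
    num_attempts: int = 0
    in_flight: bool = False
    given_up: bool = False

    def record_requeue(self, now: datetime) -> None:
        """Bookkeeping done by the requeuing process described above."""
        self.num_attempts += 1
        self.latest_attempt = now
        self.in_flight = True
        if self.num_attempts >= MAX_ATTEMPTS or now - self.first_seen > RETRY_WINDOW:
            self.given_up = True
```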
Key metrics for the filter component:
kafka consumer offset
min(latest_attempt) where in_flight = true (time it takes for a message from submission in the buffer to (re-)processing by the filter; should stay close to the current time)
count(*) where given_up = false group by topic (number of objects pending a retry, should be small)
count(*) where in_flight = true group by topic (number of objects buffered for reprocessing, should be small)
max(latest_attempt) (last processing time by the requeuing process)
count(*) where given_up = true (checks whether the housekeeping process is keeping up)
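If that buffer is a postgres table, those metrics map to straightforward queries; a hypothetical sketch for collecting them (table name, column names and DSN are assumptions):

```python
import psycopg2

# table and column names follow the description above; the DSN is made up
METRIC_QUERIES = {
    "oldest_in_flight":
        "select min(latest_attempt) from journal_filter_buffer where in_flight",
    "pending_retry_by_topic":
        "select topic, count(*) from journal_filter_buffer"
        " where not given_up group by topic",
    "in_flight_by_topic":
        "select topic, count(*) from journal_filter_buffer"
        " where in_flight group by topic",
    "last_requeue_run":
        "select max(latest_attempt) from journal_filter_buffer",
    "given_up_total":
        "select count(*) from journal_filter_buffer where given_up",
}


def collect_metrics(dsn="service=journal-filter"):
    """Yield (metric name, rows) pairs, e.g. to feed a statsd/prometheus exporter."""
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        for name, query in METRIC_QUERIES.items():
            cur.execute(query)
            yield name, cur.fetchall()
```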
Note: I haven't read the other comment below, just reacting to this one as I am reading it.
! In #2003 (closed), @olasd wrote:
So, now that swh-dataset#1914 is stuck, I'm giving this a harder think, and I'm wondering whether we shouldn't have a generic buffering/filtering component in the journal instead:
journal writing and "archive integrity" checking are currently concurrent processes, and the main archive storage transaction can fail while the messages are still written to the journal (for any and all object types). We don't have a (reasonably performant) way of interlocking the main storage and journal transactions.
We would like our mirrors to be as close to the main archive as possible (for instance, in terms of object counts, it'd be weird for a mirror to have more objects and be "ahead" of the original)...
distributing the "retry getting this object because maybe it hasn't arrived yet" logic among clients feels pretty brittle to me
we're already running a copy of the kafka topics from one cluster (the rocquencourt "buffer") to another (the azure "full copy")
I'm therefore tempted to just write a new swh.journal.filter (swh.journal.mirror_maker to replicate the kafka terminology?) component, which would read from a set of raw journal topics, check that the objects exist in a given storage + objstorage backend (or even on the readonly mirror), and if so push them to "cleaned" kafka topics.
This component would centralize the "has this object already appeared?" logic, as well as the queueing+retry logic, and would replace the current kafka mirror component.
How does that sound?
Question: do we (expect to) have the problem (temporary inconsistent state of the archive emitting messages in the kafka topics) for other object types than content?
I expect content objects to be the most prone to this failure mode due to the "distributed" nature of storing them (storage + objstorage). If we focus on content objects, maybe a simple solution would be to have the objstorage publish a topic on kafka as well, with a message for each content added to the objstorage. A content replayer would then be dedicated to the objstorage replication.
I believe this solution can be seen as a simplified version of your swh.journal.filter proposal.
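A minimal sketch of what that could look like, as a wrapper around an existing objstorage (the topic name, broker address and the wrapper's interface are made up):

```python
from confluent_kafka import Producer

ADDED_TOPIC = "swh.objstorage.content_added"  # hypothetical topic name


class NotifyingObjStorage:
    """Wrap an objstorage and publish the id of every content it stores, so
    a dedicated replayer can follow objstorage additions only."""

    def __init__(self, objstorage, brokers="localhost:9092"):
        self.objstorage = objstorage
        self.producer = Producer({"bootstrap.servers": brokers})

    def add(self, content: bytes, obj_id: bytes) -> None:
        self.objstorage.add(content, obj_id)            # copy the data first
        self.producer.produce(ADDED_TOPIC, key=obj_id)  # then announce it (the key is the content id)
        self.producer.poll(0)
```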
min(latest_attempt) where in_flight = true (time it takes for a message from submission in the buffer to (re-)processing by the filter; should stay close to the current time)
count(*) where given_up = false group by topic (number of objects pending a retry, should be small)
count(*) where in_flight = true group by topic (number of objects buffered for reprocessing, should be small)
max(latest_attempt) (last processing time by the requeuing process)
count(*) where given_up = true (checks whether the housekeeping process is keeping up)
excellent idea to add this section in your description!
! In #2003 (closed), @olasd wrote:
This component would centralize the "has this object already appeared?" logic, as well as the queueing+retry logic, and would replace the current kafka mirror component.
How does that sound?
I'm a bit worried this would be a first step of "let's implement a transaction layer on our nosql distributed database". Maybe I'm a bit pessimistic here...
I mean, if we want the messages in kafka to be published only after the objects have been inserted for sure, why don't we just write these messages in kafka after we insert them in the DB in the first place?
@olasd I'm worried that implementing your idea would result in some complex piece of code. It also adds a new postgresql database and new kafka topics, which will need extra resources and management. And if at some point that queue database becomes too large, the retrier will become slower, causing the queue to grow even more.
Alternative idea, which I think is much simpler:
Writes to Kafka are handled by a specific swh-storage "backend" (which also copies to the objstorage), after input validation (using swh-model)
A Kafka consumer writes data to postgresql. If the data is refused by postgresql, then it's either a temporary failure or a bug in the input validation.
There's however the issue of dealing with input validation being too lax if there's already a message in Kafka that shouldn't be there.
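A rough sketch of such a kafka-first backend, with validation and serialization left as injected callables (the class name, topic, broker address and content attributes are all made up):

```python
from confluent_kafka import Producer

CONTENT_TOPIC = "swh.journal.objects.content"  # hypothetical topic name


class KafkaFirstStorage:
    """Validate, copy the data to the objstorage, then publish the (dataless)
    content object to Kafka; postgresql is only written to by a downstream
    Kafka consumer (the replayer)."""

    def __init__(self, objstorage, validate, serialize, brokers="localhost:9092"):
        self.objstorage = objstorage
        self.validate = validate      # e.g. swh-model based input validation
        self.serialize = serialize    # kafka encoding of the content metadata
        self.producer = Producer({"bootstrap.servers": brokers})

    def content_add(self, contents):
        for content in contents:
            self.validate(content)
            self.objstorage.add(content.data, content.sha1)
            self.producer.produce(CONTENT_TOPIC, key=content.sha1,
                                  value=self.serialize(content))
        self.producer.flush()
```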
! In #2003 (closed), @douardda wrote:
Question: do we (expect to) have the problem (temporary inconsistent state of the archive emitting messages in the kafka topics) for other object types than content?
No. postgresql may fail or time out on any object insertion. There's nothing special about contents.
! In #2003 (closed), @douardda wrote:
I'm a bit worried this would be a first step of "let's implement a transaction layer on our nosql distributed database". Maybe I'm a bit pessimistic here...
I mean, if we want the messages in kafka to be published only after the objects have been inserted for sure, why don't we just write these messages in kafka after we insert them in the DB in the first place?
We already discussed this at the time we replaced the journal-publisher with journal-writer. Adding to Kafka after inserting to the DB means that Kafka will be missing some messages, and we would need to run a backfiller on a regular basis to fix it.
! In #2003 (closed), @vlorentz wrote:
@olasd I'm worried that implementing your idea would result in some complex piece of code.
We're already using kafka's built-in mirror maker. It's also a complex piece of code, which sometimes mysteriously stops working (it's currently running in a for loop...).
It also adds a new postgresql database and new kafka topics, which will need extra resources and management.
Technically, we already have two sets of kafka topics. This idea would just repurpose them.
And if at some point that queue database becomes too large, the retrier will become slower, causing the queue to grow even more.
The retrier would only add entries for the objects which fail the existence check (which should be just a few), and will keep them around for just a few retries. We don't even /have/ to enable it for objects other than contents.
Alternative idea, which I think is much simpler:
Writes to Kafka are handled by a specific swh-storage "backend" (which also copies to the objstorage), after input validation (using swh-model)
A Kafka consumer writes data to postgresql. If the data is refused by postgresql, then it's either a temporary failure or a bug in the input validation.
There's however the issue of dealing with input validation being too lax if there's already a message in Kafka that shouldn't be there.
I really like how your idea pushes us forward with several of our goals:
distributing the ingestion load to a, well, distributed system
centralizing the load on additions to the main archive to a single, more manageable component
adding input validation everywhere
disentangling the loading process away from the main storage technology towards our generic data model
The journal consumer component already exists, it's the replayer. My main concern here is how battle-tested it is, but I guess the currently running cassandra replay, as well as @douardda's earlier tests of the mirroring infra, can give us a good idea there.
As for the caveat of input validation being too lax, or messages in kafka that shouldn't be there, we already have a handler for object format quirks in the replayer.
I guess the next step to move this forward would be implementing the kafka-first storage, and kicking its tires in the docker environment?
! In #2003 (closed), @vlorentz wrote:
We already discussed this at the time we replaced the journal-publisher with journal-writer. Adding to Kafka after inserting to the DB means that Kafka will be missing some messages, and we would need to run a backfiller on a regular basis to fix it.
I know this discussion already took place, but I could not remember why we chose to serialize write operations the way we did. One question could be 'what is the definitive source of truth in our stack?' Because if we use pg as such a trustworthy truth keeper (which is what we do for now), I don't see why we should run any backfiller process if an object could not reach it.
Indeed we need better data validation all along the path, especially at the very beginning of the ingestion pipeline of "the storage" (whatever that means in the context of this discussion), as @olasd (and I believe everyone) already mentioned; but since sh*t happens, we must make sure we remain reasonably consistent and efficient.
Maybe we need to make sure we have a clear causality/dependency graph for data ingestion in the graph, including all the bells and whistles we have / plan to have (aka consumers of the graph: replication, mirrors, indexers, etc.), so we can estimate the consequences of any misbehavior of one node or edge of this graph...
The pipeline(s) would be then just an implementation of this model I guess...
! In #2003 (closed), @olasd wrote:
Now that I think of it, we can decompose this in stages in the storage pipeline:
add an input validating proxy high up the stack
replace the journal writer calls sprinkled in all methods with a journal writing proxy
add a "don't insert objects" filter low down the stack
so we'd end up with the following pipeline for workers:
input validation proxy
object bundling proxy
object deduplication against read-only proxy
journal writer proxy
addition-blocking filter
underlying read-only storage
and the following pipeline for the "main storage replayer":
underlying read-write storage
(it's a very short pipeline... a pipedash?)
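Restating the quoted worker pipeline as a configuration sketch, to make sure I read it right (the proxy names are invented, only the overall shape follows the list above):

```python
# purely illustrative: hypothetical proxy names for the worker-side pipeline
worker_storage_config = {
    "cls": "pipeline",
    "steps": [
        {"cls": "validate"},         # input validation proxy
        {"cls": "buffer"},           # object bundling proxy
        {"cls": "dedup"},            # deduplication against the read-only storage
        {"cls": "journal_writer"},   # journal writing proxy
        {"cls": "block_additions"},  # addition-blocking filter
        {"cls": "remote", "url": "https://storage-ro.example.org"},  # read-only storage
    ],
}
```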
Ok I am not sure I get how this (interesting) idea solves our current problem of space-time causality violation.
If an ingestion worker only writes to the journal (if I understand your idea correctly), who is responsible for pushing the actual content into the (master) objstorage? Is it a missing step of the worker's pipeline?