Skip to content
Snippets Groups Projects

cli: Pass all journal_client config keys to the JournalClient

Compare and
1 file
+ 12
11
Compare changes
  • Side-by-side
  • Inline
+ 12
11
@@ -288,17 +288,22 @@ def journal_client(
scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url)
brokers = brokers or journal_cfg.get("brokers")
if not brokers:
if brokers:
journal_cfg["brokers"] = brokers
if not journal_cfg.get("brokers"):
raise ValueError("The brokers configuration is mandatory.")
prefix = prefix or journal_cfg.get("prefix")
group_id = group_id or journal_cfg.get("group_id")
if prefix:
journal_cfg["prefix"] = prefix
if group_id:
journal_cfg["group_id"] = group_id
origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get(
"origin_metadata_task_type"
)
stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects")
batch_size = batch_size or journal_cfg.get("batch_size", 200)
if stop_after_objects:
journal_cfg["stop_after_objects"] = stop_after_objects
if batch_size:
journal_cfg["batch_size"] = batch_size
object_types = set()
worker_fns: List[Callable[[ObjectsDict], Dict]] = []
@@ -359,12 +364,8 @@ def journal_client(
client = get_journal_client(
cls="kafka",
brokers=brokers,
prefix=prefix,
group_id=group_id,
object_types=list(object_types),
stop_after_objects=stop_after_objects,
batch_size=batch_size,
**journal_cfg,
)
def worker_fn(objects: ObjectsDict):
Loading