Skip to content
Snippets Groups Projects
Commit a208f882 authored by David Douard's avatar David Douard
Browse files

Make the stream journal client accept a string as output_stream config entry

and support the "-" value (for stdout).
parent 0e90328a
No related branches found
No related tags found
No related merge requests found
......@@ -23,6 +23,7 @@ def fill_writer(writer: JournalWriterInterface) -> List[Tuple[str, Dict]]:
objd.pop("data")
expected.append((object_type, objd))
writer.flush()
return expected
......@@ -41,3 +42,38 @@ def test_stream_journal_writer_stream():
for i, (objtype, objd) in enumerate(unpacker, start=1):
assert (objtype, objd) in expected
assert len(expected) == i
def test_stream_journal_writer_filename(tmp_path):
out_fname = str(tmp_path / "journal.msgpack")
writer = get_journal_writer(
cls="stream",
value_sanitizer=model_object_dict_sanitizer,
output_stream=out_fname,
)
expected = fill_writer(writer)
with open(out_fname, "rb") as outs:
unpacker = kafka_stream_to_value(outs)
for i, (objtype, objd) in enumerate(unpacker, start=1):
assert (objtype, objd) in expected
assert len(expected) == i
def test_stream_journal_writer_stdout(capfdbinary):
writer = get_journal_writer(
cls="stream",
value_sanitizer=model_object_dict_sanitizer,
output_stream="-",
)
expected = fill_writer(writer)
captured = capfdbinary.readouterr()
assert captured.err == b""
outs = io.BytesIO(captured.out)
unpacker = kafka_stream_to_value(outs)
for i, (objtype, objd) in enumerate(unpacker, start=1):
assert (objtype, objd) in expected
assert len(expected) == i
......@@ -3,7 +3,9 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, Type
import os
import sys
from typing import Any, BinaryIO, Dict, Type
import warnings
from .interface import JournalWriterInterface
......@@ -50,6 +52,14 @@ def get_journal_writer(cls, **kwargs) -> JournalWriterInterface:
JournalWriter = StreamJournalWriter
assert "output_stream" in kwargs
outstream: BinaryIO
if kwargs["output_stream"] in ("-", b"-"):
outstream = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
elif isinstance(kwargs["output_stream"], (str, bytes)):
outstream = open(kwargs["output_stream"], "wb")
else:
outstream = kwargs["output_stream"]
kwargs["output_stream"] = outstream
else:
raise ValueError("Unknown journal writer class `%s`" % cls)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment