Commit 4b44d4dd authored by vlorentz

athena_inventory_to_sorted_sha1s.py: Add error handling

There are 1.1M files with unexpected names (sharded or too short).
parent 3bee45af
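For context, a minimal sketch of why the error path added below needs both a shape check and a length check. The keys here are made up for illustration (the actual offending inventory entries are not shown in this commit): a sharded key makes bytes.fromhex raise ValueError, while a too-short key is still valid hex, so only an explicit length test rejects it.

plain = b"3bee45af" + b"00" * 16   # 40 hex chars: a well-formed sha1 key
sharded = b"3b/ee/" + plain        # hypothetical sharded path: contains "/"
too_short = b"3bee45af"            # hypothetical truncated key: valid hex, wrong length

for key in (plain, sharded, too_short):
    if b"/" in key or len(key) != 40:   # mirrors the checks in the error path below
        print("would go to the error log:", key)
        continue
    try:
        print("accepted:", bytes.fromhex(key.decode()).hex())
    except ValueError:                  # e.g. stray non-hex characters
        print("would go to the error log:", key)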
@@ -10,7 +10,7 @@ import tqdm
 from swh.dataset.athena import human_size, query, _s3_url_to_bucket_path
 
-(_, output_filename) = sys.argv
+(_, output_filename, error_log) = sys.argv
 
 athena = boto3.client("athena")
 athena.output_location = "s3://vlorentz-test2/tmp/athena/inventory"
@@ -49,7 +49,15 @@ query_string = f"""
 SELECT key FROM swhinventory.inventory WHERE dt='{last_inventory_dt}';
 """
 print(query_string)
 result = query(athena, query_string)
+print(result)
+
+if result["Statistics"]["TotalExecutionTimeInMillis"] < 1000:
+    # suspiciously low
+    logging.warning("Request too fast, repairing table and retrying")
+    query(athena, "MSCK REPAIR TABLE swhinventory.inventory")
+    result = query(athena, query_string)
+
 logging.info(
     "Scanned %s in %s",
     human_size(result["Statistics"]["DataScannedInBytes"]),
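The retry added above, as a standalone sketch. It relies only on what the diff already shows: the query() helper from swh.dataset.athena returning the Athena QueryExecution dict (hence result["Statistics"]). The function name query_with_repair and the min_millis parameter are illustrative; the script itself hard-codes the 1000 ms threshold inline.

import logging

from swh.dataset.athena import query


def query_with_repair(athena, query_string, min_millis=1000):
    # If the query finishes suspiciously fast, the partition metadata is
    # probably missing; repair the table and run the query once more.
    result = query(athena, query_string)
    if result["Statistics"]["TotalExecutionTimeInMillis"] < min_millis:
        logging.warning("Request too fast, repairing table and retrying")
        query(athena, "MSCK REPAIR TABLE swhinventory.inventory")
        result = query(athena, query_string)
    return result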
@@ -104,15 +112,33 @@ HEX_SHA1_SIZE = 40
 LINE_SIZE = HEX_SHA1_SIZE + 1  # sha1 + \n
 BATCH_SIZE = 10000
 
+buf = b""
 with tqdm.tqdm(total=rows_count, unit_scale=True, unit="rows", desc="Writing") as pbar:
-    with open(output_filename, "wb") as output_file:
+    with open(output_filename, "wb") as output_file, open(error_log, "wb") as error_file:
         while True:
             chunk = sort_proc.stdout.read(LINE_SIZE * BATCH_SIZE)
             if not chunk:
                 break
-            (new_lines_count, remainder) = divmod(len(chunk), LINE_SIZE)
-            assert remainder == 0
-            pbar.update(new_lines_count)
-            output_file.write(
-                b"".join(bytes.fromhex(line.decode()) for line in chunk.split(b"\n"))
-            )
+            pbar.update(chunk.count(b"\n"))
+            buf += chunk
+            # keep any trailing partial line in buf for the next read
+            (chunk, buf) = buf.rsplit(b"\n", 1)
+            try:
+                output_file.write(
+                    b"".join(bytes.fromhex(line.decode()) for line in chunk.split(b"\n"))
+                )
+            except ValueError:
+                # the chunk contains malformed keys; fall back to per-line handling
+                for line in chunk.split(b"\n"):
+                    if b"/" in line:
+                        error_file.write(line + b"\n")
+                        continue
+                    if len(line) != 40:
+                        error_file.write(line + b"\n")
+                        continue
+                    try:
+                        output_file.write(bytes.fromhex(line.decode()))
+                    except ValueError:
+                        error_file.write(line + b"\n")
+
+assert buf == b"", buf
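The chunking above keeps whatever trails the last newline in buf, so a key split across two reads is not mangled. Below is a self-contained sketch of that carry-over, with an in-memory stream standing in for sort_proc.stdout and a deliberately awkward read size; the extra "no newline yet" guard is defensive and not in the script itself.

import io

# Three 40-character hex keys, newline-terminated, read in 50-byte chunks
# so that reads never line up with line boundaries.
stream = io.BytesIO(b"aa" * 20 + b"\n" + b"bb" * 20 + b"\n" + b"cc" * 20 + b"\n")

buf = b""
while True:
    chunk = stream.read(50)
    if not chunk:
        break
    buf += chunk
    if b"\n" not in buf:
        continue  # no complete line yet, keep accumulating
    (complete, buf) = buf.rsplit(b"\n", 1)  # partial tail stays in buf
    for line in complete.split(b"\n"):
        print(bytes.fromhex(line.decode()).hex())

assert buf == b"", buf  # the stream ends with a newline, so nothing is left over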