Skip to content
Snippets Groups Projects
Commit 9cece1f6 authored by Antoine Pietri's avatar Antoine Pietri
Browse files

graph backend: use global config

parent 97e79c28
No related branches found
Tags v0.2.2
No related merge requests found
......@@ -6,9 +6,7 @@
import asyncio
import contextlib
import io
import logging
import os
import pathlib
import struct
import subprocess
import sys
......@@ -16,6 +14,7 @@ import tempfile
from py4j.java_gateway import JavaGateway
from swh.graph.config import check_config
from swh.graph.pid import NodeToPidMap, PidToNodeMap
from swh.model.identifiers import PID_TYPES
......@@ -26,30 +25,6 @@ NODE2PID_EXT = 'node2pid.bin'
PID2NODE_EXT = 'pid2node.bin'
def find_graph_jar():
"""find swh-graph.jar, containing the Java part of swh-graph
look both in development directories and installed data (for in-production
deployments who fecthed the JAR from pypi)
"""
swh_graph_root = pathlib.Path(__file__).parents[2]
try_paths = [
swh_graph_root / 'java/target/',
pathlib.Path(sys.prefix) / 'share/swh-graph/',
pathlib.Path(sys.prefix) / 'local/share/swh-graph/',
]
for path in try_paths:
glob = list(path.glob('swh-graph-*.jar'))
if glob:
if len(glob) > 1:
logging.warn('found multiple swh-graph JARs, '
'arbitrarily picking one')
logging.info('using swh-graph JAR: {0}'.format(glob[0]))
return str(glob[0])
raise RuntimeError('swh-graph JAR not found. Have you run `make java`?')
def _get_pipe_stderr():
# Get stderr if possible, or pipe to stdout if running with Jupyter.
try:
......@@ -61,28 +36,17 @@ def _get_pipe_stderr():
class Backend:
def __init__(self, graph_path):
def __init__(self, graph_path, config=None):
self.gateway = None
self.entry = None
self.graph_path = graph_path
self.config = check_config(config or {})
def __enter__(self):
# TODO: make all of that configurable with sane defaults
java_opts = [
'-Xmx200G',
'-server',
'-XX:PretenureSizeThreshold=512M',
'-XX:MaxNewSize=4G',
'-XX:+UseLargePages',
'-XX:+UseTransparentHugePages',
'-XX:+UseNUMA',
'-XX:+UseTLAB',
'-XX:+ResizeTLAB',
]
self.gateway = JavaGateway.launch_gateway(
java_path=None,
javaopts=java_opts,
classpath=find_graph_jar(),
javaopts=self.config['java_tool_options'].split(),
classpath=self.config['classpath'],
die_on_exit=True,
redirect_stdout=sys.stdout,
redirect_stderr=_get_pipe_stderr(),
......
......@@ -215,7 +215,7 @@ def map_lookup(graph, identifier):
@click.pass_context
def serve(ctx, host, port, graph):
"""run the graph REST service"""
backend = Backend(graph_path=graph)
backend = Backend(graph_path=graph, config=ctx.obj['config'])
app = make_app(backend=backend)
with backend:
......
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import psutil
import sys
from pathlib import Path
def find_graph_jar():
"""find swh-graph.jar, containing the Java part of swh-graph
look both in development directories and installed data (for in-production
deployments who fecthed the JAR from pypi)
"""
swh_graph_root = Path(__file__).parents[2]
try_paths = [
swh_graph_root / 'java/target/',
Path(sys.prefix) / 'share/swh-graph/',
Path(sys.prefix) / 'local/share/swh-graph/',
]
for path in try_paths:
glob = list(path.glob('swh-graph-*.jar'))
if glob:
if len(glob) > 1:
logging.warn('found multiple swh-graph JARs, '
'arbitrarily picking one')
logging.info('using swh-graph JAR: {0}'.format(glob[0]))
return str(glob[0])
raise RuntimeError('swh-graph JAR not found. Have you run `make java`?')
def check_config(conf):
"""check configuration and propagate defaults
"""
conf = conf.copy()
if 'batch_size' not in conf:
conf['batch_size'] = '1000000000' # 1 billion
if 'max_ram' not in conf:
conf['max_ram'] = str(psutil.virtual_memory().total)
if 'java_tool_options' not in conf:
conf['java_tool_options'] = ' '.join([
'-Xmx{max_ram}',
'-XX:PretenureSizeThreshold=512M',
'-XX:MaxNewSize=4G',
'-XX:+UseLargePages',
'-XX:+UseTransparentHugePages',
'-XX:+UseNUMA',
'-XX:+UseTLAB',
'-XX:+ResizeTLAB',
])
conf['java_tool_options'] = conf['java_tool_options'].format(
max_ram=conf['max_ram'])
if 'java' not in conf:
conf['java'] = 'java'
if 'classpath' not in conf:
conf['classpath'] = find_graph_jar()
return conf
def check_config_compress(config, graph_name, in_dir, out_dir):
"""check compression-specific configuration and initialize its execution
environment.
"""
conf = check_config(config)
conf['graph_name'] = graph_name
conf['in_dir'] = str(in_dir)
conf['out_dir'] = str(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
if 'tmp_dir' not in conf:
tmp_dir = out_dir / 'tmp'
conf['tmp_dir'] = str(tmp_dir)
else:
tmp_dir = Path(conf['tmp_dir'])
tmp_dir.mkdir(parents=True, exist_ok=True)
if 'logback' not in conf:
logback_confpath = tmp_dir / 'logback.xml'
with open(logback_confpath, 'w') as conffile:
conffile.write("""
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d %r %p [%t] %logger{1} - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
""")
conf['logback'] = str(logback_confpath)
conf['java_tool_options'] += ' -Dlogback.configurationFile={logback}'
conf['java_tool_options'] = conf['java_tool_options'].format(
logback=conf['logback'])
print(conf)
return conf
......@@ -43,7 +43,6 @@ class TestCompress(unittest.TestCase):
graph:
compress:
batch_size: 1000
java_tool_options: -Dlogback.configurationFile={logback}
""")
tmpconf.close()
self.conffile = Path(tmpconf.name)
......
......@@ -15,12 +15,9 @@ from enum import Enum
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set
import psutil
from click import ParamType
from swh.graph.backend import find_graph_jar
from swh.graph.config import check_config_compress
class CompressionStep(Enum):
......@@ -162,60 +159,6 @@ def do_step(step, conf):
return rc
def check_config(conf, graph_name, in_dir, out_dir):
"""check compression configuration, propagate defaults, and initialize
execution environment
"""
conf = conf.copy()
conf['graph_name'] = graph_name
conf['in_dir'] = str(in_dir)
conf['out_dir'] = str(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
if 'tmp_dir' not in conf:
tmp_dir = out_dir / 'tmp'
conf['tmp_dir'] = str(tmp_dir)
else:
tmp_dir = Path(conf['tmp_dir'])
tmp_dir.mkdir(parents=True, exist_ok=True)
if 'batch_size' not in conf:
conf['batch_size'] = '1000000000' # 1 billion
if 'logback' not in conf:
logback_confpath = tmp_dir / 'logback.xml'
with open(logback_confpath, 'w') as conffile:
conffile.write("""
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d %r %p [%t] %logger{1} - %m%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
""")
conf['logback'] = str(logback_confpath)
if 'max_ram' not in conf:
conf['max_ram'] = str(psutil.virtual_memory().total)
if 'java_tool_options' not in conf:
assert 'logback' in conf
conf['java_tool_options'] = ' '.join([
'-Xmx{max_ram}', '-XX:PretenureSizeThreshold=512M',
'-XX:MaxNewSize=4G', '-XX:+UseLargePages',
'-XX:+UseTransparentHugePages', '-XX:+UseNUMA', '-XX:+UseTLAB',
'-XX:+ResizeTLAB', '-Dlogback.configurationFile={logback}'
])
conf['java_tool_options'] = conf['java_tool_options'].format(
max_ram=conf['max_ram'], logback=conf['logback'])
if 'java' not in conf:
conf['java'] = 'java'
if 'classpath' not in conf:
conf['classpath'] = find_graph_jar()
return conf
def compress(graph_name: str, in_dir: Path, out_dir: Path,
steps: Set[CompressionStep] = set(COMP_SEQ),
conf: Dict[str, str] = {}):
......@@ -248,7 +191,7 @@ def compress(graph_name: str, in_dir: Path, out_dir: Path,
if not steps:
steps = set(COMP_SEQ)
conf = check_config(conf, graph_name, in_dir, out_dir)
conf = check_config_compress(conf, graph_name, in_dir, out_dir)
compression_start_time = datetime.now()
logging.info(f'starting compression at {compression_start_time}')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment