Skip to content
Snippets Groups Projects
Commit 5fa878e4 authored by Franck Bret's avatar Franck Bret
Browse files

Apply Antoine Lambert review patch

Store authentication tokens in global configuration file with following
structure:

keycloak:
  client_id: swh-web
  realm_name: SoftwareHeritage
  server_url: https://auth.softwareheritage.org/auth/
keycloak_tokens:
  SoftwareHeritage:
    swh-web: xxxtokenxxx
parent 59f3326f
No related branches found
No related tags found
No related merge requests found
# Copyright (C) 2021 The Software Heritage developers
# Copyright (C) 2021-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -8,21 +8,18 @@
import os
import sys
from typing import Any, Dict
from typing import Any, Dict, Optional
import click
from click.core import Context
from swh.core.cli import swh as swh_cli_group
from swh.core.config import SWH_GLOBAL_CONFIG
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
# TODO (T1410): All generic config code should reside in swh.core.config
DEFAULT_CONFIG_PATH = os.environ.get(
"SWH_AUTH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "auth.yml")
)
DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), SWH_GLOBAL_CONFIG)
# Keycloak OpenID Connect defaults
DEFAULT_CONFIG: Dict[str, Any] = {
"keycloak": {
"server_url": "https://auth.softwareheritage.org/auth/",
......@@ -33,48 +30,49 @@ DEFAULT_CONFIG: Dict[str, Any] = {
@swh_cli_group.group(name="auth", context_settings=CONTEXT_SETTINGS)
@click.option(
"-C",
"--config-file",
default=DEFAULT_CONFIG_PATH,
type=click.Path(dir_okay=False, path_type=str),
help="Path to configuration file",
envvar="SWH_CONFIG_FILENAME",
show_default=True,
)
@click.option(
"--oidc-server-url",
"--server-url",
"server_url",
default=f"{DEFAULT_CONFIG['keycloak']['server_url']}",
"-u",
default=None,
help=(
"URL of OpenID Connect server (default to "
f"\"{DEFAULT_CONFIG['keycloak']['server_url']}\")"
"URL of OpenID Connect server, default to "
+ repr(DEFAULT_CONFIG["keycloak"]["server_url"])
),
)
@click.option(
"--realm-name",
"realm_name",
default=f"{DEFAULT_CONFIG['keycloak']['realm_name']}",
"-r",
default=None,
help=(
"Name of the OpenID Connect authentication realm "
f"(default to \"{DEFAULT_CONFIG['keycloak']['realm_name']}\")"
"Name of the OpenID Connect authentication realm, default to "
+ repr(DEFAULT_CONFIG["keycloak"]["realm_name"])
),
)
@click.option(
"--client-id",
"client_id",
default=f"{DEFAULT_CONFIG['keycloak']['client_id']}",
"-c",
default=None,
help=(
"OpenID Connect client identifier in the realm "
f"(default to \"{DEFAULT_CONFIG['keycloak']['client_id']}\")"
"OpenID Connect client identifier in the realm, default to "
+ repr(DEFAULT_CONFIG["keycloak"]["realm_name"])
),
)
@click.option(
"-C",
"--config-file",
default=None,
type=click.Path(exists=True, dir_okay=False, path_type=str),
help=f"Path to authentication configuration file (default: {DEFAULT_CONFIG_PATH})",
)
@click.pass_context
def auth(
ctx: Context,
server_url: str,
realm_name: str,
client_id: str,
config_file: str,
oidc_server_url: Optional[str] = None,
realm_name: Optional[str] = None,
client_id: Optional[str] = None,
):
"""
Software Heritage Authentication tools.
......@@ -85,37 +83,27 @@ def auth(
from swh.auth.keycloak import KeycloakOpenIDConnect
from swh.core import config
# Env var takes precedence on params
# Params takes precedence on "auth.yml" configuration file
# Configuration file takes precedence on default auth config values
# Set auth config to default values
# default config
cfg = DEFAULT_CONFIG
# Merge with default auth config file
default_cfg_from_file = config.load_named_config("auth", global_conf=False)
cfg = config.merge_configs(cfg, default_cfg_from_file)
# Merge with user config file if any
if config_file:
user_cfg_from_file = config.read_raw_config(config_file)
cfg = config.merge_configs(cfg, user_cfg_from_file)
else:
config_file = DEFAULT_CONFIG_PATH
# Merge with params if any (params load env var too)
ctx.ensure_object(dict)
params = {}
for key in DEFAULT_CONFIG["keycloak"].keys():
if key in ctx.params:
params[key] = ctx.params[key]
if params:
cfg = config.merge_configs(cfg, {"keycloak": params})
# merge config located in config file if any
cfg = config.merge_configs(cfg, config.read_raw_config(config_file))
assert "keycloak" in cfg
# override config with command parameters if provided
if "keycloak" not in cfg:
cfg["keycloak"] = {}
if oidc_server_url is not None:
cfg["keycloak"]["server_url"] = oidc_server_url
if realm_name is not None:
cfg["keycloak"]["realm_name"] = realm_name
if client_id is not None:
cfg["keycloak"]["client_id"] = client_id
ctx.ensure_object(dict)
ctx.obj["config_file"] = config_file
ctx.obj["keycloak"] = cfg["keycloak"]
ctx.obj["config"] = cfg
# Instantiate an OpenId connect client from keycloak auth configuration
# The 'keycloak' key is mandatory
ctx.obj["oidc_client"] = KeycloakOpenIDConnect.from_config(keycloak=cfg["keycloak"])
......@@ -154,11 +142,12 @@ def generate_token(ctx: Context, username: str, password):
oidc_info = ctx.obj["oidc_client"].login(
username, password, scope="openid offline_access"
)
print(oidc_info["refresh_token"])
return oidc_info["refresh_token"]
if "invoked_by_config" in ctx.parent.__dict__:
return oidc_info["refresh_token"]
else:
click.echo(oidc_info["refresh_token"])
except KeycloakError as ke:
print(keycloak_error_message(ke))
sys.exit(1)
ctx.fail(keycloak_error_message(ke))
@auth.command("revoke-token")
......@@ -210,66 +199,81 @@ def auth_config(ctx: Context, username: str, token: str):
import yaml
from swh.auth.keycloak import KeycloakError, keycloak_error_message
from swh.auth.utils import get_token_from_config
from swh.core import config
assert "oidc_client" in ctx.obj
cfg = ctx.obj["config"]
config_file = ctx.obj["config_file"]
kc_config = cfg["keycloak"]
oidc_client = ctx.obj["oidc_client"]
# params > config
# Ensure we get a token
raw_token: str = ""
refresh_token = get_token_from_config(
cfg, kc_config["realm_name"], kc_config["client_id"]
)
if token:
# Verify the token is valid
raw_token = token
elif "token" in ctx.obj["keycloak"] and ctx.obj["keycloak"]["token"]:
# A token entry exists in keycloak auth config object
msg = f"A token entry exists in {ctx.obj['config_file']}\n"
if refresh_token:
msg = (
f"A token was found in {config_file} for realm '{kc_config['realm_name']}' "
f"and client '{kc_config['client_id']}'"
)
click.echo(click.style(msg, fg="green"))
else:
refresh_token = token
if refresh_token:
next_action = click.prompt(
text="Would you like to verify it or generate a new one?",
text="Would you like to verify the token or generate a new one?",
type=click.Choice(["verify", "generate"]),
default="verify",
)
if next_action == "verify":
raw_token = ctx.obj["keycloak"]["token"]
if next_action == "generate":
refresh_token = None
if not raw_token:
if not refresh_token:
if not username:
click.echo(
f"A new token will be generated for realm '{kc_config['realm_name']}'"
f"and client '{kc_config['client_id']}'"
)
username = click.prompt(text="Username")
raw_token = ctx.invoke(generate_token, username=username)
assert raw_token
refresh_token = raw_token.strip()
else:
click.echo(
f"A new token for username '{username}' will be generated for realm "
f"'{kc_config['realm_name']}' and client '{kc_config['client_id']}'"
)
setattr(ctx, "invoked_by_config", True)
refresh_token = ctx.invoke(generate_token, username=username)
msg = f"Token generation success for username {username}"
click.echo(click.style(msg, fg="green"))
# Ensure the token is valid by getting user info
# Ensure the token is valid
try:
# userinfo endpoint needs an access_token
access_token = oidc_client.refresh_token(refresh_token=refresh_token)[
"access_token"
]
oidc_info = oidc_client.userinfo(access_token=access_token)
msg = (
f"Token verification success for username {oidc_info['preferred_username']}"
)
# check if an access token can be generated from the refresh token
oidc_client.refresh_token(refresh_token=refresh_token)["access_token"]
msg = f"Token verification success for username {username}"
click.echo(click.style(msg, fg="green"))
# Store the valid token into keycloak auth config object
ctx.obj["keycloak"]["token"] = refresh_token
# Store the valid token into config object
cfg = config.merge_configs(
cfg,
{
"keycloak_tokens": {
kc_config["realm_name"]: {
kc_config["client_id"]: refresh_token,
}
}
},
)
except KeycloakError as ke:
msg = keycloak_error_message(ke)
click.echo(click.style(msg, fg="red"))
ctx.exit(1)
ctx.fail(keycloak_error_message(ke))
# Save auth configuration file?
if not click.confirm(
"Save authentication settings to\n" f"{ctx.obj['config_file']}?"
):
sys.exit(1)
if not click.confirm(f"Save authentication settings to {config_file}?"):
ctx.exit(0)
# Save configuration to file
config_path = Path(ctx.obj["config_file"])
config_path = Path(config_file)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(yaml.safe_dump({"keycloak": ctx.obj["keycloak"]}))
config_path.write_text(yaml.safe_dump(cfg))
msg = "\nAuthentication configuration file '%s' written successfully"
msg %= click.format_filename(str(config_path))
msg = f"Authentication configuration file {config_file} written successfully"
click.echo(click.style(msg, fg="green"))
......@@ -14,14 +14,16 @@ runner = CliRunner()
@pytest.fixture()
def keycloak_oidc(keycloak_oidc, mocker):
def _keycloak_oidc(server_url, realm_name, client_id):
keycloak_oidc.server_url = server_url
keycloak_oidc.realm_name = realm_name
keycloak_oidc.client_id = client_id
def _keycloak_oidc_from_config(keycloak):
keycloak_oidc.server_url = keycloak["server_url"]
keycloak_oidc.realm_name = keycloak["realm_name"]
keycloak_oidc.client_id = keycloak["client_id"]
return keycloak_oidc
keycloak_oidc_client = mocker.patch("swh.auth.keycloak.KeycloakOpenIDConnect")
keycloak_oidc_client.side_effect = _keycloak_oidc
keycloak_oidc_client_from_config = mocker.patch(
"swh.auth.keycloak.KeycloakOpenIDConnect.from_config"
)
keycloak_oidc_client_from_config.side_effect = _keycloak_oidc_from_config
return keycloak_oidc
......@@ -74,8 +76,8 @@ def test_auth_generate_token_error(keycloak_oidc, mocker, user_credentials):
result = _run_auth_command(
command, keycloak_oidc, input=f"{user_credentials['password']}\n"
)
assert result.exit_code == 1
assert result.output[:-1] == "invalid_grant: Invalid user credentials"
assert result.exit_code != 0
assert "invalid_grant: Invalid user credentials" in result.output
def test_auth_remove_token_ok(keycloak_oidc):
......
......@@ -6,7 +6,7 @@
from base64 import urlsafe_b64encode
import hashlib
import secrets
from typing import Tuple
from typing import Any, Dict, Optional, Tuple
def gen_oidc_pkce_codes() -> Tuple[str, str]:
......@@ -33,3 +33,9 @@ def gen_oidc_pkce_codes() -> Tuple[str, str]:
code_challenge_str = code_challenge_str.replace("=", "")
return code_verifier_str, code_challenge_str
def get_token_from_config(
config: Dict[str, Any], realm_name: str, client_id: str
) -> Optional[str]:
return config.get("keycloak_tokens", {}).get(realm_name, {}).get((client_id))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment