From a0727184360f9f4341190b05604f080e136f1990 Mon Sep 17 00:00:00 2001
From: "Antoine R. Dumont (@ardumont)" <antoine.romain.dumont@gmail.com>
Date: Wed, 3 Mar 2021 09:54:32 +0100
Subject: [PATCH] Add swh.web.auth.KeycloakOpenIDConnect to swh.auth module

Related to T3079
---
 requirements.txt     |   2 +-
 swh/auth/__init__.py | 140 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 141 insertions(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 54ce666..a536a34 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-
+python-keycloak >= 0.19.0
diff --git a/swh/auth/__init__.py b/swh/auth/__init__.py
index e69de29..6741769 100644
--- a/swh/auth/__init__.py
+++ b/swh/auth/__init__.py
@@ -0,0 +1,140 @@
+# Copyright (C) 2020-2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, Optional, Tuple
+from urllib.parse import urlencode
+
+from keycloak import KeycloakOpenID
+
+
+class KeycloakOpenIDConnect:
+    """
+    Wrapper class around python-keycloak to ease the interaction with Keycloak
+    for managing authentication and user permissions with OpenID Connect.
+    """
+
+    def __init__(
+        self,
+        server_url: str,
+        realm_name: str,
+        client_id: str,
+        realm_public_key: str = "",
+    ):
+        """
+        Args:
+            server_url: URL of the Keycloak server
+            realm_name: The realm name
+            client_id: The OpenID Connect client identifier
+            realm_public_key: The realm public key (will be dynamically
+                retrieved if not provided)
+        """
+        self._keycloak = KeycloakOpenID(
+            server_url=server_url, client_id=client_id, realm_name=realm_name,
+        )
+
+        self.server_url = server_url
+        self.realm_name = realm_name
+        self.client_id = client_id
+        self.realm_public_key = realm_public_key
+
+    def well_known(self) -> Dict[str, Any]:
+        """
+        Retrieve the OpenID Connect Well-Known URI registry from Keycloak.
+
+        Returns:
+            A dictionary filled with OpenID Connect URIS.
+        """
+        return self._keycloak.well_know()
+
+    def authorization_url(self, redirect_uri: str, **extra_params: str) -> str:
+        """
+        Get OpenID Connect authorization URL to authenticate users.
+
+        Args:
+            redirect_uri: URI to redirect to once a user is authenticated
+            extra_params: Extra query parameters to add to the
+                authorization URL
+        """
+        auth_url = self._keycloak.auth_url(redirect_uri)
+        if extra_params:
+            auth_url += "&%s" % urlencode(extra_params)
+        return auth_url
+
+    def authorization_code(
+        self, code: str, redirect_uri: str, **extra_params: str
+    ) -> Dict[str, Any]:
+        """
+        Get OpenID Connect authentication tokens using Authorization
+        Code flow.
+
+        Args:
+            code: Authorization code provided by Keycloak
+            redirect_uri: URI to redirect to once a user is authenticated
+                (must be the same as the one provided to authorization_url):
+            extra_params: Extra parameters to add in the authorization request
+                payload.
+        """
+        return self._keycloak.token(
+            grant_type="authorization_code",
+            code=code,
+            redirect_uri=redirect_uri,
+            **extra_params,
+        )
+
+    def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
+        """
+        Request a new access token from Keycloak using a refresh token.
+
+        Args:
+            refresh_token: A refresh token provided by Keycloak
+
+        Returns:
+            A dictionary filled with tokens info
+        """
+        return self._keycloak.refresh_token(refresh_token)
+
+    def decode_token(
+        self, token: str, options: Optional[Dict[str, Any]] = None
+    ) -> Dict[str, Any]:
+        """
+        Try to decode a JWT token.
+
+        Args:
+            token: A JWT token to decode
+            options: Options for jose.jwt.decode
+
+        Returns:
+            A dictionary filled with decoded token content
+        """
+        if not self.realm_public_key:
+            realm_public_key = self._keycloak.public_key()
+            self.realm_public_key = "-----BEGIN PUBLIC KEY-----\n"
+            self.realm_public_key += realm_public_key
+            self.realm_public_key += "\n-----END PUBLIC KEY-----"
+
+        return self._keycloak.decode_token(
+            token, key=self.realm_public_key, options=options
+        )
+
+    def logout(self, refresh_token: str) -> None:
+        """
+        Logout a user by closing its authenticated session.
+
+        Args:
+            refresh_token: A refresh token provided by Keycloak
+        """
+        self._keycloak.logout(refresh_token)
+
+    def userinfo(self, access_token: str) -> Dict[str, Any]:
+        """
+        Return user information from its access token.
+
+        Args:
+            access_token: An access token provided by Keycloak
+
+        Returns:
+            A dictionary fillled with user information
+        """
+        return self._keycloak.userinfo(access_token)
-- 
GitLab