From bb90bd23c924deb9e87b354a22982f050edfd09d Mon Sep 17 00:00:00 2001
From: Antoine Lambert <antoine.lambert@inria.fr>
Date: Thu, 22 Apr 2021 18:15:29 +0200
Subject: [PATCH] django: Add keycloak realm roles in user permissions set

Keycloak also allow to define user roles at realm level to define
permissions at a global level not tight to a client.

Include these extra roles in the user permissions set from the
decoded token content.

Related to T3213
---
 swh/auth/django/utils.py               | 10 ++++++----
 swh/auth/pytest_plugin.py              | 20 +++++++++++++-------
 swh/auth/tests/django/test_backends.py | 26 ++++++++++++++++----------
 swh/auth/tests/django/test_utils.py    |  5 +++--
 4 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/swh/auth/django/utils.py b/swh/auth/django/utils.py
index 9697c12..fe47b9e 100644
--- a/swh/auth/django/utils.py
+++ b/swh/auth/django/utils.py
@@ -47,15 +47,17 @@ def oidc_user_from_decoded_token(
     if "groups" in decoded_token:
         user.is_staff = "/staff" in decoded_token["groups"]
 
+    realm_access = decoded_token.get("realm_access", {})
+    permissions = realm_access.get("roles", [])
+
     if client_id:
         # extract user permissions if any
         resource_access = decoded_token.get("resource_access", {})
         client_resource_access = resource_access.get(client_id, {})
-        permissions = client_resource_access.get("roles", [])
-    else:
-        permissions = []
+        permissions += client_resource_access.get("roles", [])
 
-    user.permissions = set(permissions)
+    # set user permissions and filter out default keycloak realm roles
+    user.permissions = set(permissions) - {"offline_access", "uma_authorization"}
 
     # add user sub to custom User proxy model
     user.sub = decoded_token["sub"]
diff --git a/swh/auth/pytest_plugin.py b/swh/auth/pytest_plugin.py
index e8581a1..330cd45 100644
--- a/swh/auth/pytest_plugin.py
+++ b/swh/auth/pytest_plugin.py
@@ -34,7 +34,8 @@ class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
         auth_success: boolean flag to simulate authentication success or failure
         exp: expiration delay
         user_groups: user groups configuration (if any)
-        user_permissions: user permissions configuration (if any)
+        realm_permissions: user permissions configuration at realm level (if any)
+        client_permissions: user permissions configuration at client level (if any)
         oidc_profile: Dict response from a call to a token authentication query (cf.
             :py:data:`swh.auth.tests.sample_data.OIDC_PROFILE`)
         user_info: Dict response from a call to userinfo query (cf.
@@ -52,7 +53,8 @@ class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
         auth_success: bool = True,
         exp: Optional[int] = None,
         user_groups: List[str] = [],
-        user_permissions: List[str] = [],
+        realm_permissions: List[str] = [],
+        client_permissions: List[str] = [],
         oidc_profile: Dict = OIDC_PROFILE,
         user_info: Dict = USER_INFO,
         raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY,
@@ -62,7 +64,8 @@ class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
         )
         self.exp = exp
         self.user_groups = user_groups
-        self.user_permissions = user_permissions
+        self.realm_permissions = realm_permissions
+        self.client_permissions = client_permissions
         self._keycloak.public_key = lambda: raw_realm_public_key
         self._keycloak.well_know = lambda: {
             "issuer": f"{self.server_url}realms/{self.realm_name}",
@@ -121,9 +124,10 @@ class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
         decoded["groups"] = self.user_groups
         decoded["aud"] = [self.client_id, "account"]
         decoded["azp"] = self.client_id
-        if self.user_permissions:
+        decoded["realm_access"]["roles"] += self.realm_permissions
+        if self.client_permissions:
             decoded["resource_access"][self.client_id] = {
-                "roles": self.user_permissions
+                "roles": self.client_permissions
             }
         return decoded
 
@@ -169,7 +173,8 @@ def keycloak_oidc_factory(
     auth_success: bool = True,
     exp: Optional[int] = None,
     user_groups: List[str] = [],
-    user_permissions: List[str] = [],
+    realm_permissions: List[str] = [],
+    client_permissions: List[str] = [],
     oidc_profile: Dict = OIDC_PROFILE,
     user_info: Dict = USER_INFO,
     raw_realm_public_key: str = RAW_REALM_PUBLIC_KEY,
@@ -188,7 +193,8 @@ def keycloak_oidc_factory(
             auth_success=auth_success,
             exp=exp,
             user_groups=user_groups,
-            user_permissions=user_permissions,
+            realm_permissions=realm_permissions,
+            client_permissions=client_permissions,
             oidc_profile=oidc_profile,
             user_info=user_info,
             raw_realm_public_key=raw_realm_public_key,
diff --git a/swh/auth/tests/django/test_backends.py b/swh/auth/tests/django/test_backends.py
index 9b57813..19eabca 100644
--- a/swh/auth/tests/django/test_backends.py
+++ b/swh/auth/tests/django/test_backends.py
@@ -143,12 +143,15 @@ def test_oidc_code_pkce_auth_backend_permissions(keycloak_oidc, request_factory)
     Checks that a permission defined with OpenID Connect is correctly mapped
     to a Django one when logging from Web UI.
     """
-    permission = "webapp.some-permission"
-    keycloak_oidc.user_permissions = [permission]
+    realm_permission = "swh.some-permission"
+    client_permission = "webapp.some-permission"
+    keycloak_oidc.realm_permissions = [realm_permission]
+    keycloak_oidc.client_permissions = [client_permission]
     user = _authenticate_user(request_factory)
-    assert user.has_perm(permission)
-    assert user.get_all_permissions() == {permission}
-    assert user.get_group_permissions() == {permission}
+    assert user.has_perm(realm_permission)
+    assert user.has_perm(client_permission)
+    assert user.get_all_permissions() == {realm_permission, client_permission}
+    assert user.get_group_permissions() == {realm_permission, client_permission}
     assert user.has_module_perms("webapp")
     assert not user.has_module_perms("foo")
 
@@ -239,8 +242,10 @@ def test_drf_oidc_bearer_token_auth_backend_permissions(
     Checks that a permission defined with OpenID Connect is correctly mapped
     to a Django one when using bearer token authentication.
     """
-    permission = "webapp.some-permission"
-    keycloak_oidc.user_permissions = [permission]
+    realm_permission = "swh.some-permission"
+    client_permission = "webapp.some-permission"
+    keycloak_oidc.realm_permissions = [realm_permission]
+    keycloak_oidc.client_permissions = [client_permission]
 
     drf_auth_backend = OIDCBearerTokenAuthentication()
     oidc_profile = keycloak_oidc.login()
@@ -249,8 +254,9 @@ def test_drf_oidc_bearer_token_auth_backend_permissions(
     request = api_request_factory.get(url, HTTP_AUTHORIZATION=f"Bearer {refresh_token}")
     user, _ = drf_auth_backend.authenticate(request)
 
-    assert user.has_perm(permission)
-    assert user.get_all_permissions() == {permission}
-    assert user.get_group_permissions() == {permission}
+    assert user.has_perm(realm_permission)
+    assert user.has_perm(client_permission)
+    assert user.get_all_permissions() == {realm_permission, client_permission}
+    assert user.get_group_permissions() == {realm_permission, client_permission}
     assert user.has_module_perms("webapp")
     assert not user.has_module_perms("foo")
diff --git a/swh/auth/tests/django/test_utils.py b/swh/auth/tests/django/test_utils.py
index a7878d4..c2be128 100644
--- a/swh/auth/tests/django/test_utils.py
+++ b/swh/auth/tests/django/test_utils.py
@@ -63,14 +63,15 @@ def test_oidc_user_from_decoded_token():
     _check_user(user)
 
 
-def test_oidc_user_from_decoded_token2():
+def test_oidc_user_with_permissions_from_decoded_token():
     decoded_token = copy(DECODED_TOKEN)
     decoded_token["groups"] = ["/staff", "api"]
+    decoded_token["realm_access"] = {"roles": ["swh.ambassador"]}
     decoded_token["resource_access"] = {CLIENT_ID: {"roles": ["read-api"]}}
 
     user = oidc_user_from_decoded_token(decoded_token, client_id=CLIENT_ID)
 
-    _check_user(user, is_staff=True, permissions={"read-api"})
+    _check_user(user, is_staff=True, permissions={"swh.ambassador", "read-api"})
 
 
 @pytest.mark.parametrize(
-- 
GitLab