Skip to content
Snippets Groups Projects
Commit 719bcd7a authored by Jayesh's avatar Jayesh :cat2:
Browse files

Add a starlette auth backend for bearer tokens

accepts Keycloak server details and an aiocache cache instance
returns a starlette SimpleUser along with permissions
parent f5227f49
No related branches found
No related tags found
No related merge requests found
......@@ -21,3 +21,9 @@ ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
[mypy-starlette.*]
ignore_missing_imports = True
[mypy-aiocache.*]
ignore_missing_imports = True
starlette
httpx
aiocache
......@@ -52,6 +52,7 @@ setup(
use_scm_version=True,
extras_require={
"django": parse_requirements("django"),
"starlette": parse_requirements("starlette"),
"testing": parse_requirements("test"),
},
include_package_data=True,
......
# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import hashlib
from typing import Any, Dict, Optional, Tuple
from aiocache.base import BaseCache
from starlette.authentication import (
AuthCredentials,
AuthenticationBackend,
AuthenticationError,
SimpleUser,
)
from starlette.requests import HTTPConnection
from swh.auth.keycloak import (
ExpiredSignatureError,
KeycloakError,
KeycloakOpenIDConnect,
keycloak_error_message,
)
class BearerTokenAuthBackend(AuthenticationBackend):
"""
Starlette authentication backend using Keycloak OpenID Connect authorization
An Keycloak server, realm and a cache to store access tokens must be provided
"""
def __init__(
self, server_url: str, realm_name: str, client_id: str, cache: BaseCache
):
"""
Args:
server_url: Keycloak URL
realm_name: Keycloak realm name
client_id: Keycloak client ID
cache: An aiocache cache instance
"""
self.client_id = client_id
self.oidc_client = KeycloakOpenIDConnect(
server_url=server_url,
realm_name=realm_name,
client_id=client_id,
)
self.cache = cache
def _get_token_from_header(self, auth_header: str) -> str:
try:
auth_type, bearer_token = auth_header.split(" ", 1)
except ValueError:
raise AuthenticationError("Invalid auth header")
if auth_type != "Bearer":
raise AuthenticationError("Invalid or unsupported authorization type")
return bearer_token
def _get_token_cache_key(self, refresh_token) -> str:
hasher = hashlib.sha1()
hasher.update(refresh_token.encode("ascii"))
return f"api_token_{hasher.hexdigest()}"
def _get_new_access_token(self, refresh_token: str) -> Dict[str, Any]:
try:
access_token = self.oidc_client.refresh_token(refresh_token)
except KeycloakError as e:
raise AuthenticationError(
"Invalid or expired user token", keycloak_error_message(e)
)
return access_token
def _decode_token(self, access_token: str) -> Optional[Dict[str, Any]]:
if not access_token:
return None
try:
decoded_token = self.oidc_client.decode_token(access_token)
except (KeycloakError, UnicodeEncodeError, ExpiredSignatureError):
# token is eitehr too old or an invalid one
decoded_token = None
return decoded_token
async def authenticate(
self, conn: HTTPConnection
) -> Optional[Tuple[AuthCredentials, SimpleUser]]:
auth_header = conn.headers.get("Authorization")
if auth_header is None:
# anonymous user
return None
refresh_token = self._get_token_from_header(auth_header)
# get the cache key
cache_key = self._get_token_cache_key(refresh_token)
# read access token from the cache
access_token = await self.cache.get(cache_key)
decoded_token = self._decode_token(access_token)
if not access_token or not decoded_token:
access_token = self._get_new_access_token(refresh_token)["access_token"]
decoded_token = self._decode_token(access_token)
if not decoded_token:
raise AuthenticationError("Access token failed to be decoded")
exp = datetime.fromtimestamp(decoded_token["exp"])
ttl = int(exp.timestamp() - datetime.now().timestamp())
await self.cache.set(cache_key, access_token, ttl=ttl)
# set user scopes
realm_access = decoded_token.get("realm_access", {})
user_scopes = realm_access.get("roles", [])
resource_access = decoded_token.get("resource_access", {})
client_resource_access = resource_access.get(self.client_id, {})
user_scopes += client_resource_access.get("roles", [])
return AuthCredentials(scopes=user_scopes), SimpleUser(
decoded_token["preferred_username"]
)
# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from aiocache import Cache
import pytest
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.responses import PlainTextResponse
from starlette.routing import Route
from starlette.testclient import TestClient
from swh.auth.starlette import backends
from swh.auth.tests.sample_data import USER_INFO
@pytest.fixture
def app_with_auth_backend(keycloak_oidc):
backend = backends.BearerTokenAuthBackend(
server_url="https://example.com",
realm_name="example",
client_id="example",
cache=Cache(), # Dummy cache
)
backend.oidc_client = keycloak_oidc
middleware = [
Middleware(
AuthenticationMiddleware,
backend=backend,
)
]
def homepage(request):
if request.user.is_authenticated:
return PlainTextResponse("Hello " + request.user.username)
return PlainTextResponse("Hello")
app = Starlette(routes=[Route("/", homepage)], middleware=middleware)
return app
@pytest.fixture
def client(app_with_auth_backend):
return TestClient(app_with_auth_backend)
def test_anonymous_access(client):
response = client.get("/")
assert response.status_code == 200
assert response.text == "Hello"
def test_invalid_auth_header(client):
client.headers = {"Authorization": "invalid"}
response = client.get("/")
assert response.status_code == 400
assert response.text == "Invalid auth header"
def test_invalid_auth_type(client):
client.headers = {"Authorization": "Basic invalid"}
response = client.get("/")
assert response.status_code == 400
assert response.text == "Invalid or unsupported authorization type"
def test_invalid_refresh_token(client, keycloak_oidc):
keycloak_oidc.set_auth_success(False)
client.headers = {"Authorization": "Bearer invalid-valid-token"}
response = client.get("/")
assert response.status_code == 400
assert "Invalid or expired user token" in response.text
def test_success_token(client, keycloak_oidc):
client.headers = {"Authorization": "Bearer valid-token"}
response = client.get("/")
assert response.status_code == 200
assert response.text == f'Hello {USER_INFO["preferred_username"]}'
......@@ -5,6 +5,7 @@ envlist=black,flake8,mypy,py3
extras =
testing
django
starlette
deps =
pytest-cov
dev: pdbpp
......@@ -47,6 +48,7 @@ usedevelop = true
extras =
django
testing
starlette
deps =
# fetch and install swh-docs in develop mode
-e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
......@@ -67,6 +69,7 @@ usedevelop = true
extras =
django
testing
starlette
deps =
# install swh-docs in develop mode
-e ../swh-docs
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment