Files
inventory-app/inventory-app/oidc.py
Madis Mägi a2b3d21f27
All checks were successful
ci/woodpecker/manual/woodpecker Pipeline was successful
Rename lambda argument
2023-08-16 03:01:04 +03:00

112 lines
3.7 KiB
Python

import os
import const
import jwt
import base64
import requests
from pymongo import MongoClient
from flask import Blueprint, abort, g, make_response, redirect, render_template, request, Flask, request, url_for, session
from common import CustomForm, build_query, flatten, format_name, spam
from functools import wraps
# Blueprint carrying the OIDC routes defined in this module.
page_oidc = Blueprint("oidc", __name__)

# Default database of the MongoDB deployment named by const.MONGO_URI.
db = MongoClient(const.MONGO_URI).get_default_database()

# Base URI of the OIDC gateway. It is concatenated directly with
# ".well-known/..." below, so the configured value is expected to end in "/".
gw_uri = os.getenv("OIDC_GATEWAY_URI")

# Provider discovery document, fetched once at import time. The timeout keeps
# an unreachable gateway from blocking the import indefinitely.
metadata = requests.get(
    f"{gw_uri}.well-known/openid-configuration",
    timeout=10,
).json()
def login_required(_f=None, *, groups=()):
    """Require an authenticated user, optionally in given groups, for a view.

    Usable both bare (``@login_required``) and with arguments
    (``@login_required(groups=["admins"])``).

    Args:
        _f: The view function when the decorator is applied without
            parentheses; ``None`` when called with keyword arguments.
        groups: Group names of which the user must hold at least one. An
            empty value disables the group check.

    Returns:
        The wrapped view, or a decorator producing it when ``_f`` is ``None``.
        The wrapped view returns the login redirect when no user can be read,
        ``("not allowed", 401)`` when the group check fails, and the view's
        own result otherwise.
    """

    def login_required_inner(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user = read_user()
            if not user:
                print("doing login redirect")
                # Remember the requested URL so the login callback can send
                # the user back there after authentication.
                session["original_url"] = request.full_path
                return do_login()
            # A userinfo document without a "groups" entry is treated as
            # "member of no groups" instead of raising KeyError.
            user_groups = user.get("groups") or ()
            if groups and not any(group in groups for group in user_groups):
                return "not allowed", 401
            return f(*args, **kwargs)

        return decorated_function

    if _f is None:
        return login_required_inner
    return login_required_inner(_f)
def do_login():
    """Redirect the browser to the provider's authorization endpoint.

    Builds an authorization-code request (``response_type=code``) for the
    ``openid profile`` scopes, with this application's ``/login-callback``
    route as the redirect URI.

    Returns:
        The Flask redirect response pointing at the authorization URL.
    """
    # NOTE(review): no "state" parameter is sent, so the callback has nothing
    # to compare against for CSRF protection -- consider adding one together
    # with a matching check in the callback.
    authorization_endpoint = metadata["authorization_endpoint"]
    query = {}
    query["client_id"] = os.getenv("OIDC_CLIENT_ID")
    query["redirect_uri"] = url_for("oidc.login_callback", _external=True, _scheme='https')
    query["response_type"] = "code"
    query["scope"] = "openid profile"
    target = add_url_params(authorization_endpoint, query)
    return redirect(target)
def add_url_params(url, params):
    """Return ``url`` with ``params`` added to its query string.

    The encoding is delegated to requests by preparing the URL on an
    otherwise empty ``PreparedRequest``.

    Args:
        url: Base URL.
        params: Mapping of query parameter names to values.

    Returns:
        The prepared URL string.
    """
    prepared = requests.models.PreparedRequest()
    prepared.prepare_url(url, params)
    return prepared.url
@page_oidc.route('/login-callback')
def login_callback():
    """Finish the authorization-code flow and store the tokens in the session.

    Exchanges the ``code`` query parameter at the token endpoint, validates
    the returned ID token and access token, saves both in the session and
    redirects to the URL remembered before login (or "/").

    Returns:
        A redirect on success; ``("missing authorization code", 400)`` when
        the request carries no code; ``("failed to fetch tokens", 500)`` when
        the token endpoint reports an error or sends an unusable response;
        ``("tokens validation failed", 500)`` when either token is rejected.
    """
    code = request.args.get('code')
    if not code:
        # Nothing to exchange: answer directly instead of sending a grant
        # request without a code to the token endpoint.
        return "missing authorization code", 400

    response = requests.post(
        metadata["token_endpoint"],
        {
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": url_for("oidc.login_callback", _external=True, _scheme='https'),
            "client_id": os.getenv("OIDC_CLIENT_ID"),
            "client_secret": os.getenv("OIDC_CLIENT_SECRET"),
        },
        timeout=10,
    )
    try:
        tokens = response.json()
    except ValueError:
        # The body was not JSON (for example an HTML error page).
        return "failed to fetch tokens", 500
    if not isinstance(tokens, dict) or "error" in tokens:
        return "failed to fetch tokens", 500

    id_token = tokens.get("id_token")
    access_token = tokens.get("access_token")
    if not id_token or not access_token:
        # An incomplete token response is reported like any other fetch
        # failure rather than surfacing as a KeyError.
        return "failed to fetch tokens", 500

    if not validate_id_token(id_token) or not read_user(access_token):
        return "tokens validation failed", 500

    print("authenticated")
    session["id_token"] = id_token
    session["access_token"] = access_token
    return redirect(session.pop("original_url", "/"))
@page_oidc.route("/logout")
def logout():
    """Clear the session and revoke the user's access token at the provider.

    Returns:
        A redirect to "/" when the token was revoked or when the session held
        no access token; ``("oops", 500)`` when the revocation endpoint does
        not answer with status 200.
    """
    token = session.pop("access_token", None)
    # The local session is dropped first, so the user is logged out of this
    # application even if the revocation request below fails.
    session.clear()
    if not token:
        # Nobody was logged in: there is nothing to revoke, so no placeholder
        # token is sent to the provider.
        return redirect("/")

    credentials = os.getenv("OIDC_CLIENT_ID") + ":" + os.getenv("OIDC_CLIENT_SECRET")
    basic_auth = "Basic " + base64.b64encode(credentials.encode()).decode()
    response = requests.post(
        url=metadata["revocation_endpoint"],
        headers={"Authorization": basic_auth},
        data={"token": token, "token_type_hint": "access_token"},
        timeout=10,
    )
    if response.status_code != 200:
        return "oops", 500
    return redirect("/")
def read_user(token=None):
    """Fetch the userinfo document for an access token.

    Args:
        token: Access token to use. When falsy, the ``access_token`` stored
            in the session is used instead.

    Returns:
        The decoded JSON body of the userinfo endpoint when it answers with
        status 200; ``False`` when no token is available or the endpoint
        answers with any other status.
    """
    token = token or session.get("access_token", False)
    if not token:
        return False
    # The timeout bounds how long a request to this application can wait on
    # an unresponsive provider.
    response = requests.get(
        url=metadata["userinfo_endpoint"],
        headers={"Authorization": "Bearer " + token},
        timeout=10,
    )
    if response.status_code != 200:
        return False
    return response.json()
def validate_id_token(token=None):
    """Verify an ID token's signature, audience and expiry.

    Args:
        token: Encoded ID token. When falsy, the ``id_token`` stored in the
            session is used instead.

    Returns:
        The decoded claims on success; ``False`` when no token is available,
        no signing key can be obtained for it, or validation fails.
    """
    token = token or session.get("id_token", False)
    if not token:
        return False
    jwks_client = jwt.PyJWKClient(metadata["jwks_uri"])
    try:
        # Key lookup is inside the try block: a malformed token or an unknown
        # key id raises here, and PyJWKClientError is a PyJWTError but not an
        # InvalidTokenError.
        signing_key = jwks_client.get_signing_key_from_jwt(token)
        return jwt.decode(
            token,
            signing_key.key,
            algorithms=["RS256"],
            audience=os.getenv("OIDC_CLIENT_ID"),
            options={"verify_exp": True},
        )
    except jwt.PyJWTError:
        return False