43 lines
1.2 KiB
Python
43 lines
1.2 KiB
Python
import os
|
|
from typing import List, Tuple
|
|
|
|
from kubernetes import client, config
|
|
|
|
# Kubernetes namespace in which the `oidcusers` custom objects are listed.
# Read from the environment at import time; a missing variable raises KeyError.
OIDC_USERS_NAMESPACE = os.environ["OIDC_USERS_NAMESPACE"]
|
|
|
|
|
|
def users_with_group(group: str) -> List[str]:
    """Return the names of the OIDC users that are members of *group*.

    Loads the in-cluster Kubernetes configuration, lists the ``oidcusers``
    custom objects (API group ``codemowers.cloud``, version ``v1beta1``) in
    ``OIDC_USERS_NAMESPACE`` and keeps those whose ``status.groups`` contains
    *group*.

    Args:
        group: Group identifier to look for in each user's ``status.groups``.

    Returns:
        The ``metadata.name`` of every matching user, in listing order.
    """
    config.load_incluster_config()
    api_instance = client.CustomObjectsApi()

    ret = api_instance.list_namespaced_custom_object(
        "codemowers.cloud", "v1beta1", OIDC_USERS_NAMESPACE, "oidcusers"
    )

    # Build a real list: the previous `users = List[str]` bound the typing
    # alias itself, so `.append()` and `len()` failed at runtime.
    # Objects without `status` or `status.groups` count as having no groups.
    users: List[str] = [
        item["metadata"]["name"]
        for item in ret["items"]
        if group in item.get("status", {}).get("groups", [])
    ]

    print(f"INFO: {len(users)} users in group {group}")
    return users
|
|
|
|
|
|
# Returns a (groups, username) tuple; ([], "") when no user matches.
|
|
def by_slackid(slack_id: str) -> Tuple[List[str], str]:
    """Look up an OIDC user by Slack ID.

    Loads the in-cluster Kubernetes configuration and scans the ``oidcusers``
    custom objects (API group ``codemowers.cloud``, version ``v1beta1``) in
    ``OIDC_USERS_NAMESPACE`` for the first one whose ``status.slackId``
    equals *slack_id*.

    Args:
        slack_id: Slack identifier to match against ``status.slackId``.

    Returns:
        ``(groups, username)`` for the first matching user, where ``groups``
        is ``status.groups`` (``[]`` if absent) and ``username`` is
        ``metadata.name`` (``""`` if absent). ``([], "")`` when no user
        matches.
    """
    config.load_incluster_config()
    listing = client.CustomObjectsApi().list_namespaced_custom_object(
        "codemowers.cloud", "v1beta1", OIDC_USERS_NAMESPACE, "oidcusers"
    )

    for user in listing["items"]:
        status = user.get("status", {})
        # Guard clause: skip every user whose Slack ID does not match.
        if not (slack_id == status.get("slackId")):
            continue
        groups = status.get("groups", [])
        username = user.get("metadata", {}).get("name", "")
        return groups, username

    # No user carries this Slack ID.
    return [], ""
|