import os from typing import List, Tuple from kubernetes import client, config OIDC_USERS_NAMESPACE = os.environ["OIDC_USERS_NAMESPACE"] def users_with_group(requiredGroup: str) -> List[str]: config.load_incluster_config() api_instance = client.CustomObjectsApi() users: List[str] = [] ret = api_instance.list_namespaced_custom_object( "codemowers.cloud", "v1beta1", OIDC_USERS_NAMESPACE, "oidcusers" ) for item in ret["items"]: for group in item.get("status", {}).get("groups", []): groupName = group.get("prefix", "") + ":" + group.get("name", "") if groupName == requiredGroup: users.append(item["metadata"]["name"]) continue print(f"INFO: {len(users)} users in group {requiredGroup}") return users # -> (groups[], username) def by_slackid(slack_id: str) -> Tuple[List[str], str]: config.load_incluster_config() api_instance = client.CustomObjectsApi() ret = api_instance.list_namespaced_custom_object( "codemowers.cloud", "v1beta1", OIDC_USERS_NAMESPACE, "oidcusers" ) for item in ret["items"]: if slack_id == item.get("status", {}).get("slackId", None): return item.get("status", {}).get("groups", []), item.get( "metadata", {} ).get("name", "") return [], ""