Skip to content

Commit

Permalink
move authz methods from ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
daisieh committed Jan 11, 2025
1 parent 10255cf commit b7833f5
Showing 1 changed file with 164 additions and 0 deletions.
164 changes: 164 additions & 0 deletions src/authx/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ def get_s3_url(s3_endpoint=None, bucket=None, object_id=None, access_key=None, s
client = response["client"]
result = client.stat_object(bucket_name=response["bucket"], object_name=object_id)
url = client.presigned_get_object(bucket_name=response["bucket"], object_name=object_id)
logger.debug(f"FOO3 {url}")
except Exception as e:
return {"error": str(e)}, 500
return {"metadata": result, "url": url}, 200
Expand Down Expand Up @@ -659,6 +660,169 @@ def remove_program_from_opa(program_id):
return {"message": f"{program_id} not removed"}, status_code


#####
# Site roles
#####

def get_role_type_in_opa(role_type):
result, status_code = get_service_store_secret("opa", key=f"site_roles")
if status_code == 200:
if role_type in result['site_roles']:
return {role_type: result['site_roles'][role_type]}, 200
return {"error": f"role type {role_type} does not exist"}, 404
return result, status_code


def set_role_type_in_opa(role_type, members):
result, status_code = get_service_store_secret("opa", key=f"site_roles")
if status_code == 200:
if role_type in result['site_roles']:
result['site_roles'][role_type] = members
result, status_code = set_service_store_secret("opa", key=f"site_roles", value=json.dumps(result))
if status_code == 200:
return result['site_roles'][role_type], status_code
return {"error": f"role type {role_type} does not exist"}, 404
return result, status_code


#####
# DAC authorization for users
#####

def write_user_in_opa(user_dict):
safe_name = urllib.parse.quote_plus(user_dict['user']['user_name'])
response, status_code = set_service_store_secret("opa", key=f"users/{safe_name}", value=json.dumps(user_dict))
return response, status_code


def get_user_in_opa(user_name):
safe_name = urllib.parse.quote_plus(user_name)
response, status_code = get_service_store_secret("opa", key=f"users/rejected_users")
if status_code == 200:
if safe_name in response["rejected_users"]:
return {"error": f"User {safe_name} has been rejected from CanDIG"}, 403

response, status_code = get_service_store_secret("opa", key=f"users/{safe_name}")
# return 404 if the user is not found
if status_code == 404:
response = {"error": f"User {safe_name} is not an authorized CanDIG user"}
return response, status_code


def get_self_in_opa(token):
safe_name = urllib.parse.quote_plus(get_user_name(token))
response, status_code = get_service_store_secret("opa", key=f"users/{safe_name}")
return response, status_code


def remove_user_from_opa(user_name):
safe_name = urllib.parse.quote_plus(user_name)
response, status_code = delete_service_store_secret("opa", key=f"users/{safe_name}")
return response, status_code


#####
# Pending user authorizations
#####

def add_pending_user_to_opa(user_token):
# check to see if this user has already been rejected:
response, status_code = get_service_store_secret("opa", key=f"rejected_users")
if status_code != 200:
return response, status_code
if user_name in response["rejected_users"]:
return {"error": "This user has already been rejected by CanDIG"}, 403

# NB: any user that has been authenticated by the IDP should be able to add themselves to the pending user list
response, status_code = get_service_store_secret("opa", key=f"pending_users")
if status_code != 200:
return response, status_code

user_name = get_user_name(user_token)
if user_name is None:
return {"error": "Could not verify jwt or obtain user ID"}, 403

user_dict = {
"user": {
"user_name": user_name,
"sample_jwt": user_token
},
"programs": {}
}

response["pending_users"][user_name] = user_dict

response, status_code = set_service_store_secret("opa", key=f"pending_users", value=json.dumps(response))
return response, status_code


def list_pending_users_in_opa(token):
response, status_code = get_service_store_secret("opa", key=f"pending_users")
if status_code == 200:
response = list(response["pending_users"].keys())
return response, status_code


def is_self_pending(token):
response, status_code = get_service_store_secret("opa", key=f"pending_users")
if status_code == 200:
user_name = get_user_name(token)
response = user_name in response["pending_users"]
else:
response = False
return response, status_code


def approve_pending_user_in_opa(user_name, token):
response, status_code = get_service_store_secret("opa", key=f"pending_users")
if status_code != 200:
return response, status_code
pending_users = response["pending_users"]
if user_name in pending_users:
user_dict = pending_users[user_name]
response2, status_code = write_user_in_opa(user_dict, token)
if status_code == 200:
pending_users.pop(user_name)
response3, status_code = set_service_store_secret("opa", key=f"pending_users", value=json.dumps(response))
else:
return {"error": f"no pending user with ID {user_name}"}, 404
return response, status_code


def reject_pending_user_in_opa(user_name, token):
response, status_code = get_service_store_secret("opa", key=f"pending_users")
if status_code != 200:
return response, status_code
pending_users = response["pending_users"]

response, status_code = get_service_store_secret("opa", key=f"rejected_users")
if status_code != 200:
return response, status_code
rejected_users = response["rejected_users"]

if user_name in pending_users:
pending_users.pop(user_name)
response, status_code = set_service_store_secret("opa", key=f"pending_users", value=json.dumps({"pending_users": pending_users}))

# add the user to the rejected users, if they're not already there:
if user_name not in rejected_users:
rejected_users[user_name] = user_dict
response, status_code = set_service_store_secret("opa", key=f"pending_users", value=json.dumps({"rejected_users": rejected_users}))

else:
return {"error": f"no pending user with ID {user_name}"}, 404
return response, status_code


def clear_pending_users_in_opa(token):
response, status_code = set_service_store_secret("opa", key="pending_users", value=json.dumps({"pending_users": {}}))
return response, status_code


######
# Vault service stores
######

def get_vault_token_for_service(service=SERVICE_NAME, vault_url=VAULT_URL, approle_token=None, role_id=None, secret_id=None):
"""
Get this service's vault token. Should only be called from inside a container.
Expand Down

0 comments on commit b7833f5

Please sign in to comment.