Skip to content

Commit

Permalink
add preapproved users
Browse files Browse the repository at this point in the history
  • Loading branch information
daisieh committed Jan 18, 2025
1 parent 39a0289 commit 8d40c66
Showing 1 changed file with 77 additions and 0 deletions.
77 changes: 77 additions & 0 deletions src/authx/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,14 @@ def add_program_to_opa(program_auth):
response2, status_code = set_service_store_secret("opa", key="programs", value=json.dumps(response2))
return response, status_code

# add the users to the preapproved user list
for user_id in program_auth["team_members"]:
# if the user isn't already approved, make sure they will be:
response, status_code = authx.auth.add_preapproved_user_in_opa(user_id)
for user_id in program_auth["program_curators"]:
# if the user isn't already approved, make sure they will be:
response, status_code = authx.auth.add_preapproved_user_in_opa(user_id)

return {"message": f"{program_id} not added"}, status_code


Expand Down Expand Up @@ -690,6 +698,10 @@ def set_role_type_in_opa(role_type, members):
result, status_code = get_service_store_secret("opa", key=f"site_roles")
if status_code == 200:
if role_type in result['site_roles']:
for user_id in members:
# if the user isn't already approved, make sure they will be:
response, status_code = authx.auth.add_preapproved_user_in_opa(user_id)

result['site_roles'][role_type] = members
result, status_code = set_service_store_secret("opa", key=f"site_roles", value=json.dumps(result))
if status_code == 200:
Expand Down Expand Up @@ -726,6 +738,10 @@ def get_self_in_opa(token):
def remove_user_from_opa(user_name):
safe_name = urllib.parse.quote_plus(user_name)
response, status_code = delete_service_store_secret("opa", key=f"users/{safe_name}")

# if the user was preapproved, take them out of that list
remove_preapproved_user_in_opa(safe_name)

return response, status_code


Expand Down Expand Up @@ -758,6 +774,12 @@ def add_pending_user_to_opa(user_token):
response, status_code = set_service_store_secret("opa", key=f"pending_users", value=json.dumps(response))
if status_code == 200:
return response, 201 # 201 created, to indicate that we added the user

preapproved_users, status_code = list_preapproved_users_in_opa()
if status_code == 200:
if user_name in preapproved_users:
return approve_pending_user_in_opa(user_name)

return response, status_code


Expand Down Expand Up @@ -815,6 +837,61 @@ def clear_pending_users_in_opa():
return response, status_code


#####
# Preapproved user authorizations
#####

def list_preapproved_users_in_opa():
response, status_code = get_service_store_secret("opa", key=f"preapproved_users")
if status_code == 200:
response = response["preapproved_users"]
return response, status_code


def clear_preapproved_users_in_opa():
response, status_code = set_service_store_secret("opa", key="preapproved_users", value=json.dumps({"preapproved_users": {}}))
return response, status_code


def get_preapproved_user(user_name):
response, status_code = get_service_store_secret("opa", key=f"preapproved_users")
if status_code == 200:
response = user_name in response["preapproved_users"]
else:
response = False
return response, status_code


def add_preapproved_user_in_opa(user_name):
response, status_code = get_service_store_secret("opa", key=f"preapproved_users")

if user_name in response["preapproved_users"]:
# return 200 to indicate OK but nothing was added
return {"message": f"User {user_name} already pending"}, 200

response["preapproved_users"].append(user_name)

response, status_code = set_service_store_secret("opa", key=f"preapproved_users", value=json.dumps(response))
if status_code == 200:
return response, 201 # 201 created, to indicate that we added the user
return response, status_code


def remove_preapproved_user_in_opa(user_name):
response, status_code = get_service_store_secret("opa", key=f"preapproved_users")
if status_code != 200:
return response, status_code
preapproved_users = response["preapproved_users"]

if user_name in preapproved_users:
preapproved_users.pop(user_name)
response, status_code = set_service_store_secret("opa", key=f"preapproved_users", value=json.dumps({"preapproved_users": preapproved_users}))

else:
return {"error": f"no preapproved user with ID {user_name}"}, 404
return response, status_code


######
# Vault service stores
######
Expand Down

0 comments on commit 8d40c66

Please sign in to comment.