Skip to content

Commit

Permalink
LoginFlowManager and CommandLineLoginFlowManager (#972)
Browse files Browse the repository at this point in the history
* first pass at LoginFlowManager and CommandLineLoginFlowManager

* Reword param text

Co-authored-by: Kurt McKee <[email protected]>

* improve output and helptext

* CommandLineLoginFlowManager tests

* add example usage

* move refresh_tokens to a class init param

* revert changes to prompt in GlobusAuthorizationParameters

---------

Co-authored-by: Kurt McKee <[email protected]>
  • Loading branch information
aaschaer and kurtmckee authored Apr 15, 2024
1 parent 3ae9104 commit c97221f
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 5 deletions.
4 changes: 4 additions & 0 deletions changelog.d/20240405_142203_aaschaer_login_flow_manager.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added
~~~~~

- Added ``LoginFlowManager`` and ``CommandLineLoginFLowManager`` to experimental (:pr:`NUMBER`)
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
from globus_sdk._testing.models import RegisteredResponse, ResponseSet

RESPONSES = ResponseSet(
default=RegisteredResponse(
service="auth",
path="/v2/oauth2/token",
method="POST",
status=200,
json={
"access_token": "transfer_access_token",
"scope": "urn:globus:auth:scope:transfer.api.globus.org:all",
"expires_in": 172800,
"token_type": "Bearer",
"resource_server": "transfer.api.globus.org",
"state": "_default",
"other_tokens": [],
},
),
invalid_grant=RegisteredResponse(
service="auth",
path="/v2/oauth2/token",
method="POST",
status=401,
json={"error": "invalid_grant"},
)
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@

class GlobusAuthorizationParameters(_serializable.Serializable):
"""
Represents authorization parameters that can be used to instruct a client
which additional authorizations are needed in order to complete a request.
Data class containing authorization parameters that can be passed during
an authentication flow to control how the user will authenticate.
When used with a GlobusAuthRequirementsError this represents the additional
authorization parameters needed in order to complete a request that had
insufficient authorization state.
:ivar session_message: A message to be displayed to the user.
:vartype session_message: str, optional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def __init__(
session_required_policies: str | list[str] | None = None,
session_required_single_domain: str | list[str] | None = None,
session_required_mfa: bool | None = None,
prompt: str | None = None,
prompt: Literal["login"] | None = None,
extra: dict[str, t.Any] | None = None,
):
self.session_message = _validators.opt_str("session_message", session_message)
Expand All @@ -126,7 +126,10 @@ def __init__(
self.session_required_mfa = _validators.opt_bool(
"session_required_mfa", session_required_mfa
)
self.prompt = _validators.opt_str("prompt", prompt)
if prompt in [None, "login"]:
self.prompt = prompt
else:
raise _validators.ValidationError("'prompt' must be 'login' or null")
self.extra = extra or {}

# Enforce that the error contains at least one of the fields we expect
Expand Down
7 changes: 7 additions & 0 deletions src/globus_sdk/experimental/login_flow_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .command_line_login_flow_manager import CommandLineLoginFlowManager
from .login_flow_manager import LoginFlowManager

__all__ = [
"LoginFlowManager",
"CommandLineLoginFlowManager",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from globus_sdk import AuthLoginClient, OAuthTokenResponse
from globus_sdk.experimental.auth_requirements_error import (
GlobusAuthorizationParameters,
)

from .login_flow_manager import LoginFlowManager


class CommandLineLoginFlowManager(LoginFlowManager):
"""
A ``CommandLineLoginFlowManager`` is a ``LoginFlowManager`` that uses the command
line for interacting with the user during its interactive login flows.
Example usage:
>>> login_client = globus_sdk.NativeAppAuthClient(...)
>>> login_flow_manager = CommandLineLoginFlowManager(login_client)
>>> scopes = [globus_sdk.scopes.TransferScopes.all]
>>> auth_params = GlobusAuthorizationParameters(required_scopes=scopes)
>>> tokens = login_flow_manager.run_login_flow(auth_params)
"""

def __init__(
self,
login_client: AuthLoginClient,
*,
refresh_tokens: bool = False,
login_prompt: str = "Please authenticate with Globus here:",
code_prompt: str = "Enter the resulting Authorization Code here:",
):
"""
:param login_client: The ``AuthLoginClient`` that will be making the Globus
Auth API calls needed for the authentication flow. Note that this
must either be a NativeAppAuthClient or a templated
ConfidentialAppAuthClient, standard ConfidentialAppAuthClients cannot
use the web auth-code flow.
:param refresh_tokens: Control whether refresh tokens will be requested.
:param login_prompt: The string that will be output to the command line
prompting the user to authenticate.
:param code_prompt: The string that will be output to the command line
prompting the user to enter their authorization code.
"""
self.login_prompt = login_prompt
self.code_prompt = code_prompt
super().__init__(login_client, refresh_tokens=refresh_tokens)

def run_login_flow(
self,
auth_parameters: GlobusAuthorizationParameters,
) -> OAuthTokenResponse:
"""
Run an interactive login flow on the command line to get tokens for the user.
:param auth_parameters: ``GlobusAuthorizationParameters`` passed through
to the authentication flow to control how the user will authenticate.
"""

# type is ignored here as AuthLoginClient does not provide a signature for
# oauth2_start_flow since it has different positional arguments between
# NativeAppAuthClient and ConfidentialAppAuthClient
self.login_client.oauth2_start_flow( # type: ignore
redirect_uri=self.login_client.base_url + "v2/web/auth-code",
refresh_tokens=self.refresh_tokens,
requested_scopes=auth_parameters.required_scopes,
)

# create authorization url and prompt user to follow it to login
print(
"{0}\n{1}\n{2}\n{1}\n".format(
self.login_prompt,
"-" * len(self.login_prompt),
self.login_client.oauth2_get_authorize_url(
session_required_identities=(
auth_parameters.session_required_identities
),
session_required_single_domain=(
auth_parameters.session_required_single_domain
),
session_required_policies=auth_parameters.session_required_policies,
session_required_mfa=auth_parameters.session_required_mfa,
prompt=auth_parameters.prompt, # type: ignore
),
)
)

# ask user to copy and paste auth code
auth_code = input(f"{self.code_prompt}\n").strip()

# get and return tokens
return self.login_client.oauth2_exchange_code_for_tokens(auth_code)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import abc

from globus_sdk import AuthLoginClient, OAuthTokenResponse
from globus_sdk.experimental.auth_requirements_error import (
GlobusAuthorizationParameters,
)


class LoginFlowManager(metaclass=abc.ABCMeta):
"""
A ``LoginFlowManager`` is an abstract superclass for subclasses that manage
interactive login flows with a user in order to authenticate with Globus Auth
and obtain tokens.
"""

def __init__(
self,
login_client: AuthLoginClient,
*,
refresh_tokens: bool = False,
):
self.login_client = login_client
self.refresh_tokens = refresh_tokens
"""
:param refresh_tokens: Control whether refresh tokens will be requested.
"""

@abc.abstractmethod
def run_login_flow(
self,
auth_parameters: GlobusAuthorizationParameters,
) -> OAuthTokenResponse:
"""
Run an interactive login flow to get tokens for the user.
:param auth_parameters: ``GlobusAuthorizationParameters`` passed through
to the authentication flow to control how the user will authenticate.
"""
73 changes: 73 additions & 0 deletions tests/functional/test_login_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from globus_sdk import ConfidentialAppAuthClient, NativeAppAuthClient
from globus_sdk._testing import load_response
from globus_sdk.experimental.auth_requirements_error import (
GlobusAuthorizationParameters,
)
from globus_sdk.experimental.login_flow_manager import CommandLineLoginFlowManager


def _mock_input(s):
print(s)
return "mock_input"


def test_command_line_login_flower_manager_native(monkeypatch, capsys):
"""
test CommandLineLoginFlowManager with a NativeAppAuthClient
"""
login_client = NativeAppAuthClient("mock_client_id")
load_response(login_client.oauth2_exchange_code_for_tokens)
monkeypatch.setattr("builtins.input", _mock_input)

custom_login_prompt = "Login:"
custom_code_prompt = "Code:"
login_flow_manager = CommandLineLoginFlowManager(
login_client,
login_prompt=custom_login_prompt,
code_prompt=custom_code_prompt,
)
auth_params = GlobusAuthorizationParameters(
required_scopes=["urn:globus:auth:scope:transfer.api.globus.org:all"],
session_required_identities=["[email protected]"],
)
token_res = login_flow_manager.run_login_flow(auth_params)
assert (
token_res.by_resource_server["transfer.api.globus.org"]["access_token"]
== "transfer_access_token"
)

captured_output = capsys.readouterr().out
assert custom_login_prompt in captured_output
assert custom_code_prompt in captured_output
assert "https://auth.globus.org/v2/oauth2/authorize" in captured_output
assert "client_id=mock_client_id" in captured_output
assert "&session_required_identities=user%40org.edu" in captured_output


def test_command_line_login_flower_manager_confidential(monkeypatch, capsys):
"""
test CommandLineLoginFlowManager with a ConfidentialAppAuthClient
"""
login_client = ConfidentialAppAuthClient(
client_id="mock_client_id", client_secret="mock_client_secret"
)
load_response(login_client.oauth2_exchange_code_for_tokens)
monkeypatch.setattr("builtins.input", _mock_input)

login_flow_manager = CommandLineLoginFlowManager(login_client)
auth_params = GlobusAuthorizationParameters(
required_scopes=["urn:globus:auth:scope:transfer.api.globus.org:all"],
session_required_single_domain=["org.edu"],
)
token_res = login_flow_manager.run_login_flow(auth_params)
assert (
token_res.by_resource_server["transfer.api.globus.org"]["access_token"]
== "transfer_access_token"
)

captured_output = capsys.readouterr().out
assert "Please authenticate with Globus here:" in captured_output
assert "Enter the resulting Authorization Code here:" in captured_output
assert "https://auth.globus.org/v2/oauth2/authorize" in captured_output
assert "client_id=mock_client_id" in captured_output
assert "&session_required_single_domain=org.edu" in captured_output

0 comments on commit c97221f

Please sign in to comment.