Skip to content

Commit

Permalink
proper formatting
Browse files Browse the repository at this point in the history
Signed-off-by: ahmedsobeh <[email protected]>
  • Loading branch information
ahmedsobeh committed Jun 28, 2024
1 parent cf178c5 commit 5702836
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 95 deletions.
6 changes: 1 addition & 5 deletions valkey/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -983,9 +983,6 @@ def _error_message(self, exception: BaseException) -> str:
)





class ConnectKwargs(TypedDict, total=False):
username: str
password: str
Expand All @@ -996,8 +993,6 @@ class ConnectKwargs(TypedDict, total=False):
path: str




_CP = TypeVar("_CP", bound="ConnectionPool")


Expand Down Expand Up @@ -1057,6 +1052,7 @@ class initializer. In the case of conflicting arguments, querystring
arguments always win.
"""
from .._parsers.url_parser import parse_url

url_options = parse_url(url)
kwargs.update(url_options)
return cls(**kwargs)
Expand Down
180 changes: 90 additions & 90 deletions valkey/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ def pack(self, *args):
# output list if we're sending large values or memoryviews
arg_length = len(arg)
if (
len(buff) > buffer_cutoff
or arg_length > buffer_cutoff
or isinstance(arg, memoryview)
len(buff) > buffer_cutoff
or arg_length > buffer_cutoff
or isinstance(arg, memoryview)
):
buff = SYM_EMPTY.join(
(buff, SYM_DOLLAR, str(arg_length).encode(), SYM_CRLF)
Expand All @@ -134,35 +134,35 @@ class AbstractConnection:
"Manages communication to and from a Valkey server"

def __init__(
self,
db: int = 0,
password: Optional[str] = None,
socket_timeout: Optional[float] = None,
socket_connect_timeout: Optional[float] = None,
retry_on_timeout: bool = False,
retry_on_error=SENTINEL,
encoding: str = "utf-8",
encoding_errors: str = "strict",
decode_responses: bool = False,
parser_class=DefaultParser,
socket_read_size: int = 65536,
health_check_interval: int = 0,
client_name: Optional[str] = None,
lib_name: Optional[str] = "valkey-py",
lib_version: Optional[str] = get_lib_version(),
username: Optional[str] = None,
retry: Union[Any, None] = None,
valkey_connect_func: Optional[Callable[[], None]] = None,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
command_packer: Optional[Callable[[], None]] = None,
cache_enabled: bool = False,
client_cache: Optional[AbstractCache] = None,
cache_max_size: int = 10000,
cache_ttl: int = 0,
cache_policy: str = DEFAULT_EVICTION_POLICY,
cache_deny_list: List[str] = DEFAULT_DENY_LIST,
cache_allow_list: List[str] = DEFAULT_ALLOW_LIST,
self,
db: int = 0,
password: Optional[str] = None,
socket_timeout: Optional[float] = None,
socket_connect_timeout: Optional[float] = None,
retry_on_timeout: bool = False,
retry_on_error=SENTINEL,
encoding: str = "utf-8",
encoding_errors: str = "strict",
decode_responses: bool = False,
parser_class=DefaultParser,
socket_read_size: int = 65536,
health_check_interval: int = 0,
client_name: Optional[str] = None,
lib_name: Optional[str] = "valkey-py",
lib_version: Optional[str] = get_lib_version(),
username: Optional[str] = None,
retry: Union[Any, None] = None,
valkey_connect_func: Optional[Callable[[], None]] = None,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
command_packer: Optional[Callable[[], None]] = None,
cache_enabled: bool = False,
client_cache: Optional[AbstractCache] = None,
cache_max_size: int = 10000,
cache_ttl: int = 0,
cache_policy: str = DEFAULT_EVICTION_POLICY,
cache_deny_list: List[str] = DEFAULT_DENY_LIST,
cache_allow_list: List[str] = DEFAULT_ALLOW_LIST,
):
"""
Initialize a new Connection.
Expand Down Expand Up @@ -350,8 +350,8 @@ def on_connect(self):
# if credential provider or username and/or password are set, authenticate
if self.credential_provider or (self.username or self.password):
cred_provider = (
self.credential_provider
or UsernamePasswordCredentialProvider(self.username, self.password)
self.credential_provider
or UsernamePasswordCredentialProvider(self.username, self.password)
)
auth_args = cred_provider.get_credentials()

Expand Down Expand Up @@ -399,8 +399,8 @@ def on_connect(self):
self.send_command("HELLO", self.protocol)
response = self.read_response()
if (
response.get(b"proto") != self.protocol
and response.get("proto") != self.protocol
response.get(b"proto") != self.protocol
and response.get("proto") != self.protocol
):
raise ConnectionError("Invalid RESP version")

Expand Down Expand Up @@ -528,11 +528,11 @@ def can_read(self, timeout=0):
raise ConnectionError(f"Error while reading from {host_error}: {e.args}")

def read_response(
self,
disable_decoding=False,
*,
disconnect_on_error=True,
push_request=False,
self,
disable_decoding=False,
*,
disconnect_on_error=True,
push_request=False,
):
"""Read the response from a previously sent command"""

Expand Down Expand Up @@ -588,9 +588,9 @@ def pack_commands(self, commands):
for chunk in self._command_packer.pack(*cmd):
chunklen = len(chunk)
if (
buffer_length > buffer_cutoff
or chunklen > buffer_cutoff
or isinstance(chunk, memoryview)
buffer_length > buffer_cutoff
or chunklen > buffer_cutoff
or isinstance(chunk, memoryview)
):
if pieces:
output.append(SYM_EMPTY.join(pieces))
Expand All @@ -608,7 +608,7 @@ def pack_commands(self, commands):
return output

def _cache_invalidation_process(
self, data: List[Union[str, Optional[List[str]]]]
self, data: List[Union[str, Optional[List[str]]]]
) -> None:
"""
Invalidate (delete) all valkey commands associated with a specific key.
Expand All @@ -627,26 +627,26 @@ def _get_from_local_cache(self, command: Sequence[str]):
If the command is in the local cache, return the response
"""
if (
self.client_cache is None
or command[0] in self.cache_deny_list
or command[0] not in self.cache_allow_list
self.client_cache is None
or command[0] in self.cache_deny_list
or command[0] not in self.cache_allow_list
):
return None
while self.can_read():
self.read_response(push_request=True)
return self.client_cache.get(command)

def _add_to_local_cache(
self, command: Sequence[str], response: ResponseT, keys: List[KeysT]
self, command: Sequence[str], response: ResponseT, keys: List[KeysT]
):
"""
Add the command and response to the local cache if the command
is allowed to be cached
"""
if (
self.client_cache is not None
and (self.cache_deny_list == [] or command[0] not in self.cache_deny_list)
and (self.cache_allow_list == [] or command[0] in self.cache_allow_list)
self.client_cache is not None
and (self.cache_deny_list == [] or command[0] not in self.cache_deny_list)
and (self.cache_allow_list == [] or command[0] in self.cache_allow_list)
):
self.client_cache.set(command, response, keys)

Expand All @@ -667,13 +667,13 @@ class Connection(AbstractConnection):
"Manages TCP communication to and from a Valkey server"

def __init__(
self,
host="localhost",
port=6379,
socket_keepalive=False,
socket_keepalive_options=None,
socket_type=0,
**kwargs,
self,
host="localhost",
port=6379,
socket_keepalive=False,
socket_keepalive_options=None,
socket_type=0,
**kwargs,
):
self.host = host
self.port = int(port)
Expand All @@ -695,7 +695,7 @@ def _connect(self):
# socket.connect()
err = None
for res in socket.getaddrinfo(
self.host, self.port, self.socket_type, socket.SOCK_STREAM
self.host, self.port, self.socket_type, socket.SOCK_STREAM
):
family, socktype, proto, canonname, socket_address = res
sock = None
Expand Down Expand Up @@ -761,22 +761,22 @@ class SSLConnection(Connection):
""" # noqa

def __init__(
self,
ssl_keyfile=None,
ssl_certfile=None,
ssl_cert_reqs="required",
ssl_ca_certs=None,
ssl_ca_data=None,
ssl_check_hostname=False,
ssl_ca_path=None,
ssl_password=None,
ssl_validate_ocsp=False,
ssl_validate_ocsp_stapled=False,
ssl_ocsp_context=None,
ssl_ocsp_expected_cert=None,
ssl_min_version=None,
ssl_ciphers=None,
**kwargs,
self,
ssl_keyfile=None,
ssl_certfile=None,
ssl_cert_reqs="required",
ssl_ca_certs=None,
ssl_ca_data=None,
ssl_check_hostname=False,
ssl_ca_path=None,
ssl_password=None,
ssl_validate_ocsp=False,
ssl_validate_ocsp_stapled=False,
ssl_ocsp_context=None,
ssl_ocsp_expected_cert=None,
ssl_min_version=None,
ssl_ciphers=None,
**kwargs,
):
"""Constructor
Expand Down Expand Up @@ -845,9 +845,9 @@ def _connect(self):
password=self.certificate_password,
)
if (
self.ca_certs is not None
or self.ca_path is not None
or self.ca_data is not None
self.ca_certs is not None
or self.ca_path is not None
or self.ca_data is not None
):
context.load_verify_locations(
cafile=self.ca_certs, capath=self.ca_path, cadata=self.ca_data
Expand Down Expand Up @@ -1032,12 +1032,12 @@ class initializer. In the case of conflicting arguments, querystring
return cls(**kwargs)

def __init__(
self,
connection_class=Connection,
max_connections: Optional[int] = None,
**connection_kwargs,
self,
connection_class=Connection,
max_connections: Optional[int] = None,
**connection_kwargs,
):
max_connections = max_connections or 2 ** 31
max_connections = max_connections or 2**31
if not isinstance(max_connections, int) or max_connections < 0:
raise ValueError('"max_connections" must be a positive integer')

Expand Down Expand Up @@ -1290,12 +1290,12 @@ class BlockingConnectionPool(ConnectionPool):
"""

def __init__(
self,
max_connections=50,
timeout=20,
connection_class=Connection,
queue_class=LifoQueue,
**connection_kwargs,
self,
max_connections=50,
timeout=20,
connection_class=Connection,
queue_class=LifoQueue,
**connection_kwargs,
):
self.queue_class = queue_class
self.timeout = timeout
Expand Down

0 comments on commit 5702836

Please sign in to comment.