[k8s] Add retries for pod and node fetching to handle transient errors #4543

Open · wants to merge 5 commits into base: master
92 changes: 76 additions & 16 deletions sky/provision/kubernetes/utils.py
@@ -7,6 +7,7 @@
import re
import shutil
import subprocess
import time
import typing
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse
@@ -105,6 +106,75 @@

logger = sky_logging.init_logger(__name__)

# Default retry settings for Kubernetes API calls
DEFAULT_MAX_RETRIES = 3
DEFAULT_RETRY_INTERVAL_SECONDS = 1
Collaborator: We can reduce the interval if it needs to be less than 1 second : )



def _retry_on_error(max_retries=DEFAULT_MAX_RETRIES,
retry_interval=DEFAULT_RETRY_INTERVAL_SECONDS,
Collaborator (suggested change):
-                    retry_interval=DEFAULT_RETRY_INTERVAL_SECONDS,
+                    retry_interval_seconds=DEFAULT_RETRY_INTERVAL_SECONDS,

resource_type: Optional[str] = None):
"""Decorator to retry Kubernetes API calls on transient failures.

Args:
max_retries: Maximum number of retry attempts
retry_interval: Initial seconds to wait between retries
resource_type: Type of resource being accessed (e.g. 'node', 'pod').
Used to provide more specific error messages.
"""

def decorator(func):

@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
backoff = common_utils.Backoff(initial_backoff=retry_interval,
max_backoff_factor=3)

for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except (kubernetes.max_retry_error(),
kubernetes.api_exception(),
kubernetes.config_exception()) as e:
last_exception = e
# Don't retry on permanent errors like 401 (Unauthorized)
# or 403 (Forbidden)
if (isinstance(e, kubernetes.api_exception()) and
e.status in (401, 403)):
raise
if attempt < max_retries - 1:
sleep_time = backoff.current_backoff()
logger.debug(f'Kubernetes API call {func.__name__} '
f'failed with {str(e)}. Retrying in '
f'{sleep_time:.1f}s...')
time.sleep(sleep_time)
continue

# Format error message based on the type of exception
resource_msg = f' when trying to get {resource_type} info' \
if resource_type else ''
debug_cmd = f' To debug, run: kubectl get {resource_type}s' \
if resource_type else ''

if isinstance(last_exception, kubernetes.max_retry_error()):
error_msg = f'Timed out{resource_msg} from Kubernetes cluster.'
elif isinstance(last_exception, kubernetes.api_exception()):
error_msg = (f'Kubernetes API error{resource_msg}: '
f'{str(last_exception)}')
else:
error_msg = (f'Kubernetes configuration error{resource_msg}: '
f'{str(last_exception)}')

raise exceptions.ResourcesUnavailableError(
f'{error_msg}'
f' Please check if the cluster is healthy and retry.'
f'{debug_cmd}') from last_exception

return wrapper

return decorator
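
For readers skimming the diff, here is a minimal, self-contained sketch of the same retry-with-exponential-backoff pattern. The Backoff class below is a simplified stand-in for common_utils.Backoff, and TransientError and flaky_list_nodes are hypothetical, for illustration only:

import functools
import time


class Backoff:
    """Simplified stand-in for common_utils.Backoff: doubles the wait, capped."""

    def __init__(self, initial_backoff=1.0, max_backoff_factor=3):
        self._backoff = initial_backoff
        self._max_backoff = initial_backoff * max_backoff_factor

    def current_backoff(self):
        current = self._backoff
        self._backoff = min(self._backoff * 2, self._max_backoff)
        return current


class TransientError(Exception):
    """Hypothetical stand-in for a retryable Kubernetes API error."""


def retry_on_error(max_retries=3, retry_interval=1.0):
    """Retries the wrapped function on TransientError with backoff."""

    def decorator(func):

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            backoff = Backoff(initial_backoff=retry_interval)
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except TransientError:
                    if attempt == max_retries - 1:
                        raise  # Retries exhausted; surface the last error.
                    time.sleep(backoff.current_backoff())

        return wrapper

    return decorator


_calls = {'n': 0}


@retry_on_error(max_retries=3, retry_interval=0.1)
def flaky_list_nodes():
    """Fails twice with a simulated transient error, then succeeds."""
    _calls['n'] += 1
    if _calls['n'] < 3:
        raise TransientError('simulated 503 from the API server')
    return ['node-1', 'node-2']


print(flaky_list_nodes())  # Sleeps 0.1s, then 0.2s, then prints the nodes.

Running the sketch sleeps 0.1 s, then 0.2 s, and succeeds on the third attempt, mirroring how _retry_on_error treats a flaky API server.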


class GPULabelFormatter:
"""Base class to define a GPU label formatter for a Kubernetes cluster
@@ -445,6 +515,7 @@ def detect_accelerator_resource(


@functools.lru_cache(maxsize=10)
+@_retry_on_error(resource_type='node')
def get_kubernetes_nodes(context: Optional[str] = None) -> List[Any]:
"""Gets the kubernetes nodes in the context.

@@ -453,17 +524,12 @@ def get_kubernetes_nodes(context: Optional[str] = None) -> List[Any]:
if context is None:
context = get_current_kube_config_context_name()

-    try:
-        nodes = kubernetes.core_api(context).list_node(
-            _request_timeout=kubernetes.API_TIMEOUT).items
-    except kubernetes.max_retry_error():
-        raise exceptions.ResourcesUnavailableError(
-            'Timed out when trying to get node info from Kubernetes cluster. '
-            'Please check if the cluster is healthy and retry. To debug, run: '
-            'kubectl get nodes') from None
+    nodes = kubernetes.core_api(context).list_node(
+        _request_timeout=kubernetes.API_TIMEOUT).items
Collaborator commented on lines +527 to +528:
Seems we already have the request_timeout set here. Is this not covering all the cases the retry_on_error can handle?

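(Context for the question above: _request_timeout bounds how long a single HTTP request may run, while the retry decorator also covers fast server-side failures that return well within that timeout. A rough illustration using the real kubernetes client's ApiException; the 503 here is simulated:)

from kubernetes.client.rest import ApiException

# _request_timeout bounds how long one HTTP request may take. A transient
# 5xx is not a timeout: the server answers quickly, but with an error, so
# only a retry gives the call a second chance.
try:
    raise ApiException(status=503, reason='Service Unavailable')  # simulated
except ApiException as e:
    print(f'Fast failure, not a timeout: {e.status} {e.reason}')
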
return nodes


+@_retry_on_error(resource_type='pod')
def get_all_pods_in_kubernetes_cluster(
context: Optional[str] = None) -> List[Any]:
"""Gets pods in all namespaces in kubernetes cluster indicated by context.
@@ -473,14 +539,8 @@ def get_all_pods_in_kubernetes_cluster(
if context is None:
context = get_current_kube_config_context_name()

-    try:
-        pods = kubernetes.core_api(context).list_pod_for_all_namespaces(
-            _request_timeout=kubernetes.API_TIMEOUT).items
-    except kubernetes.max_retry_error():
-        raise exceptions.ResourcesUnavailableError(
-            'Timed out when trying to get pod info from Kubernetes cluster. '
-            'Please check if the cluster is healthy and retry. To debug, run: '
-            'kubectl get pods') from None
+    pods = kubernetes.core_api(context).list_pod_for_all_namespaces(
+        _request_timeout=kubernetes.API_TIMEOUT).items
return pods
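
With the decorator applied, callers get retries transparently. A usage sketch, assuming this branch of SkyPilot is installed and a valid kubeconfig context is active:

from sky import exceptions
from sky.provision.kubernetes import utils

try:
    # Each call is retried up to DEFAULT_MAX_RETRIES times on transient
    # API errors; node results are additionally cached by lru_cache.
    nodes = utils.get_kubernetes_nodes()
    pods = utils.get_all_pods_in_kubernetes_cluster()
    print(f'{len(nodes)} nodes, {len(pods)} pods')
except exceptions.ResourcesUnavailableError as e:
    # Raised once retries are exhausted; 401/403 errors from the API
    # server are re-raised immediately instead.
    print(f'Cluster unhealthy or unreachable: {e}')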

