|
| 1 | +import json |
| 2 | +import os |
| 3 | +from typing import Any, Iterable, Optional |
| 4 | + |
| 5 | + |
def _get_env_bool(name: str, default: bool) -> bool:
    """Interpret the environment variable ``name`` as a boolean flag.

    Args:
        name: Environment variable to look up.
        default: Value returned when the variable is not set at all.

    Returns:
        ``default`` if the variable is unset. Otherwise ``True`` when the
        value, after trimming whitespace and lower-casing, is one of
        ``1``, ``true``, ``t``, ``yes`` or ``y``; any other value
        (including the empty string) gives ``False``.
    """
    raw = os.environ.get(name)
    if raw is not None:
        normalized = raw.strip().lower()
        return normalized in {"1", "true", "t", "yes", "y"}
    return default
| 12 | + |
| 13 | + |
def _get_env_int(name: str, default: int) -> int:
    """Read the environment variable ``name`` as an integer.

    Args:
        name: Environment variable to look up.
        default: Value returned when the variable is unset or cannot be
            parsed as an integer.

    Returns:
        The parsed integer, or ``default`` when the variable is missing or
        its value is not a valid integer literal (e.g. ``"abc"``, ``"1.5"``).
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        # int() on a str only fails with ValueError; catching just that keeps
        # unrelated errors visible instead of silently masking them.
        return default
| 23 | + |
| 24 | + |
def _get_env_float(name: str, default: float) -> float:
    """Read the environment variable ``name`` as a finite float.

    Args:
        name: Environment variable to look up.
        default: Value returned when the variable is unset, cannot be parsed
            as a float, or parses to NaN / +-infinity.

    Returns:
        The parsed finite float, or ``default`` otherwise.
    """
    import math

    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        parsed = float(raw)
    except ValueError:
        # float() on a str only fails with ValueError.
        return default
    # float() happily accepts "nan" / "inf"; such values would be serialized
    # by json.dumps as NaN / Infinity, which are not valid JSON tokens, so
    # treat them like any other unusable value.
    if not math.isfinite(parsed):
        return default
    return parsed
| 34 | + |
| 35 | + |
def _get_env_csv(name: str, default_csv: str) -> list[str]:
    """Read the environment variable ``name`` as a comma-separated list.

    Each item is stripped of surrounding whitespace and upper-cased; empty
    items are dropped.

    Args:
        name: Environment variable to look up.
        default_csv: Comma-separated string used when the variable is unset
            or contains no usable items.

    Returns:
        The parsed items. If the variable is set but yields no items (for
        example ``""`` or ``" , "``), the items parsed from ``default_csv``
        are returned instead of an empty list.
    """

    def _split(csv: str) -> list[str]:
        # Trim each piece once and keep only the non-empty ones.
        trimmed = (piece.strip() for piece in csv.split(","))
        return [piece.upper() for piece in trimmed if piece]

    parsed = _split(os.environ.get(name, default_csv))
    # A set-but-blank variable is treated as "not configured" so callers do
    # not silently receive an empty list.
    return parsed if parsed else _split(default_csv)
| 40 | + |
| 41 | + |
def get_grpc_keepalive_options() -> list[tuple[str, Any]]:
    """Assemble gRPC keepalive channel options from the process environment.

    Nothing is produced unless ``DAPR_GRPC_KEEPALIVE_ENABLED`` is truthy.

    Recognised variables (defaults in parentheses):
      - DAPR_GRPC_KEEPALIVE_ENABLED (false)
      - DAPR_GRPC_KEEPALIVE_TIME_MS (120000)
      - DAPR_GRPC_KEEPALIVE_TIMEOUT_MS (20000)
      - DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS (false)

    Returns:
        An empty list when keepalive is disabled; otherwise the three
        ``(option_name, value)`` pairs for keepalive time, keepalive timeout
        and permit-without-calls, in that order. The permit flag is encoded
        as the integer ``1`` or ``0``.
    """
    if not _get_env_bool("DAPR_GRPC_KEEPALIVE_ENABLED", False):
        return []

    options: list[tuple[str, Any]] = []
    options.append(
        ("grpc.keepalive_time_ms", _get_env_int("DAPR_GRPC_KEEPALIVE_TIME_MS", 120000))
    )
    options.append(
        ("grpc.keepalive_timeout_ms", _get_env_int("DAPR_GRPC_KEEPALIVE_TIMEOUT_MS", 20000))
    )
    # The flag is emitted as an int (1/0) rather than a bool.
    allow_idle_pings = int(_get_env_bool("DAPR_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS", False))
    options.append(("grpc.keepalive_permit_without_calls", allow_idle_pings))
    return options
| 64 | + |
| 65 | + |
def get_grpc_retry_service_config_option() -> Optional[tuple[str, str]]:
    """Build the ``grpc.service_config`` channel option for env-driven retries.

    Recognised variables (defaults in parentheses):
      - DAPR_GRPC_RETRY_ENABLED (false)
      - DAPR_GRPC_RETRY_MAX_ATTEMPTS (4)
      - DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS (100)
      - DAPR_GRPC_RETRY_MAX_BACKOFF_MS (1000)
      - DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER (2.0)
      - DAPR_GRPC_RETRY_CODES (UNAVAILABLE,DEADLINE_EXCEEDED)

    Returns:
        ``None`` when ``DAPR_GRPC_RETRY_ENABLED`` is not truthy; otherwise the
        pair ``("grpc.service_config", json_str)`` where ``json_str`` is the
        JSON-encoded service config holding a single retry policy. The values
        read from the environment are forwarded as-is; no range validation is
        performed here.
    """
    if not _get_env_bool("DAPR_GRPC_RETRY_ENABLED", False):
        return None

    attempts = _get_env_int("DAPR_GRPC_RETRY_MAX_ATTEMPTS", 4)
    first_delay_ms = _get_env_int("DAPR_GRPC_RETRY_INITIAL_BACKOFF_MS", 100)
    ceiling_ms = _get_env_int("DAPR_GRPC_RETRY_MAX_BACKOFF_MS", 1000)
    multiplier = _get_env_float("DAPR_GRPC_RETRY_BACKOFF_MULTIPLIER", 2.0)
    status_codes = _get_env_csv("DAPR_GRPC_RETRY_CODES", "UNAVAILABLE,DEADLINE_EXCEEDED")

    # Schema reference:
    # https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto#L44
    # Backoff durations are converted from milliseconds to a seconds string
    # with an "s" suffix.
    retry_policy = {
        "maxAttempts": attempts,
        "initialBackoff": f"{first_delay_ms / 1000.0}s",
        "maxBackoff": f"{ceiling_ms / 1000.0}s",
        "backoffMultiplier": multiplier,
        "retryableStatusCodes": status_codes,
    }
    method_config = {
        # An empty service name is used to match all services/methods.
        "name": [{"service": ""}],
        "retryPolicy": retry_policy,
    }
    # No retry-throttling policy is added here; a caller who needs one can
    # supply a complete service config string through the channel options.
    payload = json.dumps({"methodConfig": [method_config]})
    return ("grpc.service_config", payload)
| 104 | + |
| 105 | + |
def build_grpc_channel_options(
    base_options: Optional[Iterable[tuple[str, Any]]] = None,
) -> Optional[list[tuple[str, Any]]]:
    """Merge caller-supplied channel options with the env-driven ones.

    The caller's options come first, followed by any keepalive options and
    then the retry service-config option, when those are enabled through the
    environment. The result is meant to be passed as the ``options`` argument
    of ``grpc.secure_channel`` / ``grpc.insecure_channel``.

    Args:
        base_options: Optional iterable of ``(option_name, value)`` pairs to
            place at the front of the result.

    Returns:
        The merged list, or ``None`` when there are no options at all.
    """
    merged: list[tuple[str, Any]] = list(base_options) if base_options else []

    # Keepalive options (possibly empty) are appended before the retry entry.
    merged += get_grpc_keepalive_options()

    retry_entry = get_grpc_retry_service_config_option()
    if retry_entry is not None:
        merged.append(retry_entry)

    return merged or None
0 commit comments