# Source code for lima_api.core

import inspect
import json
import logging
import sys
from enum import Enum
from inspect import Signature
from threading import Lock

from .constants import KwargsMode

try:
    from types import NoneType
except ImportError:  # pragma: no cover
    NoneType = type(None)
from typing import (
    Any,
    Callable,
    Optional,
    Union,
    get_args,
)

import httpx
import pydantic
from opentelemetry.instrumentation.httpx import (
    AsyncOpenTelemetryTransport,
    SyncOpenTelemetryTransport,
)

from .config import settings
from .exceptions import LimaException, ValidationError
from .utils import (
    LimaParams,
    get_body,
    get_final_url,
    get_mappings,
    get_request_params,
    parse_data,
)

# Library-wide defaults, read once from the package settings at import time.
# LimaApiBase uses the first three as the class-level defaults of its
# ``retries``, ``timeout`` and ``default_response_code`` attributes.
DEFAULT_HTTP_RETRIES = settings.lima_default_http_retries
DEFAULT_HTTP_TIMEOUT = settings.lima_default_http_timeout
DEFAULT_RESPONSE_CODE = settings.lima_default_response_code
# Values treated as "undefined"; LimaApiBase hands them to get_request_params
# when no per-client or per-request override is supplied.
DEFAULT_UNDEFINED_VALUES: tuple[Any, ...] = (None, "")


class LogEvent(str, Enum):
    """
    Identifies the point of the request life cycle at which
    ``LimaApiBase.log`` is being invoked.

    Members are plain strings as well (``str`` mixin), so they compare equal
    to their raw value.
    """

    START_CLIENT = "start_client"
    """
    Emitted by ``start_client`` once the httpx client object has been built;
    the client kwargs are passed along.
    """
    BUILD_REQUEST = "build_request"
    """
    Emitted right before ``client.build_request`` is called, with the path,
    method, final url, params, headers, timeout and body.
    """
    SEND_REQUEST = "send_request"
    """
    Emitted right before ``client.send`` is called, with the built request.
    """
    RECEIVED_RESPONSE = "received_response"
    """
    Emitted when a response is about to be evaluated, with the response,
    the effective response mapping and the return class.
    """
    STOP_CLIENT = "stop_client"
    """
    Emitted by ``stop_client`` before the underlying client is closed.
    """
    SETUP = "setup"
    """
    Emitted for configuration notices, e.g. when ``auto_close`` is switched
    off because ``auto_start`` is disabled.
    """
    RETRY = "retry"
    """
    Reserved for retry notifications (not emitted by this module itself;
    presumably meant for custom retry processors or subclasses).
    """


class LimaRetryProcessor:
    """
    .. versionadded:: 1.4.0
        Support for retry when exceptions are raised

    Base class for retry processors.

    All retry processors must be instances of
    `lima_api.core.LimaRetryProcessor`.

    A processor lets you attach automatic actions to a failed request so that
    the underlying issue can be fixed before the request is retried.
    """

    max_retry: int = 1
    """
    Max number of retries before the exception is raised.
    """

    def __init__(self):
        # Number of times ``do_retry`` has been consulted for this processor.
        self.retry_count: int = 0

    def do_retry(self, client: Union["LimaApi", "SyncLimaApi"], exception: "LimaException") -> bool:
        """
        Decide whether the failed request may be retried.

        Called before ``process``. Increments the ``self.retry_count`` counter.
        When ``False`` is returned, ``self.process`` is never called and the
        request is not retried.

        :param client: the client whose request failed
        :param exception: the exception raised by the failed request
        :return: ``True`` while ``retry_count`` has not exceeded ``max_retry``
        """
        self.retry_count += 1
        return self.retry_count <= self.max_retry

    async def process(self, client: "LimaApi", exception: "LimaException") -> bool:  # pragma: no cover
        """
        Run the corrective action before a retry.

        Only called in async clients. In sync clients the processing must be
        done inside the ``self.do_retry`` method.

        :param client: the async client whose request failed
        :param exception: the exception raised by the failed request
        :return: ``True`` if the request should be retried
        """
        return True


class LimaApiBase:
    """
    Shared configuration and request/response plumbing for Lima clients.

    Holds the per-client settings and implements the two steps that do not
    depend on the transport flavour: building the ``httpx.Request`` and
    turning an ``httpx.Response`` into a return value or an exception.
    """

    base_url: str
    """
    Base url to compose the final path.
    Will be overridden by the constructor.
    """
    headers: dict[str, str]
    """
    The value of headers for httpx.Client/httpx.AsyncClient build_request function
    for send in each request.
    Will be updated by the constructor and/or decorator.
    """
    response_mapping: dict[Union[httpx.codes, int], type[LimaException]]
    """
    Dict with response code as key and lima exception as value.
    Will be updated by the constructor and/or decorator.
    """
    retry_mapping: dict[Union[httpx.codes, int, None], type[LimaRetryProcessor]] = {}
    """
    Mapping that defines the retry processor class used depending on the http status.
    Will be updated by the decorator.
    """
    client_kwargs: dict
    """
    Dict with kwargs to pass to httpx.Client/httpx.AsyncClient
    Will be overridden by the constructor.
    """
    retries: int = DEFAULT_HTTP_RETRIES
    """
    The maximum number of retries when trying to establish a connection.
    Will be updated by the constructor and/or decorator.
    """
    timeout: float = DEFAULT_HTTP_TIMEOUT
    """
    httpx.Client/httpx.AsyncClient timeout value.
    Will be updated by the constructor and/or decorator.
    """
    default_response_code: Union[httpx.codes, int] = DEFAULT_RESPONSE_CODE
    """
    Expected response code.
    Will be updated by the constructor and/or decorator.
    """
    undefined_values: tuple[Any, ...] = DEFAULT_UNDEFINED_VALUES
    """
    List of values that indicate undefined behavior
    Will be updated by the constructor and/or decorator.
    """
    default_exception: type[LimaException] = LimaException
    """
    LimaException class to raise if response code is not on response_mapping
    Will be updated by the constructor and/or decorator.
    """
    validation_exception: type[ValidationError] = ValidationError
    """
    ValidationError class to raise if the response doesn't match the expected model.
    """
    default_send_kwargs: dict[str, Any] = {"follow_redirects": True}
    """
    Extra kwargs sent on `self.client.send` for each request.
    """

    def __new__(cls, *args, **kwargs):
        # Guarantee the dict attributes exist even when the subclass does not
        # declare them, so __init__ can always read them.
        instance = super().__new__(cls)
        for attr_name in ("headers", "response_mapping", "client_kwargs"):
            if not hasattr(instance, attr_name):
                setattr(instance, attr_name, {})
        return instance

    def __init__(
        self,
        base_url: Optional[str] = None,
        *,
        retries: Optional[int] = None,
        timeout: Optional[float] = None,
        headers: Optional[dict[str, str]] = None,
        default_response_code: Optional[Union[httpx.codes, int]] = None,
        response_mapping: Optional[dict[Union[httpx.codes, int], type[LimaException]]] = None,
        undefined_values: Optional[tuple[Any, ...]] = None,
        default_exception: Optional[type[LimaException]] = None,
        client_kwargs: Optional[dict] = None,
        auto_start: bool = False,
    ):
        """
        :param base_url: the base URL of the client
        :param retries: the maximum number of retries when trying to establish a connection.
        :param timeout: httpx.Client/httpx.AsyncClient timeout value
        :param headers: the value of headers for httpx.Client/httpx.AsyncClient build_request function
        :param default_response_code: expected response code
        :param response_mapping: dict with response code as key and lima exception as value
        :param undefined_values: list of values that indicate undefined behavior
        :param default_exception: LimaException class to raise if response code is not on response_mapping
        :param client_kwargs: dict with kwargs to pass to httpx.Client/httpx.AsyncClient
        :param auto_start: when true, the connection is opened automatically on the
            first request instead of requiring an explicit start
        :raises AttributeError: if no base url is given nor declared on the class
        """
        if base_url:
            self.base_url: str = base_url
        # getattr: a class without a declared ``base_url`` must yield the
        # explicit message below rather than a bare attribute-lookup error.
        if not getattr(self, "base_url", None):
            raise AttributeError("base_url is a required attribute")

        if retries is not None:
            self.retries = retries
        if timeout is not None:
            self.timeout = timeout
        if default_response_code is not None:
            self.default_response_code = default_response_code
        if default_exception is not None:
            self.default_exception = default_exception
        if undefined_values is not None:
            self.undefined_values = undefined_values

        # Merge into fresh per-instance dicts: updating in place would mutate
        # a dict declared on the class and leak these values to every other
        # instance of it.
        self.response_mapping = {**self.response_mapping, **(response_mapping or {})}
        self.headers = {**self.headers, **(headers or {})}
        self.client_kwargs = {**self.client_kwargs, **(client_kwargs or {})}

        self.transport: Optional[Union[SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport]] = None
        self.client: Optional[Union[httpx.Client, httpx.AsyncClient]] = None
        self._auto_start: bool = auto_start

    @property
    def auto_start(self) -> bool:
        """Whether the client is opened automatically when a request needs it."""
        return self._auto_start

    def log(self, *, event: LogEvent, **kwargs) -> None:
        """
        Allow to customize logs inside the Lima flow.

        The base implementation does nothing; override it to record events.

        :param event: Where is the log "printed"
        :param kwargs: Data to be or not to be logged
        """
        ...

    def _get_retry_processor(
        self,
        exception: LimaException,
        retry_mapping: dict[Union[httpx.codes, int, None], type[LimaRetryProcessor]],
        retry_objects: dict[Union[httpx.codes, int, None], LimaRetryProcessor],
    ) -> Optional[LimaRetryProcessor]:
        """
        Return the retry processor responsible for ``exception``, if any.

        The per-request ``retry_mapping`` takes precedence over the class level
        ``self.retry_mapping``. Processors are instantiated once per status
        code and cached in ``retry_objects`` so their counters survive across
        attempts of the same call.

        :param exception: the exception raised by the failed attempt
        :param retry_mapping: the per-request mapping of status code to processor class
        :param retry_objects: cache of already created processors (updated in place)
        :return: the processor instance, or ``None`` when no retry is configured
        """
        status_code = exception.status_code
        if status_code in retry_mapping:
            retry_cls = retry_mapping[status_code]
        elif status_code in self.retry_mapping:
            retry_cls = self.retry_mapping[status_code]
        else:
            retry_cls = None
        if not retry_cls:
            return None
        if status_code not in retry_objects:
            retry_objects[status_code] = retry_cls()
        return retry_objects[status_code]

    def _create_request(
        self,
        *,
        sync: bool,
        method: str,
        path: str,
        path_params_mapping: list[LimaParams],
        kwargs: dict,
        body_mapping: Optional[LimaParams] = None,
        file_mapping: Optional[list[LimaParams]] = None,
        query_params_mapping: Optional[list[LimaParams]] = None,
        header_mapping: Optional[list[LimaParams]] = None,
        undefined_values: Optional[tuple[Any, ...]] = None,
        headers: Optional[dict[str, str]] = None,
        timeout: Optional[float] = None,
        kwargs_mode: KwargsMode = KwargsMode.IGNORE,
    ) -> httpx.Request:
        """
        Create a `httpx` request to send.

        :param sync: if the request should be sent synchronously or asynchronously
        :param method: the method of the request
        :param path: the path of the request. It may contain path parameters.
        :param path_params_mapping: the mapping for the path parameters
        :param kwargs: the keyword arguments passed into the lima function that will be used
            to generate the proper request
        :param body_mapping: the mapping used for generate the body of the request
        :param file_mapping: the mapping used for generate the files of the request
        :param query_params_mapping: the mapping used for generate the query parameters of the request
        :param header_mapping: the mapping used for generate the headers of the request
        :param undefined_values: the values that are considered undefined
        :param headers: the headers of the request
        :param timeout: the timeout of the request
        :param kwargs_mode: how the kwargs that are not defined in the mappings are used to fill the request
        :return: an instance of `httpx.Request`
        :raises LimaException: if the client is not started or its flavour (sync/async)
            does not match ``sync``
        :raises ValidationError: (``self.validation_exception``) if the query params,
            body, required headers or required files are invalid or missing
        """
        if self.client is None:
            raise LimaException(detail="uninitialized client")

        client_is_async = inspect.iscoroutinefunction(self.client.send)
        if sync and client_is_async:
            raise LimaException(detail="sync function in async client")
        if not sync and not client_is_async:
            raise LimaException(detail="async function in sync client")

        try:
            params = get_request_params(
                query_params_mapping,
                kwargs,
                undefined_values if undefined_values is not None else self.undefined_values,
            )
        except TypeError as ex:
            validation_kwargs = {}
            if ex.args:
                validation_kwargs["detail"] = ex.args[0]
            raise self.validation_exception(**validation_kwargs) from ex

        # Everything already consumed by path, query or file mappings is not
        # a candidate for the body.
        used_params = {param["kwargs_name"] for param in path_params_mapping}
        used_params.update(param["kwargs_name"] for param in (query_params_mapping or []))
        used_params.update(file["kwargs_name"] for file in (file_mapping or []))
        body_kwargs = {k: v for k, v in kwargs.items() if k not in used_params}

        # Name of the kwarg that carries the body (assumed to be a plain
        # string, like the other mappings' "kwargs_name"). Compared by
        # equality: a containment test on a string would be a substring match
        # and silently drop kwargs such as "id" when the body is "identity".
        body_param_name = body_mapping["kwargs_name"] if body_mapping else None
        try:
            body = get_body(body_mapping=body_mapping, kwargs=body_kwargs)
            if kwargs_mode == KwargsMode.BODY:
                if body is None:
                    body = body_kwargs
                elif isinstance(body, dict):
                    body.update(
                        {k: v for k, v in body_kwargs.items() if k not in body and k != body_param_name}
                    )
            elif kwargs_mode == KwargsMode.QUERY:
                params.update({k: v for k, v in body_kwargs.items() if k != body_param_name})
        except pydantic.ValidationError as ex:
            raise self.validation_exception("Invalid body") from ex

        final_url = get_final_url(
            url=f"{self.base_url}{path}",
            path_params_mapping=path_params_mapping,
            kwargs=kwargs,
        )

        # Precedence (lowest to highest): client headers, decorator headers,
        # headers mapped from the call's keyword arguments.
        _headers = {**(self.headers or {}), **(headers or {})}
        for header in header_mapping or []:
            if header["kwargs_name"] not in kwargs and "default" not in header:
                raise self.validation_exception(f"required argument missing <{header['kwargs_name']}>")
            header_value = kwargs.get(header.get("kwargs_name"))
            if header_value is not None:
                _headers[header.get("api_name")] = header_value

        files = None
        if file_mapping:
            files = {}
            for file_map in file_mapping:
                file_value = kwargs.get(file_map.get("kwargs_name"))
                if file_value is not None:
                    files[file_map.get("api_name")] = file_value
                elif NoneType not in get_args(file_map.get("wrap")):
                    # Missing file whose annotation does not allow None.
                    raise self.validation_exception(f"Required parameter '{file_map.get('kwargs_name')}'")

        # NOTE(review): the lookup is case sensitive, so a "Content-Type"
        # header (capitalised) does not switch the body to form data.
        body_kwarg = {}
        if _headers.get("content-type", "application/json") == "application/json" and not file_mapping:
            body_kwarg["json"] = body
        else:
            body_kwarg["data"] = body

        timeout = timeout if timeout is not None else self.timeout
        self.log(
            event=LogEvent.BUILD_REQUEST,
            path=path,
            method=method,
            url=final_url,
            params=params,
            headers=_headers,
            timeout=timeout,
            **body_kwarg,
        )
        return self.client.build_request(
            method,
            final_url,
            files=files,
            params=params,
            headers=_headers,
            timeout=timeout,
            **body_kwarg,
        )

    def _create_response(
        self,
        *,
        api_response: httpx.Response,
        return_class: Any,
        response_mapping: Optional[dict[Union[httpx.codes, int], type[LimaException]]] = None,
        default_response_code: Optional[Union[httpx.codes, int]] = None,
        default_exception: Optional[type[LimaException]] = None,
    ) -> Any:
        """
        Create a response object from an httpx.Response

        :param api_response: httpx.Response object
        :param return_class: class of the response object
        :param response_mapping: dict with response code as key and lima exception as value
        :param default_response_code: expected response code
        :param default_exception: LimaException class to raise if response code is not on response_mapping
        :return: response object
        :raises LimaException: if response code is not the expected one (the class comes
            from the response mapping, or the default exception otherwise)
        :raises ValidationError: (``self.validation_exception``) if the response body
            could not be parsed
        """
        mapping = self.response_mapping
        if response_mapping:
            # Per-request entries override the client ones without touching them.
            mapping = {**self.response_mapping, **response_mapping}
        fallback_cls = default_exception if default_exception is not None else self.default_exception

        self.log(
            event=LogEvent.RECEIVED_RESPONSE,
            response=api_response,
            response_mapping=mapping,
            return_class=return_class,
        )

        expected_code = default_response_code if default_response_code is not None else self.default_response_code
        status_code = api_response.status_code
        error_context = {
            "status_code": status_code,
            "content": api_response.content,
            "request": api_response.request,
            "response": api_response,
        }

        if status_code == expected_code:
            try:
                return parse_data(return_class, api_response.content)
            except (pydantic.ValidationError, json.JSONDecodeError) as ex:
                raise self.validation_exception(**error_context) from ex

        if status_code in mapping:
            mapped_cls: type[LimaException] = mapping[status_code]
            raise mapped_cls(
                detail=mapped_cls.detail or f"Http Code {status_code} in response_mapping",
                **error_context,
            )
        raise fallback_cls(
            detail=fallback_cls.detail or f"Http Code {status_code} not in response_mapping",
            **error_context,
        )


class LimaApi(LimaApiBase):
    """Asynchronous Lima client built on top of ``httpx.AsyncClient``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        transport = httpx.AsyncHTTPTransport(retries=self.retries)
        self.transport: AsyncOpenTelemetryTransport = AsyncOpenTelemetryTransport(transport)
        self.client: Optional[httpx.AsyncClient] = None

    async def start_client(self) -> None:
        """Build the ``httpx.AsyncClient`` from the client kwargs and open it."""
        client_kwargs = self.client_kwargs.copy()
        client_kwargs["timeout"] = self.timeout
        client_kwargs["transport"] = self.transport
        client = httpx.AsyncClient(**client_kwargs)  # noqa: S113
        self.log(
            event=LogEvent.START_CLIENT,
            **client_kwargs,
        )
        self.client = await client.__aenter__()

    async def stop_client(self) -> None:
        """Close the underlying client, if any, and forget it."""
        self.log(event=LogEvent.STOP_CLIENT)
        if self.client:
            await self.client.aclose()
        self.client = None

    async def __aenter__(self) -> "LimaApi":
        await self.start_client()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        if self.client is not None:
            await self.client.__aexit__(exc_type, exc, tb)
        await self.stop_client()

    async def make_request(
        self,
        sync: bool,
        method: str,
        path: str,
        path_params_mapping: list[LimaParams],
        kwargs: dict,
        return_class: Any,
        body_mapping: Optional[LimaParams] = None,
        file_mapping: Optional[list[LimaParams]] = None,
        query_params_mapping: Optional[list[LimaParams]] = None,
        header_mapping: Optional[list[LimaParams]] = None,
        undefined_values: Optional[tuple[Any, ...]] = None,
        headers: Optional[dict[str, str]] = None,
        timeout: Optional[float] = None,
        response_mapping: Optional[dict[Union[httpx.codes, int], type[LimaException]]] = None,
        default_response_code: Optional[Union[httpx.codes, int]] = None,
        default_exception: Optional[type[LimaException]] = None,
        send_kwargs: Optional[dict] = None,
        retry_mapping: Optional[dict[Union[httpx.codes, int, None], type[LimaRetryProcessor]]] = None,
        kwargs_mode: KwargsMode = KwargsMode.IGNORE,
    ) -> Any:
        """
        Make a request to the server.

        This method will make a request to the server and return the response.
        It will handle the retry logic according to the retry mapping: on a
        ``LimaException`` the matching processor's ``do_retry`` and then
        ``process`` are consulted, and the request is repeated while both
        agree.

        :param sync: if the request should be sent synchronously or asynchronously
        :param method: the method of the request
        :param path: the path of the request. It may contain path parameters.
        :param path_params_mapping: the mapping for the path parameters
        :param kwargs: the keyword arguments of the function that will be used to generate the proper request
        :param return_class: the class that will be used to create the response object
        :param body_mapping: the mapping used for generate the body of the request
        :param file_mapping: the mapping used for generate the files of the request
        :param query_params_mapping: the mapping used for generate the query parameters of the request
        :param header_mapping: the mapping used for generate the headers of the request
        :param undefined_values: the values that are considered undefined
        :param headers: the headers of the request
        :param timeout: the timeout of the request
        :param response_mapping: the mapping used to know which exception to raise for each status code
        :param default_response_code: the status code expected in a successful response
        :param default_exception: the default exception to raise when the status code
            is not in the response mapping
        :param send_kwargs: the keyword arguments passed into the `client.send` method
        :param retry_mapping: the mapping used to know which retry processor to use for each status code
        :param kwargs_mode: how the kwargs that are not defined in the mappings are used to fill the request
        :return: the response object
        :raises LimaException: the last failure, once no retry is configured or allowed
        """
        if retry_mapping is None:
            retry_mapping = {}
        retry_objects: dict[Union[httpx.codes, int, None], LimaRetryProcessor] = {}

        while True:
            try:
                return await self._request(
                    sync=sync,
                    method=method,
                    path=path,
                    path_params_mapping=path_params_mapping,
                    kwargs=kwargs,
                    return_class=return_class,
                    body_mapping=body_mapping,
                    file_mapping=file_mapping,
                    query_params_mapping=query_params_mapping,
                    header_mapping=header_mapping,
                    undefined_values=undefined_values,
                    headers=headers,
                    timeout=timeout,
                    response_mapping=response_mapping,
                    default_response_code=default_response_code,
                    default_exception=default_exception,
                    send_kwargs=send_kwargs,
                    kwargs_mode=kwargs_mode,
                )
            except LimaException as ex:
                processor = self._get_retry_processor(ex, retry_mapping, retry_objects)
                # ``process`` is only awaited when ``do_retry`` allowed it.
                if processor is None or not processor.do_retry(self, ex) or not await processor.process(self, ex):
                    raise

    async def _request(
        self,
        sync: bool,
        method: str,
        path: str,
        path_params_mapping: list[LimaParams],
        kwargs: dict,
        return_class: Any,
        body_mapping: Optional[LimaParams] = None,
        file_mapping: Optional[list[LimaParams]] = None,
        query_params_mapping: Optional[list[LimaParams]] = None,
        header_mapping: Optional[list[LimaParams]] = None,
        undefined_values: Optional[tuple[Any, ...]] = None,
        headers: Optional[dict[str, str]] = None,
        timeout: Optional[float] = None,
        response_mapping: Optional[dict[Union[httpx.codes, int], type[LimaException]]] = None,
        default_response_code: Optional[Union[httpx.codes, int]] = None,
        default_exception: Optional[type[LimaException]] = None,
        send_kwargs: Optional[dict] = None,
        kwargs_mode: KwargsMode = KwargsMode.IGNORE,
    ) -> Any:
        """
        Send a single HTTP request and return the response.

        This method constructs an HTTP request using the provided parameters, sends it,
        and processes the response. When ``auto_start`` is enabled and the client is not
        open, it is opened for this request and closed again afterwards. Retries are
        handled by ``make_request``, not here.

        :param sync: Whether the request should be sent synchronously or asynchronously.
        :param method: The HTTP method to use for the request (e.g., 'GET', 'POST').
        :param path: The URL path for the request, which may include path parameters.
        :param path_params_mapping: Mapping for path parameters used in the request URL.
        :param kwargs: Additional keyword arguments for constructing the request.
        :param return_class: The class type to use for the response object.
        :param body_mapping: Mapping used to generate the request body.
        :param file_mapping: Mapping used to generate files for the request.
        :param query_params_mapping: Mapping used to generate query parameters.
        :param header_mapping: Mapping used to generate request headers.
        :param undefined_values: Values considered undefined and to be ignored.
        :param headers: Additional headers for the request.
        :param timeout: Timeout for the request in seconds.
        :param response_mapping: Mapping to determine which exception to raise for each status code.
        :param default_response_code: Status code expected in a successful response.
        :param default_exception: Default exception to raise if the status code is not in the mapping.
        :param send_kwargs: Additional keyword arguments for the `client.send` method.
        :param kwargs_mode: Specifies how undefined kwargs are processed.
        :return: An instance of the response class specified by `return_class`.
        :raises LimaException: If there is a connection error or if the response code is not
                               the expected one.
        """
        if send_kwargs is None:
            send_kwargs = self.default_send_kwargs

        # Only close the client afterwards if this very call opened it.
        auto_close = False
        if self.auto_start and (self.client is None or self.client.is_closed):
            auto_close = True
            await self.__aenter__()

        api_request = None
        api_response = None
        try:
            api_request = self._create_request(
                sync=sync,
                method=method,
                path=path,
                path_params_mapping=path_params_mapping,
                kwargs=kwargs,
                body_mapping=body_mapping,
                file_mapping=file_mapping,
                query_params_mapping=query_params_mapping,
                header_mapping=header_mapping,
                undefined_values=undefined_values,
                headers=headers,
                timeout=timeout,
                kwargs_mode=kwargs_mode,
            )
            self.log(
                event=LogEvent.SEND_REQUEST,
                request=api_request,
            )
            api_response = await self.client.send(api_request, **send_kwargs)
        except httpx.HTTPError as exc:
            url = api_request.url if api_request else f"{self.base_url}{path}"
            raise LimaException(
                detail=f"Connection error {url} - {exc.__class__} - {exc}",
                request=api_request,
                response=api_response,
            ) from exc
        finally:
            if auto_close:
                await self.__aexit__(*sys.exc_info())

        return self._create_response(
            api_response=api_response,
            return_class=return_class,
            response_mapping=response_mapping,
            default_response_code=default_response_code,
            default_exception=default_exception,
        )


class SyncLimaApi(LimaApiBase):
    """Synchronous Lima client built on top of ``httpx.Client``."""

    def __init__(self, *args, auto_close: bool = True, **kwargs):
        """
        :param auto_close: close the client once no request is in flight; only
            effective together with ``auto_start=True``
        """
        self._auto_close: bool = auto_close and kwargs.get("auto_start", False)
        super().__init__(*args, **kwargs)
        transport = httpx.HTTPTransport(retries=self.retries)
        self.transport: SyncOpenTelemetryTransport = SyncOpenTelemetryTransport(transport)
        self.client: Optional[httpx.Client] = None
        self._lock = Lock()
        # Number of requests currently between start and end of ``_request``.
        self._open_connections = 0

    def __del__(self):
        # getattr: __init__ may have raised before ``client`` was assigned,
        # and the finalizer still runs on the partially built object.
        if getattr(self, "client", None):
            self.__exit__(None, None, None)

    @property
    def auto_close(self) -> bool:
        """
        Whether the client is closed automatically when no request is in flight.

        Switched off (with a warning) if ``auto_start`` is disabled.
        """
        if self._auto_close and not self.auto_start:
            msg = "auto_close not allowed with auto_start=False, setting off"
            logging.warning(msg)
            self.log(event=LogEvent.SETUP, msg=msg)
            self._auto_close = False
        return self._auto_close

    def start_client(self) -> None:
        """Build the ``httpx.Client`` from the client kwargs and open it."""
        client_kwargs = self.client_kwargs.copy()
        client_kwargs["timeout"] = self.timeout
        client_kwargs["transport"] = self.transport
        client = httpx.Client(**client_kwargs)  # noqa: S113
        self.log(
            event=LogEvent.START_CLIENT,
            **client_kwargs,
        )
        self.client = client.__enter__()

    def stop_client(self) -> None:
        """Close the underlying client, if any, and forget it."""
        self.log(event=LogEvent.STOP_CLIENT)
        if self.client:
            self.client.close()
        self.client = None

    def __enter__(self) -> "SyncLimaApi":
        self.start_client()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        if self.client is not None:
            self.client.__exit__(exc_type, exc, tb)
        self.stop_client()

    def make_request(
        self,
        sync: bool,
        method: str,
        path: str,
        path_params_mapping: list[LimaParams],
        kwargs: dict,
        return_class: Any,
        body_mapping: Optional[LimaParams] = None,
        file_mapping: Optional[list[LimaParams]] = None,
        query_params_mapping: Optional[list[LimaParams]] = None,
        header_mapping: Optional[list[LimaParams]] = None,
        undefined_values: Optional[tuple[Any, ...]] = None,
        headers: Optional[dict[str, str]] = None,
        timeout: Optional[float] = None,
        response_mapping: Optional[dict[Union[httpx.codes, int], type[LimaException]]] = None,
        default_response_code: Optional[Union[httpx.codes, int]] = None,
        default_exception: Optional[type[LimaException]] = None,
        send_kwargs: Optional[dict] = None,
        retry_mapping: Optional[dict[Union[httpx.codes, int, None], type[LimaRetryProcessor]]] = None,
        kwargs_mode: KwargsMode = KwargsMode.IGNORE,
    ) -> Any:
        """
        Make a request to the server.

        This method will make a request to the server and return the response.
        It will handle the retry logic according to the retry mapping: on a
        ``LimaException`` the matching processor's ``do_retry`` is consulted
        (``process`` is never called in sync clients) and the request is
        repeated while it returns true.

        :param sync: if the request should be sent synchronously or asynchronously
        :param method: the method of the request
        :param path: the path of the request. It may contain path parameters.
        :param path_params_mapping: the mapping for the path parameters
        :param kwargs: the keyword arguments of the function that will be used to generate the proper request
        :param return_class: the class that will be used to create the response object
        :param body_mapping: the mapping used for generate the body of the request
        :param file_mapping: the mapping used for generate the files of the request
        :param query_params_mapping: the mapping used for generate the query parameters of the request
        :param header_mapping: the mapping used for generate the headers of the request
        :param undefined_values: the values that are considered undefined
        :param headers: the headers of the request
        :param timeout: the timeout of the request
        :param response_mapping: the mapping used to know which exception to raise for each status code
        :param default_response_code: the status code expected in a successful response
        :param default_exception: the default exception to raise when the status code
            is not in the response mapping
        :param send_kwargs: the keyword arguments passed into the `client.send` method
        :param retry_mapping: the mapping used to know which retry processor to use for each status code
        :param kwargs_mode: how the kwargs that are not defined in the mappings are used to fill the request
        :return: the response object
        :raises LimaException: the last failure, once no retry is configured or allowed
        """
        if retry_mapping is None:
            retry_mapping = {}
        retry_objects: dict[Union[httpx.codes, int, None], LimaRetryProcessor] = {}

        while True:
            try:
                return self._request(
                    sync=sync,
                    method=method,
                    path=path,
                    path_params_mapping=path_params_mapping,
                    kwargs=kwargs,
                    return_class=return_class,
                    body_mapping=body_mapping,
                    file_mapping=file_mapping,
                    query_params_mapping=query_params_mapping,
                    header_mapping=header_mapping,
                    undefined_values=undefined_values,
                    headers=headers,
                    timeout=timeout,
                    response_mapping=response_mapping,
                    default_response_code=default_response_code,
                    default_exception=default_exception,
                    send_kwargs=send_kwargs,
                    kwargs_mode=kwargs_mode,
                )
            except LimaException as ex:
                processor = self._get_retry_processor(ex, retry_mapping, retry_objects)
                if processor is None or not processor.do_retry(self, ex):
                    raise

    def _request(
        self,
        sync: bool,
        method: str,
        path: str,
        path_params_mapping: list[LimaParams],
        kwargs: dict,
        return_class: Any,
        body_mapping: Optional[LimaParams] = None,
        file_mapping: Optional[list[LimaParams]] = None,
        query_params_mapping: Optional[list[LimaParams]] = None,
        header_mapping: Optional[list[LimaParams]] = None,
        undefined_values: Optional[tuple[Any, ...]] = None,
        headers: Optional[dict[str, str]] = None,
        timeout: Optional[float] = None,
        response_mapping: Optional[dict[Union[httpx.codes, int], type[LimaException]]] = None,
        default_response_code: Optional[Union[httpx.codes, int]] = None,
        default_exception: Optional[type[LimaException]] = None,
        send_kwargs: Optional[dict] = None,
        kwargs_mode: KwargsMode = KwargsMode.IGNORE,
    ) -> Any:
        """
        Send a single HTTP request and return the response.

        This method constructs an HTTP request using the provided parameters, sends it,
        and processes the response. When ``auto_start`` is enabled the client is opened on
        demand; when ``auto_close`` is enabled it is closed once no request is in flight.
        Retries are handled by ``make_request``, not here.

        :param sync: Whether the request should be sent synchronously or asynchronously.
        :param method: The HTTP method to use for the request (e.g., 'GET', 'POST').
        :param path: The URL path for the request, which may include path parameters.
        :param path_params_mapping: Mapping for path parameters used in the request URL.
        :param kwargs: Additional keyword arguments for constructing the request.
        :param return_class: The class type to use for the response object.
        :param body_mapping: Mapping used to generate the request body.
        :param file_mapping: Mapping used to generate files for the request.
        :param query_params_mapping: Mapping used to generate query parameters.
        :param header_mapping: Mapping used to generate request headers.
        :param undefined_values: Values considered undefined and to be ignored.
        :param headers: Additional headers for the request.
        :param timeout: Timeout for the request in seconds.
        :param response_mapping: Mapping to determine which exception to raise for each status code.
        :param default_response_code: Status code expected in a successful response.
        :param default_exception: Default exception to raise if the status code is not in the mapping.
        :param send_kwargs: Additional keyword arguments for the `client.send` method.
        :param kwargs_mode: Specifies how undefined kwargs are processed.
        :return: An instance of the response class specified by `return_class`.
        :raises LimaException: If there is a connection error or if the response code is not
                               the expected one.
        """
        if send_kwargs is None:
            send_kwargs = self.default_send_kwargs

        if self.auto_close:
            with self._lock:
                self._open_connections += 1

        api_request = None
        api_response = None
        try:
            if self.auto_start and (self.client is None or self.client.is_closed):
                self.__enter__()
            api_request = self._create_request(
                sync=sync,
                method=method,
                path=path,
                path_params_mapping=path_params_mapping,
                kwargs=kwargs,
                body_mapping=body_mapping,
                file_mapping=file_mapping,
                query_params_mapping=query_params_mapping,
                header_mapping=header_mapping,
                undefined_values=undefined_values,
                headers=headers,
                timeout=timeout,
                kwargs_mode=kwargs_mode,
            )
            self.log(
                event=LogEvent.SEND_REQUEST,
                request=api_request,
            )
            api_response = self.client.send(api_request, **send_kwargs)
        except httpx.HTTPError as exc:
            url = api_request.url if api_request else f"{self.base_url}{path}"
            raise LimaException(
                detail=f"Connection error {url} - {exc.__class__} - {exc}",
                request=api_request,
                response=api_response,
            ) from exc
        finally:
            if self.auto_close:
                # Snapshot the counter under the lock so the "last one out"
                # decision is taken on a consistent value.
                with self._lock:
                    self._open_connections -= 1
                    remaining = self._open_connections
                if not remaining:
                    self.__exit__(*sys.exc_info())

        return self._create_response(
            api_response=api_response,
            return_class=return_class,
            response_mapping=response_mapping,
            default_response_code=default_response_code,
            default_exception=default_exception,
        )


OriginalFunc = Callable[[LimaApi, Any], Any]
DecoratedFunc = Callable[[LimaApi, Any], Any]


def method_factory(method):
    """
    Build the decorator factory for one HTTP verb.

    :param method: the HTTP method name (e.g. ``"GET"``) used for every
        request issued through the resulting decorator
    :return: the ``http_method`` decorator factory bound to ``method``
    """

    def http_method(
        path: str,
        timeout: Optional[float] = None,
        default_response_code: Optional[Union[httpx.codes, int]] = None,
        response_mapping: Optional[dict[Union[httpx.codes, int], type[LimaException]]] = None,
        undefined_values: Optional[tuple[Any, ...]] = None,
        headers: Optional[dict[str, str]] = None,
        default_exception: Optional[type[LimaException]] = None,
        retry_mapping: Optional[dict[Union[httpx.codes, int, None], type[LimaRetryProcessor]]] = None,
        kwargs_mode: KwargsMode = KwargsMode.IGNORE,
    ) -> Callable:
        """
        :param path: The URL path for the request, which may include path parameters.
        :param timeout: Timeout for the request in seconds.
        :param default_response_code: Status code expected in a successful response.
        :param response_mapping: Mapping to determine which exception to raise for each status code.
        :param undefined_values: list of values that indicate undefined behavior
        :param headers: Additional headers for the request.
        :param default_exception: Default exception to raise if the status code is not in the mapping.
        :param retry_mapping: the mapping used to know which retry processor to use for each status code
        :param kwargs_mode: Specifies how undefined kwargs are processed.
        :return: A decorator that replaces the annotated function by one performing the
            request and returning an instance of its return annotation.
        """
        path = path.replace(" ", "")
        # Extra kwargs of a GET can only travel in the query string.
        if method == "GET" and kwargs_mode != KwargsMode.IGNORE:
            kwargs_mode = KwargsMode.QUERY

        def _http_method(func: OriginalFunc) -> DecoratedFunc:
            sig: Signature = inspect.signature(func)
            return_class = sig.return_annotation
            if return_class is inspect.Signature.empty:
                raise TypeError("Required return type")
            is_async = inspect.iscoroutinefunction(func)
            (
                query_params_mapping,
                path_params_mapping,
                body_mapping,
                header_mapping,
                file_mapping,
            ) = get_mappings(path, sig.parameters, method)

            # Options shared by every call of the decorated function; computed
            # once at decoration time.
            request_options = {
                "body_mapping": body_mapping,
                "query_params_mapping": query_params_mapping,
                "file_mapping": file_mapping,
                "header_mapping": header_mapping,
                "undefined_values": undefined_values,
                "headers": headers,
                "timeout": timeout,
                "response_mapping": response_mapping,
                "default_response_code": default_response_code,
                "default_exception": default_exception,
                "retry_mapping": retry_mapping,
                "kwargs_mode": kwargs_mode,
            }

            # NOTE(review): positional ``*args`` are accepted but not used to
            # build the request; only keyword arguments are forwarded.
            if is_async:

                async def _func(self: LimaApi, *args: Any, **kwargs: Any) -> Any:
                    return await self.make_request(
                        not is_async,
                        method,
                        path,
                        path_params_mapping,
                        kwargs,
                        return_class,
                        **request_options,
                    )

            else:

                def _func(self: SyncLimaApi, *args: Any, **kwargs: Any) -> Any:
                    # Only SyncLimaApi instances carry ``_lock``.
                    if not hasattr(self, "_lock"):
                        raise LimaException(detail="sync function in async client")
                    return self.make_request(
                        not is_async,
                        method,
                        path,
                        path_params_mapping,
                        kwargs,
                        return_class,
                        **request_options,
                    )

            _func.__wrapped__ = func
            return _func

        return _http_method

    return http_method


get = method_factory("GET")
post = method_factory("POST")
put = method_factory("PUT")
head = method_factory("HEAD")
patch = method_factory("PATCH")
options = method_factory("OPTIONS")
delete = method_factory("DELETE")