Skip to content

API

RetryTransport

Bases: BaseTransport, AsyncBaseTransport

A transport that automatically retries requests.

with httpx.Client(transport=RetryTransport()) as client:
    response = client.get("https://example.com")

async with httpx.AsyncClient(transport=RetryTransport()) as client:
    response = await client.get("https://example.com")

If you want to use a specific retry strategy, provide a Retry configuration:

retry = Retry(total=5, backoff_factor=0.5)
transport = RetryTransport(retry=retry)

with httpx.Client(transport=transport) as client:
    response = client.get("https://example.com")

By default, the implementation will create a sync and async transport internally, and use whichever is appropriate for the request. If you want to configure your own transport, provide it to the transport argument:

transport = RetryTransport(transport=httpx.HTTPTransport(local_address="0.0.0.0"))

Parameters:

Name Type Description Default
transport Optional[Union[HTTPTransport, AsyncHTTPTransport]]

Optional transport to wrap. If not provided, async and sync transports are created internally.

None
retry Optional[Retry]

The retry configuration.

None
Source code in httpx_retries/transport.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class RetryTransport(httpx.BaseTransport, httpx.AsyncBaseTransport):
    """
    A transport that automatically retries requests.

    ```python
    with httpx.Client(transport=RetryTransport()) as client:
        response = client.get("https://example.com")

    async with httpx.AsyncClient(transport=RetryTransport()) as client:
        response = await client.get("https://example.com")
    ```

    If you want to use a specific retry strategy, provide a [Retry][httpx_retries.Retry] configuration:

    ```python
    retry = Retry(total=5, backoff_factor=0.5)
    transport = RetryTransport(retry=retry)

    with httpx.Client(transport=transport) as client:
        response = client.get("https://example.com")
    ```

    By default, the implementation will create a sync and async transport internally, and use whichever is appropriate
    for the request. If you want to configure your own transport, provide it to the `transport` argument:

    ```python
    transport = RetryTransport(transport=httpx.HTTPTransport(local_address="0.0.0.0"))
    ```

    Closing this transport (`close()` / `aclose()`) closes the wrapped transport of the matching kind.

    Args:
        transport: Optional transport to wrap. If not provided, async and sync transports are created internally.
        retry: The retry configuration.
    """

    def __init__(
        self,
        transport: Optional[Union[httpx.HTTPTransport, httpx.AsyncHTTPTransport]] = None,
        retry: Optional[Retry] = None,
    ) -> None:
        self.retry = retry or Retry()

        if transport is not None:
            # A user-supplied transport serves only the flavour (sync or async) it implements;
            # the other slot stays None and the matching handler raises RuntimeError.
            self._sync_transport = transport if isinstance(transport, httpx.HTTPTransport) else None
            self._async_transport = transport if isinstance(transport, httpx.AsyncHTTPTransport) else None
        else:
            self._sync_transport = httpx.HTTPTransport()
            self._async_transport = httpx.AsyncHTTPTransport()

    def close(self) -> None:
        """Close the wrapped synchronous transport, if there is one."""
        if self._sync_transport is not None:
            self._sync_transport.close()

    async def aclose(self) -> None:
        """Close the wrapped asynchronous transport, if there is one."""
        if self._async_transport is not None:
            await self._async_transport.aclose()

    def handle_request(self, request: httpx.Request) -> httpx.Response:
        """
        Sends an HTTP request, possibly with retries.

        Args:
            request (httpx.Request): The request to send.

        Returns:
            The final response.

        Raises:
            RuntimeError: If no synchronous transport is available.
        """
        if self._sync_transport is None:
            raise RuntimeError("Synchronous request received but no sync transport available")

        # Methods that are not configured as retryable get exactly one attempt.
        if not self.retry.is_retryable_method(request.method):
            return self._sync_transport.handle_request(request)

        return self._retry_operation(request, self._sync_transport.handle_request)

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        """Sends an HTTP request, possibly with retries.

        Args:
            request: The request to perform.

        Returns:
            The final response.

        Raises:
            RuntimeError: If no asynchronous transport is available.
        """
        if self._async_transport is None:
            raise RuntimeError("Async request received but no async transport available")

        # Methods that are not configured as retryable get exactly one attempt.
        if not self.retry.is_retryable_method(request.method):
            return await self._async_transport.handle_async_request(request)

        return await self._retry_operation_async(request, self._async_transport.handle_async_request)

    def _retry_operation(
        self,
        request: httpx.Request,
        send_method: Callable[..., httpx.Response],
    ) -> httpx.Response:
        """Send `request` repeatedly until the retry budget is spent or the status is not retryable."""
        retry = self.retry

        while True:
            response = send_method(request)
            if retry.is_exhausted() or not retry.is_retryable_status_code(response.status_code):
                return response

            # This response is being discarded; close it so its connection is released
            # before we wait and try again. Its headers remain readable for Retry-After.
            response.close()
            retry.sleep(response)
            retry = retry.increment()

    async def _retry_operation_async(
        self,
        request: httpx.Request,
        send_method: Callable[..., Coroutine[Any, Any, httpx.Response]],
    ) -> httpx.Response:
        """Async counterpart of `_retry_operation`."""
        retry = self.retry

        while True:
            response = await send_method(request)
            if retry.is_exhausted() or not retry.is_retryable_status_code(response.status_code):
                return response

            # This response is being discarded; close it so its connection is released
            # before we wait and try again. Its headers remain readable for Retry-After.
            await response.aclose()
            await retry.asleep(response)
            retry = retry.increment()

handle_async_request(request) async

Sends an HTTP request, possibly with retries.

Parameters:

Name Type Description Default
request Request

The request to perform.

required

Returns:

Type Description
Response

The final response.

Source code in httpx_retries/transport.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
    """Send a request through the wrapped async transport, retrying when the method allows it.

    Args:
        request: The request to perform.

    Returns:
        The final response.

    Raises:
        RuntimeError: If no async transport is available.
    """
    if self._async_transport is None:
        raise RuntimeError("Async request received but no async transport available")

    # Methods the retry configuration does not allow are sent exactly once.
    if not self.retry.is_retryable_method(request.method):
        return await self._async_transport.handle_async_request(request)

    sender = partial(self._async_transport.handle_async_request)
    return await self._retry_operation_async(request, sender)

handle_request(request)

Sends an HTTP request, possibly with retries.

Parameters:

Name Type Description Default
request Request

The request to send.

required

Returns:

Type Description
Response

The final response.

Source code in httpx_retries/transport.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def handle_request(self, request: httpx.Request) -> httpx.Response:
    """
    Send a request through the wrapped sync transport, retrying when the method allows it.

    Args:
        request (httpx.Request): The request to send.

    Returns:
        The final response.

    Raises:
        RuntimeError: If no synchronous transport is available.
    """
    if self._sync_transport is None:
        raise RuntimeError("Synchronous request received but no sync transport available")

    # Methods the retry configuration does not allow are sent exactly once.
    if not self.retry.is_retryable_method(request.method):
        return self._sync_transport.handle_request(request)

    sender = partial(self._sync_transport.handle_request)
    return self._retry_operation(request, sender)

Retry

A class to encapsulate retry logic and configuration.

Each retry attempt creates a new Retry object with updated values instead of modifying the existing one, so a Retry instance can safely be reused.

If backoff_factor is set, it will use an exponential backoff with configurable jitter.

For complex use cases, you can override the backoff_strategy method.

Parameters:

Name Type Description Default
total int

The maximum number of times to retry a request before giving up.

10
max_backoff_wait float

The maximum time in seconds to wait between retries.

120.0
backoff_factor float

The factor by which the wait time increases with each retry attempt.

0.0
respect_retry_after_header bool

Whether to respect the Retry-After header in HTTP responses when deciding how long to wait before retrying.

True
allowed_methods Iterable[Union[HTTPMethod, str]]

The HTTP methods that can be retried. Defaults to ["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"].

None
status_forcelist Iterable[Union[HTTPStatus, int]]

The HTTP status codes that can be retried. Defaults to [429, 502, 503, 504].

None
backoff_jitter float

The amount of jitter to add to the backoff time, between 0 and 1. Defaults to 1 (full jitter).

1.0
attempts_made int

The number of attempts already made.

0
Source code in httpx_retries/retry.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
class Retry:
    """
    A class to encapsulate retry logic and configuration.

    Each retry attempt will create a new [Retry][httpx_retries.Retry] object with updated values,
    so they can safely be reused.

    If `backoff_factor` is set, it will use an exponential backoff with configurable jitter.

    For complex use cases, you can override the `backoff_strategy` method.

    Args:
        total (int, optional): The maximum number of times to retry a request before giving up.
        max_backoff_wait (float, optional): The maximum time in seconds to wait between retries.
        backoff_factor (float, optional): The factor by which the wait time increases with each retry attempt.
        respect_retry_after_header (bool, optional): Whether to respect the Retry-After header in HTTP responses
            when deciding how long to wait before retrying.
        allowed_methods (Iterable[http.HTTPMethod, str], optional): The HTTP methods that can be retried. Defaults to
            ["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"].
        status_forcelist (Iterable[http.HTTPStatus, int], optional): The HTTP status codes that can be retried.
            Defaults to [429, 502, 503, 504].
        backoff_jitter (float, optional): The amount of jitter to add to the backoff time, between 0 and 1.
            Defaults to 1 (full jitter).
        attempts_made (int, optional): The number of attempts already made.
    """

    RETRYABLE_METHODS: Final[frozenset[HTTPMethod]] = frozenset(
        [HTTPMethod.HEAD, HTTPMethod.GET, HTTPMethod.PUT, HTTPMethod.DELETE, HTTPMethod.OPTIONS, HTTPMethod.TRACE]
    )
    RETRYABLE_STATUS_CODES: Final[frozenset[HTTPStatus]] = frozenset(
        [
            HTTPStatus.TOO_MANY_REQUESTS,
            HTTPStatus.BAD_GATEWAY,
            HTTPStatus.SERVICE_UNAVAILABLE,
            HTTPStatus.GATEWAY_TIMEOUT,
        ]
    )

    def __init__(
        self,
        total: int = 10,
        allowed_methods: Optional[Iterable[Union[HTTPMethod, str]]] = None,
        status_forcelist: Optional[Iterable[Union[HTTPStatus, int]]] = None,
        backoff_factor: float = 0.0,
        respect_retry_after_header: bool = True,
        max_backoff_wait: float = 120.0,
        backoff_jitter: float = 1.0,
        attempts_made: int = 0,
    ) -> None:
        """Initialize a new Retry instance."""
        if total < 0:
            raise ValueError("total must be non-negative")
        if backoff_factor < 0:
            raise ValueError("backoff_factor must be non-negative")
        if max_backoff_wait <= 0:
            raise ValueError("max_backoff_wait must be positive")
        if not 0 <= backoff_jitter <= 1:
            raise ValueError("backoff_jitter must be between 0 and 1")
        if attempts_made < 0:
            raise ValueError("attempts_made must be non-negative")

        self.total = total
        self.backoff_factor = backoff_factor
        self.respect_retry_after_header = respect_retry_after_header
        self.max_backoff_wait = max_backoff_wait
        self.backoff_jitter = backoff_jitter
        self.attempts_made = attempts_made

        self.allowed_methods = frozenset(
            HTTPMethod(method.upper()) for method in (allowed_methods or self.RETRYABLE_METHODS)
        )
        self.status_forcelist = frozenset(
            HTTPStatus(int(code)) for code in (status_forcelist or self.RETRYABLE_STATUS_CODES)
        )

    def is_retryable_method(self, method: str) -> bool:
        """Check if a method is retryable."""
        return HTTPMethod(method.upper()) in self.allowed_methods

    def is_retryable_status_code(self, status_code: int) -> bool:
        """Check if a status code is retryable."""
        return HTTPStatus(status_code) in self.status_forcelist

    def is_retry(self, method: str, status_code: int, has_retry_after: bool) -> bool:
        """
        Check if a method and status code are retryable.

        This functions identically to urllib3's `Retry.is_retry` method.
        """
        return (
            self.total > 0
            and self.is_retryable_method(method)
            and self.is_retryable_status_code(status_code)
            and not has_retry_after
        )

    def is_exhausted(self) -> bool:
        """Check if the retry attempts have been exhausted."""
        return self.attempts_made >= self.total

    def parse_retry_after(self, retry_after: str) -> float:
        """
        Parse the Retry-After header.

        Args:
            retry_after: The Retry-After header value.

        Returns:
            The number of seconds to wait before retrying.

        Raises:
            ValueError: If the Retry-After header is not a valid number or HTTP date.
        """
        retry_after = retry_after.strip()
        if retry_after.isdigit():
            return float(retry_after)

        try:
            parsed_date = parsedate_to_datetime(retry_after)
            if parsed_date.tzinfo is None:
                parsed_date = parsed_date.replace(tzinfo=datetime.timezone.utc)

            diff = (parsed_date - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
            return max(0.0, diff)
        except (TypeError, ValueError):
            raise ValueError(f"Invalid Retry-After header: {retry_after}")

    def backoff_strategy(self) -> float:
        """
        Calculate the backoff time based on the number of attempts.

        For complex use cases, you can override this method to implement a custom backoff strategy.

        ```python
        class CustomRetry(Retry):
            def backoff_strategy(self) -> float:
                if self.attempts_made == 3:
                    return 1.0

                return super().backoff_strategy()
        ```

        Returns:
            The calculated backoff time in seconds, capped by max_backoff_wait.
        """
        if self.backoff_factor == 0:
            return 0.0

        # Calculate exponential backoff
        backoff: float = self.backoff_factor * (2**self.attempts_made)

        # Apply jitter if configured
        if self.backoff_jitter > 0:
            backoff *= random.uniform(1 - self.backoff_jitter, 1)

        return min(backoff, self.max_backoff_wait)

    def _calculate_sleep(self, headers: Union[httpx.Headers, Mapping[str, str]]) -> float:
        """Calculate the sleep duration based on headers and backoff strategy."""
        # Check Retry-After header first if enabled
        if self.respect_retry_after_header:
            retry_after = headers.get("Retry-After", "").strip()
            if retry_after:
                try:
                    return min(self.parse_retry_after(retry_after), self.max_backoff_wait)
                except ValueError:
                    logger.warning("Retry-After header is not a valid HTTP date: %s", retry_after)

        # Fall back to backoff strategy
        return self.backoff_strategy() if self.attempts_made > 0 else 0.0

    def sleep(self, response: httpx.Response) -> None:
        """Sleep between retry attempts using the calculated duration."""
        time.sleep(self._calculate_sleep(response.headers))

    async def asleep(self, response: httpx.Response) -> None:
        """Sleep between retry attempts asynchronously using the calculated duration."""
        await asyncio.sleep(self._calculate_sleep(response.headers))

    def increment(self) -> "Retry":
        """Return a new Retry instance with the attempt count incremented."""
        return Retry(
            total=self.total,
            max_backoff_wait=self.max_backoff_wait,
            backoff_factor=self.backoff_factor,
            respect_retry_after_header=self.respect_retry_after_header,
            allowed_methods=self.allowed_methods,
            status_forcelist=self.status_forcelist,
            backoff_jitter=self.backoff_jitter,
            attempts_made=self.attempts_made + 1,
        )

asleep(response) async

Sleep between retry attempts asynchronously using the calculated duration.

Source code in httpx_retries/retry.py
206
207
208
async def asleep(self, response: httpx.Response) -> None:
    """Asynchronously wait for the delay computed from the response's headers."""
    delay = self._calculate_sleep(response.headers)
    await asyncio.sleep(delay)

backoff_strategy()

Calculate the backoff time based on the number of attempts.

For complex use cases, you can override this method to implement a custom backoff strategy.

class CustomRetry(Retry):
    def backoff_strategy(self) -> float:
        if self.attempts_made == 3:
            return 1.0

        return super().backoff_strategy()

Returns:

Type Description
float

The calculated backoff time in seconds, capped by max_backoff_wait.

Source code in httpx_retries/retry.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def backoff_strategy(self) -> float:
    """
    Calculate the backoff time based on the number of attempts.

    The base delay is ``backoff_factor * 2**attempts_made``. When ``backoff_jitter`` is greater
    than zero the delay is multiplied by a random factor drawn from ``[1 - backoff_jitter, 1]``.

    For complex use cases, you can override this method to implement a custom backoff strategy.

    ```python
    class CustomRetry(Retry):
        def backoff_strategy(self) -> float:
            if self.attempts_made == 3:
                return 1.0

            return super().backoff_strategy()
    ```

    Returns:
        The calculated backoff time in seconds, capped by max_backoff_wait.
    """
    if self.backoff_factor == 0:
        return 0.0

    try:
        # Calculate exponential backoff
        backoff: float = self.backoff_factor * (2**self.attempts_made)

        # Apply jitter if configured
        if self.backoff_jitter > 0:
            backoff *= random.uniform(1 - self.backoff_jitter, 1)
    except OverflowError:
        # 2**attempts_made no longer fits in a float; the cap applies regardless.
        return self.max_backoff_wait

    return min(backoff, self.max_backoff_wait)

increment()

Return a new Retry instance with the attempt count incremented.

Source code in httpx_retries/retry.py
210
211
212
213
214
215
216
217
218
219
220
221
def increment(self) -> "Retry":
    """
    Return a new Retry instance with the attempt count incremented.

    The new instance is a shallow copy of this one, so it has the same type (subclass overrides
    such as a custom ``backoff_strategy`` are preserved) and the same configuration; this
    instance is left unchanged.
    """
    import copy

    successor = copy.copy(self)
    successor.attempts_made = self.attempts_made + 1
    return successor

is_exhausted()

Check if the retry attempts have been exhausted.

Source code in httpx_retries/retry.py
127
128
129
def is_exhausted(self) -> bool:
    """Report whether the attempts made so far have reached the ``total`` limit."""
    attempts_left = self.total - self.attempts_made
    return attempts_left <= 0

is_retry(method, status_code, has_retry_after)

Check if a method and status code are retryable.

This functions identically to urllib3's Retry.is_retry method.

Source code in httpx_retries/retry.py
114
115
116
117
118
119
120
121
122
123
124
125
def is_retry(self, method: str, status_code: int, has_retry_after: bool) -> bool:
    """
    Decide whether a response with this method and status code qualifies for a retry.

    The answer is True only when ``total`` is greater than zero, the method is retryable,
    the status code is retryable, and ``has_retry_after`` is false.

    NOTE(review): the original documentation describes this as functioning identically to
    urllib3's `Retry.is_retry` method; that equivalence cannot be confirmed from this code alone.
    """
    if not self.total > 0:
        return False
    if not self.is_retryable_method(method):
        return False
    if not self.is_retryable_status_code(status_code):
        return False
    return not has_retry_after

is_retryable_method(method)

Check if a method is retryable.

Source code in httpx_retries/retry.py
106
107
108
def is_retryable_method(self, method: str) -> bool:
    """Check if a method is retryable."""
    return HTTPMethod(method.upper()) in self.allowed_methods

is_retryable_status_code(status_code)

Check if a status code is retryable.

Source code in httpx_retries/retry.py
110
111
112
def is_retryable_status_code(self, status_code: int) -> bool:
    """
    Check if a status code is retryable.

    Status codes that are not members of ``HTTPStatus`` (for example 520) are reported as not
    retryable instead of raising ``ValueError``.
    """
    # HTTPStatus members compare and hash like their integer values, so a plain
    # membership test matches them without constructing an HTTPStatus first.
    return status_code in self.status_forcelist

parse_retry_after(retry_after)

Parse the Retry-After header.

Parameters:

Name Type Description Default
retry_after str

The Retry-After header value.

required

Returns:

Type Description
float

The number of seconds to wait before retrying.

Raises:

Type Description
ValueError

If the Retry-After header is not a valid number or HTTP date.

Source code in httpx_retries/retry.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def parse_retry_after(self, retry_after: str) -> float:
    """
    Parse the Retry-After header.

    A value made up only of digits is taken as a number of seconds. Anything else is parsed as
    an HTTP date and converted to the number of seconds from now until that date.

    Args:
        retry_after: The Retry-After header value.

    Returns:
        The number of seconds to wait before retrying. Never negative: a date in the past yields 0.0.

    Raises:
        ValueError: If the Retry-After header is not a valid number or HTTP date. The underlying
            parsing error is attached as the exception's cause.
    """
    retry_after = retry_after.strip()
    if retry_after.isdigit():
        return float(retry_after)

    try:
        parsed_date = parsedate_to_datetime(retry_after)
        if parsed_date.tzinfo is None:
            # A date without zone information is treated as UTC.
            parsed_date = parsed_date.replace(tzinfo=datetime.timezone.utc)

        diff = (parsed_date - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
        return max(0.0, diff)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Invalid Retry-After header: {retry_after}") from exc

sleep(response)

Sleep between retry attempts using the calculated duration.

Source code in httpx_retries/retry.py
202
203
204
def sleep(self, response: httpx.Response) -> None:
    """Block for the delay computed from the response's headers."""
    delay = self._calculate_sleep(response.headers)
    time.sleep(delay)