為了避免由于一些網(wǎng)絡(luò)或等其他不可控因素,而引起的功能性問題。比如在發(fā)送請求時,會因為網(wǎng)絡(luò)不穩(wěn)定,往往會有請求超時的問題。
這種情況下,我們通常會在代碼中加入重試的代碼。重試的代碼本身不難實現(xiàn),但如何寫得優(yōu)雅、易用,是我們要考慮的問題。
這里要給大家介紹的是一個第三方庫 - Tenacity
(標題中的重試機制并并不準確,它不是 Python 的內(nèi)置模塊,因此并不能稱之為機制),它實現(xiàn)了幾乎我們可以使用到的所有重試場景,比如:
- 在什么情況下才進行重試?
- 重試幾次呢?
- 重試多久后結(jié)束?
- 每次重試的間隔多長呢?
- 重試失敗后的回調(diào)?
在使用它之前 ,先要安裝它
$ pip install tenacity
1. 最基本的重試
無條件重試,重試之間無間隔
from tenacity import retry
@retry
def test_retry():
print("等待重試,重試無間隔執(zhí)行...")
raise Exception
test_retry()
無條件重試,但是在重試之前要等待 2 秒
from tenacity import retry, wait_fixed
@retry(wait=wait_fixed(2))
def test_retry():
print("等待重試...")
raise Exception
test_retry()
2. 設(shè)置停止基本條件
只重試7 次
from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(7))
def test_retry():
print("等待重試...")
raise Exception
test_retry()
重試 10 秒后不再重試
from tenacity import retry, stop_after_delay
@retry(stop=stop_after_delay(10))
def test_retry():
print("等待重試...")
raise Exception
test_retry()
或者上面兩個條件滿足一個就結(jié)束重試
from tenacity import retry, stop_after_delay, stop_after_attempt
@retry(stop=(stop_after_delay(10) | stop_after_attempt(7)))
def test_retry():
print("等待重試...")
raise Exception
test_retry()
3. 設(shè)置何時進行重試
在出現(xiàn)特定錯誤/異常(比如請求超時)的情況下,再進行重試
from requests import exceptions
from tenacity import retry, retry_if_exception_type
@retry(retry=retry_if_exception_type(exceptions.Timeout))
def test_retry():
print("等待重試...")
raise exceptions.Timeout
test_retry()
在滿足自定義條件時,再進行重試。
如下示例,當 test_retry
函數(shù)返回值為 False 時,再進行重試
from tenacity import retry, stop_after_attempt, retry_if_result
def is_false(value):
return value is False
@retry(stop=stop_after_attempt(3),
retry=retry_if_result(is_false))
def test_retry():
return False
test_retry()
4. 重試后錯誤重新拋出
當出現(xiàn)異常后,tenacity 會進行重試,若重試后還是失敗,默認情況下,往上拋出的異常會變成 RetryError,而不是最根本的原因。
因此可以加一個參數(shù)(reraise=True
),使得當重試失敗后,往外拋出的異常還是原來的那個。
from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(7), reraise=True)
def test_retry():
print("等待重試...")
raise Exception
test_retry()
5. 設(shè)置回調(diào)函數(shù)
當最后一次重試失敗后,可以執(zhí)行一個回調(diào)函數(shù)
from tenacity import *
def return_last_value(retry_state):
print("執(zhí)行回調(diào)函數(shù)")
return retry_state.outcome.result() # 表示返回原函數(shù)的返回值
def is_false(value):
return value is False
@retry(stop=stop_after_attempt(3),
retry_error_callback=return_last_value,
retry=retry_if_result(is_false))
def test_retry():
print("等待重試中...")
return False
print(test_retry())
輸出如下
等待重試中...
等待重試中...
等待重試中...
執(zhí)行回調(diào)函數(shù)
False
-
模塊
+關(guān)注
關(guān)注
7文章
2773瀏覽量
49130 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4365瀏覽量
63913 -
代碼
+關(guān)注
關(guān)注
30文章
4882瀏覽量
70056 -
python
+關(guān)注
關(guān)注
56文章
4822瀏覽量
85900
發(fā)布評論請先 登錄
請問在什么情況下才需要使用通信協(xié)議
在什么情況下低壓線路保護中才要求配置零序電流互感器
AD9235-65在什么情況下才需要單端轉(zhuǎn)差分信號?
什么情況下數(shù)據(jù)能恢復和不能恢復
volte語音通話有什么用,什么情況下可以開/關(guān)volte
什么情況下使用示波器
什么情況下選用工業(yè)主板

評論