Add exponential backoff for API functions
This commit is contained in:
parent
07003eba2b
commit
2507adeb8a
1 changed files with 15 additions and 2 deletions
|
@ -22,6 +22,18 @@ executor: ThreadPoolExecutor = ThreadPoolExecutor()
|
||||||
F = TypeVar('F', bound=Callable[..., Any])
|
F = TypeVar('F', bound=Callable[..., Any])
|
||||||
T = TypeVar('T')
|
T = TypeVar('T')
|
||||||
|
|
||||||
|
def with_backoff(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
|
||||||
|
async def with_backoff_wrapper(*args: Any, **kwargs: Any) -> T:
|
||||||
|
last_delay = 2
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
return await f(*args, **kwargs)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"{e}")
|
||||||
|
await asyncio.sleep(last_delay)
|
||||||
|
last_delay *= last_delay
|
||||||
|
return with_backoff_wrapper
|
||||||
|
|
||||||
|
|
||||||
def aio(f: Callable[..., T]) -> Callable[..., Awaitable[T]]:
|
def aio(f: Callable[..., T]) -> Callable[..., Awaitable[T]]:
|
||||||
async def aio_wrapper(*args: Any, **kwargs: Any) -> T:
|
async def aio_wrapper(*args: Any, **kwargs: Any) -> T:
|
||||||
|
@ -72,7 +84,7 @@ class NarInfo(object):
|
||||||
|
|
||||||
s3 = boto3.client("s3", endpoint_url=ENDPOINT_URL)
|
s3 = boto3.client("s3", endpoint_url=ENDPOINT_URL)
|
||||||
|
|
||||||
|
@with_backoff
|
||||||
@aio
|
@aio
|
||||||
def get_object(Key: str) -> str:
|
def get_object(Key: str) -> str:
|
||||||
obj = s3.get_object(Bucket=BUCKET_NAME, Key=Key)
|
obj = s3.get_object(Bucket=BUCKET_NAME, Key=Key)
|
||||||
|
@ -84,6 +96,7 @@ def get_object(Key: str) -> str:
|
||||||
|
|
||||||
|
|
||||||
async def list_old_cache_objects() -> AsyncIterable[str]:
|
async def list_old_cache_objects() -> AsyncIterable[str]:
|
||||||
|
@with_backoff
|
||||||
@aio
|
@aio
|
||||||
def list_objects_v2(
|
def list_objects_v2(
|
||||||
ContinuationToken: Optional[str]
|
ContinuationToken: Optional[str]
|
||||||
|
@ -115,7 +128,7 @@ async def list_old_cache_objects() -> AsyncIterable[str]:
|
||||||
break
|
break
|
||||||
cont_token = objs["NextContinuationToken"]
|
cont_token = objs["NextContinuationToken"]
|
||||||
|
|
||||||
|
@with_backoff
|
||||||
@aio
|
@aio
|
||||||
def delete_object(key: str) -> None:
|
def delete_object(key: str) -> None:
|
||||||
s3.delete_object(Bucket=BUCKET_NAME, Key=key)
|
s3.delete_object(Bucket=BUCKET_NAME, Key=key)
|
||||||
|
|
Loading…
Reference in a new issue