nixos-config/packages/clean-s3-cache.py

145 lines
4.7 KiB
Python
Executable file

#!#SHEBANG#
import asyncio
from concurrent.futures import ThreadPoolExecutor
import functools
from typing import Any, AsyncIterable, Awaitable, Callable, Optional, TypeVar, cast
from os import path
import datetime
import json
import boto3
from botocore.response import StreamingBody
# S3-compatible endpoint (a Backblaze B2 host) and the bucket this script cleans.
ENDPOINT_URL: str = "https://s3.us-west-000.backblazeb2.com"
BUCKET_NAME: str = "cache-chir-rs"

# Cut-off instant: only objects last modified before it are considered for
# cleanup.  "Now" is taken directly in UTC; a naive ``now()`` stamped with
# ``tzinfo=UTC`` afterwards would label local wall-clock time as UTC and shift
# the cut-off by the machine's UTC offset.
yesterday = (
    datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1)
)
# Worker pool on which ``aio``-wrapped blocking callables are executed.
executor: ThreadPoolExecutor = ThreadPoolExecutor()

F = TypeVar('F', bound=Callable[..., Any])
T = TypeVar('T')


def aio(f: Callable[..., T]) -> Callable[..., Awaitable[T]]:
    """Turn the blocking callable *f* into a coroutine function.

    Awaiting the returned wrapper runs ``f(*args, **kwargs)`` on the
    module-level ``executor`` thread pool via the running event loop and
    evaluates to whatever *f* returned; an exception raised by *f* propagates
    to the awaiting caller.

    Args:
        f: Synchronous callable to run off the event-loop thread.

    Returns:
        An ``async def`` wrapper accepting the same arguments as *f*.
    """
    # functools.wraps keeps f's __name__/__doc__ on the wrapper so decorated
    # helpers stay identifiable in tracebacks and introspection.
    @functools.wraps(f)
    async def aio_wrapper(*args: Any, **kwargs: Any) -> T:
        # run_in_executor() forwards positional arguments only, so bind
        # everything (including keyword arguments) up front.
        f_bound: Callable[[], T] = functools.partial(f, *args, **kwargs)
        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, f_bound)

    return aio_wrapper
@aio
def exists_locally(store_path: str) -> bool:
    """Report whether *store_path* exists on the local filesystem.

    The check itself is ``os.path.exists``; the ``aio`` decorator makes the
    call awaitable.
    """
    is_present: bool = path.exists(store_path)
    return is_present
class NarInfo:
    """Attribute view of the ``Key: value`` lines of a narinfo document.

    Each recognised key is stored under a snake_case attribute (``StorePath``
    -> ``store_path``, ``URL`` -> ``url``, ...).  ``FileSize`` and ``NarSize``
    are converted to ``int`` and ``References`` is split on whitespace into a
    list; every other value is kept as the raw string.  Apart from
    ``compression``, which defaults to ``"bzip2"``, an attribute only exists
    if its key appeared in the input.  Unrecognised keys are ignored, and when
    a key is repeated the last occurrence wins.
    """

    # Declared for readers and type checkers only; these annotations do not
    # create the attributes.
    store_path: str
    url: str
    compression: str
    file_hash: str
    file_size: int
    nar_hash: str
    nar_size: int
    references: list[str]
    deriver: str
    system: str
    sig: str
    ca: str

    # narinfo key -> (attribute name, converter applied to the raw value)
    _FIELDS = {
        "StorePath": ("store_path", str),
        "URL": ("url", str),
        "Compression": ("compression", str),
        "FileHash": ("file_hash", str),
        "FileSize": ("file_size", int),
        "NarHash": ("nar_hash", str),
        "NarSize": ("nar_size", int),
        "References": ("references", str.split),
        "Deriver": ("deriver", str),
        "System": ("system", str),
        "Sig": ("sig", str),
        "CA": ("ca", str),
    }

    def __init__(self, narinfo: str) -> None:
        """Parse *narinfo*, the full text of a narinfo file.

        Lines that do not contain the ``": "`` separator (for example blank
        lines) are skipped instead of aborting the parse.

        Raises:
            ValueError: if ``FileSize`` or ``NarSize`` is not an integer.
        """
        # Value used when the document carries no "Compression" line.
        self.compression = "bzip2"
        for narinfo_line in narinfo.splitlines():
            key, separator, value = narinfo_line.partition(": ")
            if not separator:
                # Blank or separator-less line: nothing to record.
                continue
            field = self._FIELDS.get(key)
            if field is None:
                continue
            attribute, convert = field
            setattr(self, attribute, convert(value))

    async def exists_locally(self) -> bool:
        """Return whether ``store_path`` exists on the local filesystem.

        Raises:
            AttributeError: if the parsed document had no ``StorePath`` line.
        """
        # Inside a method the bare name resolves to the module-level
        # ``exists_locally`` coroutine function, not to this method.
        return await exists_locally(self.store_path)
# Shared boto3 S3 client pointed at ENDPOINT_URL; the get/list/delete helpers
# in this file all go through it.  No credentials are passed here, so boto3
# presumably picks them up from its default configuration -- confirm for the
# deployment environment.
s3 = boto3.client("s3", endpoint_url=ENDPOINT_URL)
@aio
def get_object(Key: str) -> str:
    """Download the object stored under *Key* and return it as text.

    The object is fetched from ``BUCKET_NAME`` and its whole body is read and
    decoded as UTF-8.

    Raises:
        Exception: if the response has no ``Body`` entry, or if that entry is
            not a ``StreamingBody``.
    """
    response = s3.get_object(Bucket=BUCKET_NAME, Key=Key)
    if "Body" not in response:
        raise Exception("No Body")

    body = response["Body"]
    if not isinstance(body, StreamingBody):
        raise Exception("Not StreamingBody")

    raw = body.read()
    return raw.decode("utf-8")
async def list_old_cache_objects() -> AsyncIterable[str]:
    """Yield the key of every bucket object last modified before ``yesterday``.

    The bucket is listed page by page with ``list_objects_v2``, following
    ``NextContinuationToken`` until a response no longer carries one.  Entries
    whose ``LastModified`` is missing or not a ``datetime`` are skipped, and a
    response without ``Contents`` is treated as an empty page.

    Raises:
        Exception: if an entry of a page's ``Contents`` list is not a dict.
    """
    @aio
    def list_objects_v2(
        ContinuationToken: Optional[str]
    ) -> dict[str, Any]:
        # Only pass ContinuationToken when there is one to resume from.
        request: dict[str, Any] = {"Bucket": BUCKET_NAME}
        if ContinuationToken is not None:
            request["ContinuationToken"] = ContinuationToken
        return s3.list_objects_v2(**request)

    cont_token: Optional[str] = None
    while True:
        objs = await list_objects_v2(cont_token)
        # boto3 leaves "Contents" out of the response when the listing has no
        # keys (e.g. an empty bucket); that is an empty page, not an error.
        contents = objs.get("Contents", [])
        if isinstance(contents, list):
            for obj in cast(list[Any], contents):
                if not isinstance(obj, dict):
                    raise Exception("Not dict")
                obj = cast(dict[str, Any], obj)
                last_modified = obj.get("LastModified")
                if (
                    isinstance(last_modified, datetime.datetime)
                    and last_modified < yesterday
                ):
                    yield obj["Key"]
        if "NextContinuationToken" not in objs:
            break
        cont_token = objs["NextContinuationToken"]
@aio
def delete_object(key: str) -> None:
    """Delete the object stored under *key* from ``BUCKET_NAME``.

    The S3 response is discarded.
    """
    s3.delete_object(Key=key, Bucket=BUCKET_NAME)
    return None
async def main() -> None:
    """Delete old cache objects whose store path is not present locally.

    For every key produced by ``list_old_cache_objects``:

    * ``*.narinfo`` keys are downloaded and parsed; when the referenced store
      path does not exist locally, the narinfo object is deleted first and
      then the object named by its ``URL`` field.
    * ``realisations/*`` keys are downloaded and decoded as JSON; when the
      document is an object with an ``outPath`` entry and
      ``/nix/store/<outPath>`` does not exist locally, the realisation object
      is deleted.
    """
    async for obj_key in list_old_cache_objects():
        if obj_key.endswith(".narinfo"):
            info = NarInfo(await get_object(obj_key))
            still_present = await info.exists_locally()
            if not still_present:
                print(f"Found unused NAR for {info.store_path}")
                await delete_object(obj_key)
                await delete_object(info.url)

        if obj_key.startswith("realisations/"):
            parsed = json.loads(await get_object(obj_key))
            # Anything that is not a JSON object carrying "outPath" is left alone.
            if isinstance(parsed, dict) and "outPath" in parsed:
                out_path = parsed["outPath"]
                if not await exists_locally("/nix/store/" + out_path):
                    print(f"Found unused realisation for {out_path}")
                    await delete_object(obj_key)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards.  It replaces get_event_loop(), which is
    # deprecated when called without a running loop.
    asyncio.run(main())