181 lines
5.6 KiB
Python
181 lines
5.6 KiB
Python
#!@python@/bin/python
|
|
|
|
import asyncio
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import functools
|
|
from typing import Any, AsyncIterable, Awaitable, Callable, Optional, TypeVar
|
|
from typing import cast
|
|
from os import path, listdir
|
|
import json
|
|
|
|
import boto3
|
|
from botocore.response import StreamingBody
|
|
|
|
# S3-compatible endpoint URL handed to boto3.client() when the client is built.
ENDPOINT_URL: str = "https://s3.us-west-000.backblazeb2.com"
|
|
# Bucket passed as Bucket= to the S3 get/list/delete calls in this file.
BUCKET_NAME: str = "cache-chir-rs"
|
|
|
|
# Shared thread pool on which aio() runs blocking callables.
executor: ThreadPoolExecutor = ThreadPoolExecutor()
|
|
|
|
# Type variable bound to any callable; not referenced in the visible code.
F = TypeVar('F', bound=Callable[..., Any])
|
|
# Generic result type used in the with_backoff() and aio() signatures.
T = TypeVar('T')
|
|
|
|
|
|
def with_backoff(
        f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
    """Decorate coroutine function ``f`` so that failed calls are retried.

    The wrapper awaits ``f(*args, **kwargs)``.  When the call raises an
    ``Exception`` the error is printed, the wrapper sleeps, and the call is
    made again.  The sleep starts at 2 seconds and doubles after every
    failure (2, 4, 8, ... 64).  Once the next delay would be 120 seconds or
    more, the most recent exception is re-raised instead of retrying.

    Args:
        f: Coroutine function to wrap.

    Returns:
        A coroutine function with the same call signature and metadata as
        ``f`` that returns whatever ``f`` returns.

    Raises:
        Exception: The last exception raised by ``f`` once the retry budget
            is exhausted.
    """

    @functools.wraps(f)
    async def with_backoff_wrapper(*args: Any, **kwargs: Any) -> T:
        last_delay = 2
        while True:
            try:
                return await f(*args, **kwargs)
            except Exception as e:
                print(f"{e}")
                if last_delay >= 120:
                    # Retry budget exhausted: surface the original error.
                    raise
                await asyncio.sleep(last_delay)
                # Double the delay.  Multiplying the delay by itself would
                # jump 2 -> 4 -> 16 -> 256 and give up after three retries.
                last_delay *= 2

    return with_backoff_wrapper
|
|
|
|
|
|
def aio(f: Callable[..., T]) -> Callable[..., Awaitable[T]]:
    """Turn blocking callable ``f`` into a coroutine function.

    Each call binds the arguments to ``f`` and submits the bound call to the
    module-level ``executor`` through the running event loop, then awaits
    the result.

    Args:
        f: Synchronous callable to run on the executor.

    Returns:
        A coroutine function taking the same arguments as ``f`` and carrying
        its name and docstring; awaiting it yields ``f``'s return value.
    """

    @functools.wraps(f)
    async def aio_wrapper(*args: Any, **kwargs: Any) -> T:
        # run_in_executor() only forwards positional arguments, so bind
        # everything (including keyword arguments) up front.
        f_bound: Callable[[], T] = functools.partial(f, *args, **kwargs)
        loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, f_bound)

    return aio_wrapper
|
|
|
|
|
|
@aio
def exists_locally(store_path: str) -> bool:
    """Report whether ``store_path`` exists on the local filesystem.

    The function is wrapped by ``aio``, so callers must ``await`` it.
    """
    is_present: bool = path.exists(store_path)
    return is_present
|
|
|
|
|
|
class NarInfo(object):
    """Parsed fields of a ``.narinfo`` document.

    Every recognised ``Key: value`` line of the input is stored on the
    instance under a snake_case attribute (``StorePath`` -> ``store_path``,
    ``URL`` -> ``url``, ``FileSize`` -> ``file_size``, ...).  Only keys that
    actually occur in the input create an attribute, except ``compression``,
    which starts out as ``"bzip2"``.  A key that occurs more than once keeps
    the value of its last occurrence.
    """

    # narinfo key -> (attribute name, converter applied to the raw value)
    _FIELDS = {
        "StorePath": ("store_path", str),
        "URL": ("url", str),
        "Compression": ("compression", str),
        "FileHash": ("file_hash", str),
        "FileSize": ("file_size", int),
        "NarHash": ("nar_hash", str),
        "NarSize": ("nar_size", int),
        "References": ("references", str.split),
        "Deriver": ("deriver", str),
        "System": ("system", str),
        "Sig": ("sig", str),
        "CA": ("ca", str),
    }

    def __init__(self, narinfo: str) -> None:
        """Parse the text of a narinfo file.

        Args:
            narinfo: Full narinfo text, one ``Key: value`` pair per line.

        Raises:
            ValueError: If ``FileSize`` or ``NarSize`` is not an integer.
        """
        self.compression = "bzip2"
        for narinfo_line in narinfo.splitlines():
            key, separator, value = narinfo_line.partition(": ")
            if not separator:
                # Blank line or line without "Key: value" shape; unpacking
                # a plain split() here would raise ValueError.
                continue
            field = self._FIELDS.get(key)
            if field is None:
                # Unknown keys are ignored.
                continue
            attribute, convert = field
            setattr(self, attribute, convert(value))

    async def exists_locally(self) -> bool:
        """Return whether this narinfo's ``store_path`` exists locally.

        Delegates to the module-level ``exists_locally`` coroutine.

        Raises:
            AttributeError: If the parsed text had no ``StorePath`` line.
        """
        return await exists_locally(self.store_path)
|
|
|
|
|
|
# Module-level S3 client, created at import time and shared by the S3 helpers.
s3 = boto3.client("s3", endpoint_url=ENDPOINT_URL)
|
|
|
|
|
|
@with_backoff
@aio
def get_object(Key: str) -> str:
    """Fetch object ``Key`` from ``BUCKET_NAME`` and return it as text.

    The body is read in full and decoded as UTF-8.

    Raises:
        Exception: ``"No Body"`` when the response has no ``Body`` entry,
            ``"Not StreamingBody"`` when the body is of another type.
    """
    response = s3.get_object(Bucket=BUCKET_NAME, Key=Key)
    if "Body" not in response:
        raise Exception("No Body")
    body = response["Body"]
    if not isinstance(body, StreamingBody):
        raise Exception("Not StreamingBody")
    raw = body.read()
    return raw.decode("utf-8")
|
|
|
|
|
|
async def list_cache_objects() -> AsyncIterable[str]:
    """Yield the key of every object in ``BUCKET_NAME``.

    Pages through ``s3.list_objects_v2``, following
    ``NextContinuationToken`` until a response carries none.  Each page
    request is wrapped with ``with_backoff`` and ``aio``.

    A response without a ``Contents`` entry is treated as an empty page
    rather than an error, so an empty bucket yields nothing.

    Yields:
        The ``Key`` of each listed object.

    Raises:
        Exception: ``"Not dict"`` when a ``Contents`` entry is not a dict.
    """

    @with_backoff
    @aio
    def list_objects_v2(ContinuationToken: Optional[str]) -> dict[str, Any]:
        # Only send ContinuationToken when resuming a listing.
        if ContinuationToken is not None:
            return s3.list_objects_v2(Bucket=BUCKET_NAME,
                                      ContinuationToken=ContinuationToken)
        return s3.list_objects_v2(Bucket=BUCKET_NAME)

    cont_token = None
    while True:
        objs = await list_objects_v2(cont_token)
        # A missing "Contents" entry is handled as an empty page.
        contents = objs.get("Contents", [])
        if isinstance(contents, list):
            for obj in cast(list[Any], contents):
                if not isinstance(obj, dict):
                    raise Exception("Not dict")
                yield cast(dict[str, Any], obj)["Key"]

        if "NextContinuationToken" not in objs:
            break
        cont_token = objs["NextContinuationToken"]
|
|
|
|
|
|
@with_backoff
@aio
def delete_object(key: str) -> None:
    """Delete the object stored under ``key`` in ``BUCKET_NAME``.

    The function is wrapped by ``with_backoff`` and ``aio``, so callers
    must ``await`` it.
    """
    request = {"Bucket": BUCKET_NAME, "Key": key}
    s3.delete_object(**request)
|
|
|
|
|
|
def get_store_hashes(store_dir: str = "/nix/store") -> set[str]:
    """Return the leading hash component of every entry in ``store_dir``.

    Each directory entry name is cut at its first ``-``; the part before it
    is collected.  Names without a ``-`` are collected unchanged.

    Args:
        store_dir: Directory to scan.  Defaults to ``/nix/store``.

    Returns:
        The set of name prefixes found in ``store_dir``.

    Raises:
        OSError: If ``store_dir`` cannot be listed.
    """
    return {entry.split("-", 1)[0] for entry in listdir(store_dir)}
|
|
|
|
|
|
async def main() -> None:
    """Delete cache objects whose store paths are absent locally.

    Walks every key produced by ``list_cache_objects``:

    * keys ending in ``.narinfo`` are fetched and parsed; when the store
      path is missing locally the narinfo object is deleted and its NAR URL
      is queued for deletion, otherwise the URL is recorded as in use;
    * keys under ``realisations/`` are fetched as JSON and deleted when
      ``/nix/store/`` + ``outPath`` is missing locally;
    * keys under ``nar/`` are queued for deletion.

    Once the listing is exhausted, every queued NAR that was not recorded
    as in use is deleted.
    """
    doomed_nars = set()
    live_nars = set()

    async for key in list_cache_objects():
        if key.endswith(".narinfo"):
            # check if we have the hash locally
            narinfo_text = await get_object(key)
            narinfo = NarInfo(narinfo_text)
            if await narinfo.exists_locally():
                live_nars.add(narinfo.url)
            else:
                print(f"Found unused NAR for {narinfo.store_path}")
                await delete_object(key)
                doomed_nars.add(narinfo.url)

        if key.startswith("realisations/"):
            realisation = json.loads(await get_object(key))
            # Skip anything that is not an object carrying an outPath.
            if not (isinstance(realisation, dict)
                    and "outPath" in realisation):
                continue
            local_path = "/nix/store/" + realisation["outPath"]
            if not await exists_locally(local_path):
                print(f"Found unused realisation for {realisation['outPath']}")
                await delete_object(key)

        if key.startswith("nar/"):
            doomed_nars.add(key)

    # NARs still referenced by a kept narinfo must survive.
    for nar in doomed_nars:
        if nar not in live_nars:
            print(f"Deleting unused NAR {nar}")
            await delete_object(nar)
|
|
|
|
|
|
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion
    # and closes the loop afterwards.  Calling asyncio.get_event_loop()
    # without a running loop is deprecated in recent Python versions.
    asyncio.run(main())
|