Lock file maintenance #8
1 changed files with 83 additions and 77 deletions
|
@ -97,94 +97,100 @@ impl CaStore {
|
||||||
let string_id = lexicographic_base64::encode(id.to_be_bytes());
|
let string_id = lexicographic_base64::encode(id.to_be_bytes());
|
||||||
let source_fname = format!("temp/{string_id}");
|
let source_fname = format!("temp/{string_id}");
|
||||||
|
|
||||||
/*info!("Starting multipart upload {id}");
|
let mut buf = BytesMut::with_capacity(5_000_000);
|
||||||
let multipart_result = self
|
reader.read_buf(&mut buf).await?;
|
||||||
.client
|
|
||||||
.create_multipart_upload()
|
|
||||||
.bucket(&*self.bucket)
|
|
||||||
.key(&source_fname)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.with_context(|| format!("Creating multipart request for Request ID{id}"))?;
|
|
||||||
|
|
||||||
let mut buf = BytesMut::with_capacity(16 * 1024 * 1024); // 16MiB byte buffer for the file
|
|
||||||
let hasher = Arc::new(Mutex::new(Hasher::new()));
|
let hasher = Arc::new(Mutex::new(Hasher::new()));
|
||||||
|
|
||||||
let mut i = 1;
|
let hash = if buf.len() >= 5_000_000 {
|
||||||
let mut completed_multipart_upload_builder = CompletedMultipartUpload::builder();
|
info!("Starting multipart upload {id}");
|
||||||
|
let multipart_result = self
|
||||||
loop {
|
|
||||||
buf.clear();
|
|
||||||
reader.read_buf(&mut buf).await.context("Reading chunk")?;
|
|
||||||
if buf.is_empty() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("Uploading part {i} for multipart upload {id}");
|
|
||||||
|
|
||||||
let buf2 = buf.clone();
|
|
||||||
let hasher = Arc::clone(&hasher);
|
|
||||||
let hasher_job = spawn_blocking(move || {
|
|
||||||
hasher.blocking_lock().update_rayon(&buf2);
|
|
||||||
});
|
|
||||||
|
|
||||||
let part_upload_fut = self
|
|
||||||
.client
|
.client
|
||||||
.upload_part()
|
.create_multipart_upload()
|
||||||
.bucket(&*self.bucket)
|
.bucket(&*self.bucket)
|
||||||
.key(&source_fname)
|
.key(&source_fname)
|
||||||
.set_upload_id(multipart_result.upload_id.clone())
|
.send()
|
||||||
.body(ByteStream::from(buf.to_vec()))
|
.await
|
||||||
.part_number(i)
|
.with_context(|| format!("Creating multipart request for Request ID{id}"))?;
|
||||||
.send();
|
|
||||||
|
|
||||||
let ((), part_upload_result) = try_join!(
|
let mut reader = buf.chain(reader);
|
||||||
async { hasher_job.await.context("Awaiting hasher job") },
|
|
||||||
async { part_upload_fut.await.context("Awaiting uploader job") }
|
let mut buf = BytesMut::with_capacity(16 * 1024 * 1024); // 16MiB byte buffer for the file
|
||||||
)
|
let hasher = Arc::new(Mutex::new(Hasher::new()));
|
||||||
.context("Awaiting job for chunk")?;
|
|
||||||
completed_multipart_upload_builder = completed_multipart_upload_builder.parts(
|
let mut i = 1;
|
||||||
CompletedPart::builder()
|
let mut completed_multipart_upload_builder = CompletedMultipartUpload::builder();
|
||||||
.e_tag(part_upload_result.e_tag.unwrap_or_default())
|
|
||||||
|
loop {
|
||||||
|
buf.clear();
|
||||||
|
reader.read_buf(&mut buf).await.context("Reading chunk")?;
|
||||||
|
if buf.is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("Uploading part {i} for multipart upload {id}");
|
||||||
|
|
||||||
|
let buf2 = buf.clone();
|
||||||
|
let hasher = Arc::clone(&hasher);
|
||||||
|
let hasher_job = spawn_blocking(move || {
|
||||||
|
hasher.blocking_lock().update_rayon(&buf2);
|
||||||
|
});
|
||||||
|
|
||||||
|
let part_upload_fut = self
|
||||||
|
.client
|
||||||
|
.upload_part()
|
||||||
|
.bucket(&*self.bucket)
|
||||||
|
.key(&source_fname)
|
||||||
|
.set_upload_id(multipart_result.upload_id.clone())
|
||||||
|
.body(ByteStream::from(buf.to_vec()))
|
||||||
.part_number(i)
|
.part_number(i)
|
||||||
.build(),
|
.send();
|
||||||
);
|
|
||||||
i += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("Finalizing Multipart Upload {id}");
|
let ((), part_upload_result) = try_join!(
|
||||||
|
async { hasher_job.await.context("Awaiting hasher job") },
|
||||||
|
async { part_upload_fut.await.context("Awaiting uploader job") }
|
||||||
|
)
|
||||||
|
.context("Awaiting job for chunk")?;
|
||||||
|
completed_multipart_upload_builder = completed_multipart_upload_builder.parts(
|
||||||
|
CompletedPart::builder()
|
||||||
|
.e_tag(part_upload_result.e_tag.unwrap_or_default())
|
||||||
|
.part_number(i)
|
||||||
|
.build(),
|
||||||
|
);
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
let hash = hasher.lock().await.finalize();
|
debug!("Finalizing Multipart Upload {id}");
|
||||||
self.client
|
|
||||||
.complete_multipart_upload()
|
|
||||||
.bucket(&*self.bucket)
|
|
||||||
.key(&source_fname)
|
|
||||||
.multipart_upload(completed_multipart_upload_builder.build())
|
|
||||||
.set_upload_id(multipart_result.upload_id)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.context("Completing multipart upload")?;*/
|
|
||||||
|
|
||||||
let hasher = Arc::new(Mutex::new(Hasher::new()));
|
let hash = hasher.lock().await.finalize();
|
||||||
let mut buf = Vec::new();
|
self.client
|
||||||
reader.read_to_end(&mut buf).await?;
|
.complete_multipart_upload()
|
||||||
let buf = Bytes::from(buf);
|
.bucket(&*self.bucket)
|
||||||
let buf2 = buf.clone();
|
.key(&source_fname)
|
||||||
let hasher2 = Arc::clone(&hasher);
|
.multipart_upload(completed_multipart_upload_builder.build())
|
||||||
spawn_blocking(move || {
|
.set_upload_id(multipart_result.upload_id)
|
||||||
hasher2.blocking_lock().update_rayon(&buf2);
|
.send()
|
||||||
})
|
.await
|
||||||
.await?;
|
.context("Completing multipart upload")?;
|
||||||
self.client
|
hash
|
||||||
.put_object()
|
} else {
|
||||||
.bucket(&*self.bucket)
|
let buf = Bytes::from(buf);
|
||||||
.key(&source_fname)
|
let buf2 = buf.clone();
|
||||||
.body(ByteStream::from(buf.to_vec()))
|
let hasher2 = Arc::clone(&hasher);
|
||||||
.send()
|
spawn_blocking(move || {
|
||||||
.await
|
hasher2.blocking_lock().update_rayon(&buf2);
|
||||||
.context("Uploading file")?;
|
})
|
||||||
|
.await?;
|
||||||
|
self.client
|
||||||
|
.put_object()
|
||||||
|
.bucket(&*self.bucket)
|
||||||
|
.key(&source_fname)
|
||||||
|
.body(ByteStream::from(buf.to_vec()))
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.context("Uploading file")?;
|
||||||
|
|
||||||
let hash = hasher.lock().await.finalize();
|
hasher.lock().await.finalize()
|
||||||
|
};
|
||||||
|
|
||||||
let target_fname = lexicographic_base64::encode(hash.as_bytes());
|
let target_fname = lexicographic_base64::encode(hash.as_bytes());
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue