Clean up the byte streaming code for pulling images

This commit is contained in:
SeanOMik 2023-04-21 00:02:07 -04:00
parent a232e59788
commit 6107a49b44
Signed by: SeanOMik
GPG Key ID: 568F326C7EB33ACB
3 changed files with 19 additions and 37 deletions

View File

@ -33,14 +33,7 @@ pub async fn pull_digest(path: web::Path<(String, String)>, state: web::Data<App
if let Some(len) = storage.digest_length(&layer_digest).await.unwrap() { if let Some(len) = storage.digest_length(&layer_digest).await.unwrap() {
let stream = storage.stream_bytes(&layer_digest).unwrap().unwrap(); let stream = storage.stream_bytes(&layer_digest).await.unwrap().unwrap();
/* let s = async_stream::stream! {
let stream = storage.stream_bytes(&layer_digest).unwrap().unwrap().as_ref();
while let Some(item) = stream.next().await {
}
}; */
HttpResponse::Ok() HttpResponse::Ok()
.insert_header(("Content-Length", len)) .insert_header(("Content-Length", len))

View File

@ -90,38 +90,26 @@ impl StorageDriverStreamer for FilesystemDriver {
Ok(Path::new(&path).exists()) Ok(Path::new(&path).exists())
} }
fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>> {
let path = self.get_digest_path(digest);
if self.has_digest(digest)? {
//tokio::spawn(async {
let s = async_stream::try_stream! {
let file = fs::File::open(&path).await.unwrap();
let mut s = ReaderStream::new(file);
while let Some(item) = s.next().await {
if let Ok(bytes) = item {
yield bytes;
//sender.send((layer_uuid.clone(), bytes)).await.unwrap();
}
}
//file.re
};
//});
Ok(Some(ByteStream::new(s)))
} else {
Ok(None)
}
}
} }
#[async_trait] #[async_trait]
impl StorageDriver for FilesystemDriver { impl StorageDriver for FilesystemDriver {
async fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>> {
let file = match fs::File::open(self.get_digest_path(digest)).await {
Ok(f) => f,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
return Ok(None)
},
_ => {
return Err(e).context("FilesystemDriver: Failure to open digest file");
}
}
};
let s = ReaderStream::new(file);
Ok(Some(ByteStream::new(s)))
}
async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>> { async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>> {
let mut file = match fs::File::open(self.get_digest_path(digest)) let mut file = match fs::File::open(self.get_digest_path(digest))

View File

@ -19,7 +19,7 @@ pub trait StorageDriverStreamer {
fn supports_streaming(&self) -> bool; fn supports_streaming(&self) -> bool;
fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)>; fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)>;
fn has_digest(&self, digest: &str) -> anyhow::Result<bool>; fn has_digest(&self, digest: &str) -> anyhow::Result<bool>;
fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>>; //fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>>;
} }
#[async_trait] #[async_trait]
@ -31,5 +31,6 @@ pub trait StorageDriver: Send + StorageDriverStreamer/* : AsyncWrite + AsyncRead
async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>; async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>;
async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>; async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>;
async fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>>;
//async fn stream_bytes(&self, stream: Box<dyn Stream<Item = Bytes>>) -> anyhow::Result<()>; //async fn stream_bytes(&self, stream: Box<dyn Stream<Item = Bytes>>) -> anyhow::Result<()>;
} }