From a232e59788a8370d4d1bc6b95d564e90399da56f Mon Sep 17 00:00:00 2001 From: SeanOMik Date: Thu, 20 Apr 2023 23:56:54 -0400 Subject: [PATCH] Implement streaming bytes for pulling images --- Cargo.lock | 11 +++-- Cargo.toml | 1 + src/api/blobs.rs | 20 ++++++-- src/byte_stream.rs | 100 ++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + src/storage/filesystem.rs | 94 ++++++++++++++++++++--------------- src/storage/mod.rs | 8 ++- 7 files changed, 185 insertions(+), 50 deletions(-) create mode 100644 src/byte_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 803d1d3..572802d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -659,6 +659,7 @@ dependencies = [ "sha256", "sqlx", "tokio", + "tokio-util", "tracing", "tracing-log", "tracing-subscriber", @@ -1928,9 +1929,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.21.2" +version = "1.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" +checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" dependencies = [ "autocfg", "bytes", @@ -1943,7 +1944,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "winapi", + "windows-sys", ] [[package]] @@ -1981,9 +1982,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.4" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 6dc0afa..ab4b278 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ bytes = "1.4.0" chrono = "0.4.23" actix-web = "4" tokio = { version = "1.21.2", features = [ "fs", "macros" ] } +tokio-util = { version = "0.7.7", features = [ "io" ] } clap = { version = "4.0.23", features = [ "derive" ] } diff --git a/src/api/blobs.rs b/src/api/blobs.rs index e958d9d..cf7f5c1 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -1,4 +1,5 @@ use actix_web::{HttpResponse, get, HttpRequest, web, head, delete}; +use futures::StreamExt; use crate::app_state::AppState; @@ -11,7 +12,7 @@ pub async fn digest_exists(path: web::Path<(String, String)>, state: web::Data, state: web::Data, + #[pin] + inner: Pin> + Send + 'static>>, + } +} + +impl ByteStream { + /// Create a new `ByteStream` by wrapping a `futures` stream. + pub fn new(stream: S) -> ByteStream + where + S: Stream> + Send + 'static, + { + ByteStream { + size_hint: None, + inner: Box::pin(stream), + } + } + + pub(crate) fn size_hint(&self) -> Option { + self.size_hint + } + + pub fn into_async_read(self) -> impl AsyncRead + Send + 'static { + ImplAsyncRead::new(self.inner) + } +} + +impl From> for ByteStream { + fn from(buf: Vec) -> ByteStream { + ByteStream { + size_hint: Some(buf.len()), + inner: Box::pin(stream::once(async move { Ok(Bytes::from(buf)) })), + } + } +} + +impl std::fmt::Debug for ByteStream { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "", self.size_hint) + } +} + +impl Stream for ByteStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_next(cx) + } +} + +pin_project! { + struct ImplAsyncRead { + buffer: BytesMut, + #[pin] + stream: futures::stream::Fuse> + Send>>>, + } +} + +impl ImplAsyncRead { + fn new(stream: Pin> + Send>>) -> Self { + ImplAsyncRead { + buffer: BytesMut::new(), + stream: stream.fuse(), + } + } +} + +impl AsyncRead for ImplAsyncRead { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf, + ) -> Poll> { + let this = self.project(); + if this.buffer.is_empty() { + match futures::ready!(this.stream.poll_next(cx)) { + None => return Poll::Ready(Ok(())), + Some(Err(e)) => return Poll::Ready(Err(e)), + Some(Ok(bytes)) => { + this.buffer.put(bytes); + } + } + } + let available = std::cmp::min(buf.remaining(), this.buffer.len()); + let bytes = this.buffer.split_to(available); + buf.put_slice(&bytes); + Poll::Ready(Ok(())) + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 6f07fec..30a2ca1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod app_state; mod database; mod dto; mod storage; +mod byte_stream; use std::sync::Arc; diff --git a/src/storage/filesystem.rs b/src/storage/filesystem.rs index 0987add..a25f673 100644 --- a/src/storage/filesystem.rs +++ b/src/storage/filesystem.rs @@ -1,19 +1,18 @@ -use std::{path::Path, io::ErrorKind, sync::Arc, collections::HashMap}; +use std::{path::Path, io::{ErrorKind, BufRead, BufReader, Read}, sync::Arc, collections::HashMap}; use anyhow::Context; use async_trait::async_trait; use bytes::Bytes; -use futures::{executor::block_on, StreamExt}; +use futures::{executor::block_on, StreamExt, Stream}; use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking, sync::{Mutex, mpsc}}; +use tokio_util::io::ReaderStream; use tracing::debug; +use crate::byte_stream::ByteStream; + use super::{StorageDriver, StorageDriverStreamer, Streamer}; pub struct FilesystemStreamer { - /* new_streams_channel: mpsc::Receiver<(String, mpsc::Receiver)>, - // (digest, receiver) - streaming_channels: Vec<(String, )>, */ - storage_path: String, chunk_channel: mpsc::Receiver<(String, Bytes)>, cached_files: HashMap, @@ -61,20 +60,13 @@ impl Streamer for FilesystemStreamer { pub struct FilesystemDriver { storage_path: String, - // (digest, receiver) - streaming_channels: Vec<(String, mpsc::Receiver)>, streamer_sender: mpsc::Sender<(String, Bytes)>, } impl FilesystemDriver { - //pub fn new(storage_path: &str) -> FilesystemDriver { pub fn new(storage_path: String, stream_sender: mpsc::Sender<(String, Bytes)>) -> FilesystemDriver { - /* let (send, recv) = mpsc::channel::<(String, Bytes)>(50); - let streamer = Arc::new(FilesystemStreamer::new(storage_path.to_string(), recv)); */ - Self { storage_path, - streaming_channels: vec![], streamer_sender: stream_sender, } } @@ -82,25 +74,6 @@ impl FilesystemDriver { fn get_digest_path(&self, digest: &str) -> String { format!("{}/{}", self.storage_path, digest) } - - async fn write_payload(&self, digest: &str, mut payload: actix_web::web::Payload, append: bool) -> anyhow::Result { - let path = self.get_digest_path(digest); - let mut file = fs::OpenOptions::new() - .write(true) - .append(append) - .create(true) - .open(path).await?; - - let mut total_size = 0; - while let Some(item) = payload.next().await { - let item = item?; - - total_size += item.len(); - file.write_all(&item).await?; - } - - Ok(total_size) - } } impl StorageDriverStreamer for FilesystemDriver { @@ -111,17 +84,44 @@ impl StorageDriverStreamer for FilesystemDriver { fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)> { self.streamer_sender.clone() } + + fn has_digest(&self, digest: &str) -> anyhow::Result { + let path = self.get_digest_path(digest); + + Ok(Path::new(&path).exists()) + } + + fn stream_bytes(&self, digest: &str) -> anyhow::Result> { + let path = self.get_digest_path(digest); + + if self.has_digest(digest)? { + + + //tokio::spawn(async { + let s = async_stream::try_stream! { + let file = fs::File::open(&path).await.unwrap(); + + let mut s = ReaderStream::new(file); + + while let Some(item) = s.next().await { + if let Ok(bytes) = item { + yield bytes; + //sender.send((layer_uuid.clone(), bytes)).await.unwrap(); + } + } + //file.re + }; + //}); + + Ok(Some(ByteStream::new(s))) + } else { + Ok(None) + } + } } #[async_trait] impl StorageDriver for FilesystemDriver { - async fn has_digest(&self, digest: &str) -> anyhow::Result { - let path = self.get_digest_path(digest); - - spawn_blocking(move || { - return Path::new(&path).exists() - }).await.context("FilesystemDriver: Failure to spawn blocking thread to check digest") - } async fn get_digest(&self, digest: &str) -> anyhow::Result> { let mut file = match fs::File::open(self.get_digest_path(digest)) @@ -137,7 +137,7 @@ impl StorageDriver for FilesystemDriver { .context("FilesystemDriver: Failure to open digest file"); } } - }; + }; let mut buf = Vec::new(); file.read_to_end(&mut buf).await?; @@ -145,6 +145,20 @@ impl StorageDriver for FilesystemDriver { Ok(Some(Bytes::from_iter(buf))) } + async fn get_digest_stream(&self, digest: &str) -> anyhow::Result> { + let path = self.get_digest_path(digest); + + /* if self.has_digest(digest) { + let s = async_stream::stream! { + + }; + } else { + Ok(None) + } */ + + todo!() + } + async fn digest_length(&self, digest: &str) -> anyhow::Result> { let file = match fs::File::open(self.get_digest_path(digest)) .await { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index b4588e2..6282ab0 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -9,6 +9,8 @@ use tokio::{io::{AsyncWrite, AsyncRead}, sync::{Mutex, mpsc}}; use actix_web::web; +use crate::byte_stream::ByteStream; + pub trait Streamer { fn start(&'static mut self) -> anyhow::Result<()>; } @@ -16,14 +18,18 @@ pub trait Streamer { pub trait StorageDriverStreamer { fn supports_streaming(&self) -> bool; fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)>; + fn has_digest(&self, digest: &str) -> anyhow::Result; + fn stream_bytes(&self, digest: &str) -> anyhow::Result>; } #[async_trait] pub trait StorageDriver: Send + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ { - async fn has_digest(&self, digest: &str) -> anyhow::Result; async fn get_digest(&self, digest: &str) -> anyhow::Result>; + async fn get_digest_stream(&self, digest: &str) -> anyhow::Result>; async fn digest_length(&self, digest: &str) -> anyhow::Result>; async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()>; async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>; async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>; + + //async fn stream_bytes(&self, stream: Box>) -> anyhow::Result<()>; } \ No newline at end of file