From 6df253c0308f8f8b69efdd6ce1d250588d129648 Mon Sep 17 00:00:00 2001
From: SeanOMik
Date: Thu, 20 Apr 2023 18:05:20 -0400
Subject: [PATCH] Implement streaming bytes during image pushes

---
 Cargo.lock                |  41 ++++++++++++-
 Cargo.toml                |   4 +-
 src/api/uploads.rs        |  46 ++++++++++-----
 src/main.rs               |  22 +++++--
 src/storage/filesystem.rs | 121 ++++++++++++++++++++++++++++++++++++--
 src/storage/mod.rs        |  26 +++++++-
 6 files changed, 230 insertions(+), 30 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8ecdbf0..803d1d3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -81,10 +81,11 @@ dependencies = [
 
 [[package]]
 name = "actix-rt"
-version = "2.7.0"
+version = "2.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ea16c295198e958ef31930a6ef37d0fb64e9ca3b6116e6b93a8bdae96ee1000"
+checksum = "15265b6b8e2347670eb363c47fc8c75208b4a4994b27192f345fcbe707804f3e"
 dependencies = [
+ "actix-macros",
  "futures-core",
  "tokio",
 ]
@@ -237,6 +238,28 @@ version = "1.0.70"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4"
 
+[[package]]
+name = "async-stream"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
+dependencies = [
+ "proc-macro2",
+ "quote 1.0.26",
+ "syn 2.0.15",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.68"
@@ -618,8 +641,10 @@ dependencies = [
 name = "docker-registry"
 version = "0.1.0"
 dependencies = [
+ "actix-rt",
  "actix-web",
  "anyhow",
+ "async-stream",
  "async-trait",
  "bytes",
  "chrono",
@@ -1917,9 +1942,21 @@ dependencies = [
  "pin-project-lite",
  "signal-hook-registry",
  "socket2",
+ "tokio-macros",
  "winapi",
 ]
 
+[[package]]
+name = "tokio-macros"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
+dependencies = [
+ "proc-macro2",
+ "quote 1.0.26",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "tokio-rustls"
 version = "0.23.4"
diff --git a/Cargo.toml b/Cargo.toml
index 5789f0d..6dc0afa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@ bytes = "1.4.0"
 chrono = "0.4.23"
 
 actix-web = "4"
-tokio = { version = "1.21.2", features = [ "fs" ] }
+tokio = { version = "1.21.2", features = [ "fs", "macros" ] }
 
 clap = { version = "4.0.23", features = [ "derive" ] }
 
@@ -33,3 +33,5 @@ qstring = "0.7.2"
sha256 = "1.1.2"
 pin-project-lite = "0.2.9"
 anyhow = "1.0.70"
+async-stream = "0.3.5"
+actix-rt = "2.8.0"
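The dependency changes above (tokio's `macros` feature, `actix-rt`, `async-stream`) support the pattern the rest of the patch builds: HTTP handlers push chunks into a bounded channel, and a background task drains it to disk. A minimal sketch of that pattern, independent of this codebase (the digest string and chunk contents are placeholders):

```rust
use bytes::Bytes;
use tokio::sync::mpsc;

#[tokio::main] // requires tokio's "macros" and a runtime feature
async fn main() {
    // Bounded channel of (digest, chunk) pairs; the small capacity applies
    // backpressure to the producer when the consumer falls behind.
    let (tx, mut rx) = mpsc::channel::<(String, Bytes)>(50);

    // Consumer task: in the patch this role is played by FilesystemStreamer.
    let writer = tokio::spawn(async move {
        while let Some((digest, chunk)) = rx.recv().await {
            println!("would append {} bytes to {}", chunk.len(), digest);
        }
    });

    tx.send(("example-digest".to_string(), Bytes::from_static(b"chunk")))
        .await
        .unwrap();
    drop(tx); // dropping the last sender closes the channel and ends the loop
    writer.await.unwrap();
}
```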
diff --git a/src/api/uploads.rs b/src/api/uploads.rs
index 86473cd..0cd5c06 100644
--- a/src/api/uploads.rs
+++ b/src/api/uploads.rs
@@ -1,13 +1,14 @@
 use actix_web::{HttpResponse, HttpRequest, post, web, patch, put, delete, get};
 use bytes::{BytesMut, Bytes, BufMut};
+use futures::{StreamExt, TryStreamExt};
 use qstring::QString;
 use tokio::io::AsyncWriteExt;
-use tracing::{debug};
+use tracing::{debug, warn};
 
 use crate::app_state::AppState;
 use crate::database::Database;
-use crate::storage::filesystem::FilesystemDriver;
+use crate::storage::{StorageDriver, StorageDriverStreamer};
 
 /// Starting an upload
 #[post("/")]
 pub async fn start_upload(path: web::Path<(String, )>) -> HttpResponse {
@@ -25,29 +26,45 @@ pub async fn start_upload(path: web::Path<(String, )>) -> HttpResponse {
 }
 
 #[patch("/{uuid}")]
-pub async fn chunked_upload_layer(body: Bytes, path: web::Path<(String, String)>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
+pub async fn chunked_upload_layer(/* body: Bytes */mut payload: web::Payload, path: web::Path<(String, String)>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
     let full_uri = req.uri().to_string();
     let (_name, layer_uuid) = (path.0.to_owned(), path.1.to_owned());
 
-    debug!("Read body of size: {}", body.len());
-
     let storage = state.storage.lock().await;
     let current_size = storage.digest_length(&layer_uuid).await.unwrap();
 
-    let (starting, ending) = if let Some(current_size) = current_size {
-        let body_size = body.len();
-        storage.save_digest(&layer_uuid, &body, true).await.unwrap();
+    let written_size = match storage.supports_streaming() {
+        true => {
+            let sender = storage.start_stream_channel();
+            let mut written_size = 0;
+            while let Some(item) = payload.next().await {
+                if let Ok(bytes) = item {
+                    written_size += bytes.len();
+                    sender.send((layer_uuid.clone(), bytes)).await.unwrap();
+                }
+            }
 
-        (current_size, current_size + body_size)
-    } else {
-        let body_size = body.len();
+            written_size
+        },
+        false => {
+            warn!("This storage driver does not support streaming! This means high memory usage during image pushes!");
 
-        storage.save_digest(&layer_uuid, &body, true).await.unwrap();
+            let mut bytes = web::BytesMut::new();
+            while let Some(item) = payload.next().await {
+                bytes.extend_from_slice(&item.unwrap());
+            }
 
-        (0, body_size)
+            let bytes_len = bytes.len();
+            storage.save_digest(&layer_uuid, &bytes.into(), true).await.unwrap();
+            bytes_len
+        }
     };
 
-    debug!("s={}, e={}, uuid={}, uri={}", starting, ending, layer_uuid, full_uri);
+    let (starting, ending) = if let Some(current_size) = current_size {
+        (current_size, current_size + written_size)
+    } else {
+        (0, written_size)
+    };
 
     HttpResponse::Accepted()
         .insert_header(("Location", full_uri))
diff --git a/src/main.rs b/src/main.rs
index f7994af..6f07fec 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,18 +4,21 @@ mod database;
 mod dto;
 mod storage;
 
+use std::sync::Arc;
+
 use actix_web::{web, App, HttpServer};
 use actix_web::middleware::Logger;
+use bytes::Bytes;
 use sqlx::sqlite::SqlitePoolOptions;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, mpsc};
 use tracing::{debug, Level};
 
 use app_state::AppState;
 use database::Database;
 
 use crate::storage::StorageDriver;
-use crate::storage::filesystem::FilesystemDriver;
+use crate::storage::filesystem::{FilesystemDriver, FilesystemStreamer};
 
 #[actix_web::main]
 async fn main() -> std::io::Result<()> {
@@ -25,9 +28,18 @@ async fn main() -> std::io::Result<()> {
 
     pool.create_schema().await.unwrap();
 
-    //let db_conn: Mutex<SqliteConnection> = Mutex::new(SqliteConnection::establish("test.db").unwrap());
-    //let db = Mutex::new(Database::new_sqlite_connection("test.db").unwrap());
-    let storage_driver: Mutex<Box<dyn StorageDriver>> = Mutex::new(Box::new(FilesystemDriver::new("registry/blobs")));
+    let storage_path = String::from("registry/blobs");
+    let (send, recv) = mpsc::channel::<(String, Bytes)>(50);
+    let storage_driver: Mutex<Box<dyn StorageDriver>> = Mutex::new(Box::new(FilesystemDriver::new(storage_path.clone(), send)));
+
+    // create the storage streamer
+    {
+        let path_clone = storage_path.clone();
+        actix_rt::spawn(async {
+            let mut streamer = FilesystemStreamer::new(path_clone, recv);
+            streamer.start_handling_streams().await.unwrap();
+        });
+    }
 
     let state = web::Data::new(AppState::new(pool, storage_driver));
 
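The streaming branch of `chunked_upload_layer` above `unwrap()`s both payload reads and channel sends. A sketch of how that loop could propagate errors instead; `forward_payload` is a hypothetical helper, not part of the patch:

```rust
use actix_web::web;
use bytes::Bytes;
use futures::StreamExt;
use tokio::sync::mpsc;

// Mirrors the streaming branch of `chunked_upload_layer`: forward each network
// chunk to the storage writer as it arrives, so only one chunk at a time is
// held in memory. Returns the number of bytes forwarded.
async fn forward_payload(
    mut payload: web::Payload,
    sender: mpsc::Sender<(String, Bytes)>,
    layer_uuid: String,
) -> Result<usize, actix_web::Error> {
    let mut written = 0;
    while let Some(chunk) = payload.next().await {
        let chunk = chunk?; // client disconnects surface as a PayloadError
        written += chunk.len();
        // A send error means the writer task is gone; map it to a 500.
        sender
            .send((layer_uuid.clone(), chunk))
            .await
            .map_err(actix_web::error::ErrorInternalServerError)?;
    }
    Ok(written)
}
```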
diff --git a/src/storage/filesystem.rs b/src/storage/filesystem.rs
index 8a7083a..e554c8c 100644
--- a/src/storage/filesystem.rs
+++ b/src/storage/filesystem.rs
@@ -1,27 +1,138 @@
-use std::{path::Path, io::ErrorKind};
+use std::{path::Path, io::ErrorKind, sync::Arc, collections::HashMap};
 
 use anyhow::Context;
 use async_trait::async_trait;
 use bytes::Bytes;
-use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking};
+use futures::{executor::block_on, StreamExt};
+use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking, sync::{Mutex, mpsc}};
 use tracing::debug;
 
-use super::StorageDriver;
+use super::{StorageDriver, StorageDriverStreamer, Streamer};
+
+pub struct FilesystemStreamer {
+    /* new_streams_channel: mpsc::Receiver<(String, mpsc::Receiver<Bytes>)>,
+    // (digest, receiver)
+    streaming_channels: Vec<(String, )>, */
+
+    storage_path: String,
+    chunk_channel: mpsc::Receiver<(String, Bytes)>,
+    cached_files: HashMap<String, fs::File>,
+}
+
+impl FilesystemStreamer {
+    pub fn new(storage_path: String, chunk_channel: mpsc::Receiver<(String, Bytes)>) -> Self {
+        Self {
+            storage_path,
+            chunk_channel,
+            cached_files: HashMap::new(),
+        }
+    }
+
+    pub async fn start_handling_streams(&mut self) -> anyhow::Result<()> {
+        while let Some((digest, mut bytes)) = self.chunk_channel.recv().await {
+            let mut temp;
+            let file = match self.cached_files.get_mut(&digest) {
+                Some(f) => f,
+                None => {
+                    let path = format!("{}/{}", self.storage_path, digest);
+                    temp = fs::OpenOptions::new()
+                        .write(true)
+                        .append(true)
+                        .create(true)
+                        .open(path).await?;
+                    &mut temp
+                }
+            };
+
+            file.write_all(&mut bytes).await.unwrap();
+        }
+
+        Ok(())
+    }
+}
+
+impl Streamer for FilesystemStreamer {
+    fn start(&'static mut self) -> anyhow::Result<()> {
+        tokio::spawn(self.start_handling_streams());
+
+        Ok(())
+    }
+}
 
 pub struct FilesystemDriver {
     storage_path: String,
+    // (digest, receiver)
+    streaming_channels: Vec<(String, mpsc::Receiver<Bytes>)>,
+    streamer_sender: mpsc::Sender<(String, Bytes)>,
 }
 
 impl FilesystemDriver {
-    pub fn new(storage_path: &str) -> Self {
+    //pub fn new(storage_path: &str) -> FilesystemDriver {
+    pub fn new(storage_path: String, stream_sender: mpsc::Sender<(String, Bytes)>) -> FilesystemDriver {
+        /* let (send, recv) = mpsc::channel::<(String, Bytes)>(50);
+        let streamer = Arc::new(FilesystemStreamer::new(storage_path.to_string(), recv)); */
+
         Self {
-            storage_path: storage_path.to_string(),
+            storage_path,
+            streaming_channels: vec![],
+            streamer_sender: stream_sender,
         }
     }
 
     fn get_digest_path(&self, digest: &str) -> String {
         format!("{}/{}", self.storage_path, digest)
     }
+
+    async fn write_payload(&self, digest: &str, mut payload: actix_web::web::Payload, append: bool) -> anyhow::Result<usize> {
+        let path = self.get_digest_path(digest);
+        let mut file = fs::OpenOptions::new()
+            .write(true)
+            .append(append)
+            .create(true)
+            .open(path).await?;
+
+        let mut total_size = 0;
+        while let Some(item) = payload.next().await {
+            let item = item?;
+
+            total_size += item.len();
+            file.write_all(&item).await?;
+        }
+
+        Ok(total_size)
+    }
+}
+
+impl StorageDriverStreamer for FilesystemDriver {
+    fn write_payload(&self, digest: &str, payload: actix_web::web::Payload, append: bool) -> anyhow::Result<usize> {
+        Ok(tokio::runtime::Handle::current()
+            .block_on(self.write_payload(digest, payload, append))?)
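One thing worth noting in `start_handling_streams` above: nothing is ever inserted into `cached_files`, so every chunk reopens its file. A sketch of how the handle cache could be populated, an assumption about the field's intent rather than code from the patch (`append_chunk` is a hypothetical helper):

```rust
use std::collections::HashMap;
use tokio::{fs, io::AsyncWriteExt};

// Open each digest's file once, keep the handle in the map, and reuse it for
// every later chunk instead of reopening on each receive.
async fn append_chunk(
    cached_files: &mut HashMap<String, fs::File>,
    storage_path: &str,
    digest: &str,
    bytes: &[u8],
) -> anyhow::Result<()> {
    if !cached_files.contains_key(digest) {
        let path = format!("{}/{}", storage_path, digest);
        let file = fs::OpenOptions::new()
            .append(true) // append implies write access
            .create(true)
            .open(path)
            .await?;
        cached_files.insert(digest.to_string(), file);
    }
    // Safe to unwrap: the entry was inserted above if it was missing.
    cached_files.get_mut(digest).unwrap().write_all(bytes).await?;
    Ok(())
}
```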
+
+        /* block_on(|| async {
+            let path = self.get_digest_path(digest);
+            let mut file = fs::OpenOptions::new()
+                .write(true)
+                .append(append)
+                .create(true)
+                .open(path).await?;
+
+            file.write_all(&bytes).await?;
+
+            Ok(())
+        }); */
+    }
+
+    fn supports_streaming(&self) -> bool {
+        true
+    }
+
+    fn start_streaming_thread(&self) -> anyhow::Result<()> {
+        todo!()
+    }
+
+    fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)> {
+        self.streamer_sender.clone()
+    }
 }
 
 #[async_trait]
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 4c8b604..8084366 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1,15 +1,37 @@
 pub mod filesystem;
 
+use std::{pin::Pin, sync::Arc};
+
 use async_trait::async_trait;
 use bytes::Bytes;
-use tokio::io::{AsyncWrite, AsyncRead};
+use futures::Stream;
+use tokio::{io::{AsyncWrite, AsyncRead}, sync::{Mutex, mpsc}};
+
+use actix_web::web;
+
+pub trait Streamer {
+    fn start(&'static mut self) -> anyhow::Result<()>;
+}
+
+pub trait StorageDriverStreamer {
+    /// Write an actix-web payload into the StorageDriver
+    /// Returns the amount of bytes written
+    fn write_payload(&self, digest: &str, payload: actix_web::web::Payload, append: bool) -> anyhow::Result<usize>;
+
+    fn supports_streaming(&self) -> bool;
+    fn start_streaming_thread(&self) -> anyhow::Result<()>;
+    fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)>;
+}
 
 #[async_trait]
-pub trait StorageDriver: Send/* : AsyncWrite + AsyncRead */ {
+pub trait StorageDriver: Send + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ {
     async fn has_digest(&self, digest: &str) -> anyhow::Result<bool>;
     async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>>;
     async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>>;
     async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()>;
     async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>;
     async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>;
+
+    //async fn write_payload(&self, payload: Mutex<web::Payload>) -> anyhow::Result<()>;
+    //async fn write_stream<S: Stream<Item = Bytes>>(&self, stream: S) -> anyhow::Result<()>;
 }
\ No newline at end of file
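Taken together, the pieces wire up as in `main.rs`: the driver holds the channel's `Sender`, the streamer owns the `Receiver`, and handlers reach the writer by cloning the sender out of `start_stream_channel()`. A rough end-to-end sketch using this patch's types; `smoke_test` is a hypothetical crate-internal function, not part of the commit:

```rust
use bytes::Bytes;
use tokio::sync::mpsc;

use crate::storage::StorageDriverStreamer;
use crate::storage::filesystem::{FilesystemDriver, FilesystemStreamer};

// One channel, one background writer task, one chunk pushed through the
// driver's sender, mirroring the wiring that main() sets up.
async fn smoke_test() -> anyhow::Result<()> {
    let (send, recv) = mpsc::channel::<(String, Bytes)>(50);
    let driver = FilesystemDriver::new("registry/blobs".to_string(), send);

    // The streamer owns the receiving end and appends chunks to disk.
    tokio::spawn(async move {
        let mut streamer = FilesystemStreamer::new("registry/blobs".to_string(), recv);
        streamer.start_handling_streams().await.unwrap();
    });

    // Handlers clone the sender through the StorageDriverStreamer trait.
    let sender = driver.start_stream_channel();
    sender
        .send(("example-layer-uuid".to_string(), Bytes::from_static(b"chunk")))
        .await?;
    Ok(())
}
```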