Implement streaming bytes during image pushes

SeanOMik 2023-04-20 18:05:20 -04:00
parent 2d9b4d33d8
commit 6df253c030
Signed by: SeanOMik
GPG Key ID: 568F326C7EB33ACB
6 changed files with 230 additions and 30 deletions
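
In outline: the chunked-upload PATCH handler no longer buffers an entire layer in memory. When the storage driver reports streaming support, each chunk of the request payload is forwarded over a tokio mpsc channel of (digest, Bytes) pairs to a background FilesystemStreamer task, which appends the bytes to the blob file on disk. The bounded channel (capacity 50 in this commit) also gives backpressure: senders wait when the writer falls behind. A minimal, self-contained sketch of that producer/consumer shape (assuming tokio with the rt-multi-thread, macros, and fs features plus the bytes crate; `handle_streams` and the standalone `main` are illustrative, not part of this commit):

```rust
use bytes::Bytes;
use tokio::{fs, io::AsyncWriteExt, sync::mpsc};

// Consumer: drain (digest, chunk) pairs and append each chunk to the blob
// file for that digest, mirroring what FilesystemStreamer does below.
async fn handle_streams(
    storage_path: String,
    mut chunks: mpsc::Receiver<(String, Bytes)>,
) -> std::io::Result<()> {
    while let Some((digest, chunk)) = chunks.recv().await {
        let mut file = fs::OpenOptions::new()
            .append(true)
            .create(true)
            .open(format!("{}/{}", storage_path, digest))
            .await?;
        file.write_all(&chunk).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    fs::create_dir_all("registry/blobs").await?;

    let (tx, rx) = mpsc::channel::<(String, Bytes)>(50);
    let streamer = tokio::spawn(handle_streams("registry/blobs".into(), rx));

    // Producer side: an upload handler would clone `tx` and send chunks as
    // they are read from the client.
    tx.send(("example-digest".into(), Bytes::from_static(b"chunk")))
        .await
        .expect("streamer task is alive");

    drop(tx); // dropping every Sender ends the consumer loop
    streamer.await.expect("streamer task panicked")?;
    Ok(())
}
```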

Cargo.lock (generated)

@@ -81,10 +81,11 @@ dependencies = [
 [[package]]
 name = "actix-rt"
-version = "2.7.0"
+version = "2.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ea16c295198e958ef31930a6ef37d0fb64e9ca3b6116e6b93a8bdae96ee1000"
+checksum = "15265b6b8e2347670eb363c47fc8c75208b4a4994b27192f345fcbe707804f3e"
 dependencies = [
+ "actix-macros",
  "futures-core",
  "tokio",
 ]

@@ -237,6 +238,28 @@ version = "1.0.70"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4"

+[[package]]
+name = "async-stream"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
+dependencies = [
+ "proc-macro2",
+ "quote 1.0.26",
+ "syn 2.0.15",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.68"

@@ -618,8 +641,10 @@ dependencies = [
 name = "docker-registry"
 version = "0.1.0"
 dependencies = [
+ "actix-rt",
  "actix-web",
  "anyhow",
+ "async-stream",
  "async-trait",
  "bytes",
  "chrono",

@@ -1917,9 +1942,21 @@ dependencies = [
  "pin-project-lite",
  "signal-hook-registry",
  "socket2",
+ "tokio-macros",
  "winapi",
 ]

+[[package]]
+name = "tokio-macros"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
+dependencies = [
+ "proc-macro2",
+ "quote 1.0.26",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "tokio-rustls"
 version = "0.23.4"

Cargo.toml

@@ -17,7 +17,7 @@ bytes = "1.4.0"
 chrono = "0.4.23"
 actix-web = "4"
-tokio = { version = "1.21.2", features = [ "fs" ] }
+tokio = { version = "1.21.2", features = [ "fs", "macros" ] }
 clap = { version = "4.0.23", features = [ "derive" ] }

@@ -33,3 +33,5 @@ qstring = "0.7.2"
 sha256 = "1.1.2"
 pin-project-lite = "0.2.9"
 anyhow = "1.0.70"
+async-stream = "0.3.5"
+actix-rt = "2.8.0"

(blob upload route handlers)

@@ -1,13 +1,14 @@
 use actix_web::{HttpResponse, HttpRequest, post, web, patch, put, delete, get};
 use bytes::{BytesMut, Bytes, BufMut};
+use futures::StreamExt;
 use qstring::QString;
 use tokio::io::AsyncWriteExt;
-use tracing::{debug};
+use tracing::{debug, warn};

 use crate::app_state::AppState;
 use crate::database::Database;
-use crate::storage::filesystem::FilesystemDriver;
+use crate::storage::{StorageDriver, StorageDriverStreamer};

 /// Starting an upload
 #[post("/")]

@@ -27,30 +28,45 @@ pub async fn start_upload(path: web::Path<(String, )>) -> HttpResponse {
 }

 #[patch("/{uuid}")]
-pub async fn chunked_upload_layer(body: Bytes, path: web::Path<(String, String)>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
+pub async fn chunked_upload_layer(mut payload: web::Payload, path: web::Path<(String, String)>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
     let full_uri = req.uri().to_string();
     let (_name, layer_uuid) = (path.0.to_owned(), path.1.to_owned());

-    debug!("Read body of size: {}", body.len());
-
     let storage = state.storage.lock().await;
     let current_size = storage.digest_length(&layer_uuid).await.unwrap();

-    let (starting, ending) = if let Some(current_size) = current_size {
-        let body_size = body.len();
-        storage.save_digest(&layer_uuid, &body, true).await.unwrap();
-
-        (current_size, current_size + body_size)
-    } else {
-        let body_size = body.len();
-        storage.save_digest(&layer_uuid, &body, true).await.unwrap();
-
-        (0, body_size)
-    };
+    let written_size = match storage.supports_streaming() {
+        true => {
+            // Forward chunks to the storage backend as they arrive.
+            let sender = storage.start_stream_channel();
+            let mut written_size = 0;
+            while let Some(item) = payload.next().await {
+                if let Ok(bytes) = item {
+                    written_size += bytes.len();
+                    sender.send((layer_uuid.clone(), bytes)).await.unwrap();
+                }
+            }
+
+            written_size
+        },
+        false => {
+            warn!("This storage driver does not support streaming! This means high memory usage during image pushes!");
+
+            // Fallback: buffer the whole payload in memory before saving it.
+            let mut bytes = web::BytesMut::new();
+            while let Some(item) = payload.next().await {
+                bytes.extend_from_slice(&item.unwrap());
+            }
+
+            let bytes_len = bytes.len();
+            storage.save_digest(&layer_uuid, &bytes.into(), true).await.unwrap();
+            bytes_len
+        }
+    };

-    debug!("s={}, e={}, uuid={}, uri={}", starting, ending, layer_uuid, full_uri);
+    let (starting, ending) = if let Some(current_size) = current_size {
+        (current_size, current_size + written_size)
+    } else {
+        (0, written_size)
+    };

     HttpResponse::Accepted()
         .insert_header(("Location", full_uri))
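
(The hunk above is cut off before the response is finished. For orientation: in the Docker/OCI distribution spec's chunked-upload flow, the PATCH response is 202 Accepted with a Location header for the next request and a Range header covering the bytes received so far. A hedged sketch of such a response follows; the helper name is illustrative, and the exact headers this commit sets are not visible in the diff.)

```rust
use actix_web::HttpResponse;

// `full_uri`, `layer_uuid`, `starting`, and `ending` correspond to the
// values computed in the handler above.
fn chunk_accepted(full_uri: String, layer_uuid: String, starting: usize, ending: usize) -> HttpResponse {
    HttpResponse::Accepted()
        .insert_header(("Location", full_uri))
        .insert_header(("Range", format!("{}-{}", starting, ending)))
        .insert_header(("Docker-Upload-UUID", layer_uuid))
        .finish()
}
```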

src/main.rs

@@ -4,18 +4,21 @@ mod database;
 mod dto;
 mod storage;

 use actix_web::{web, App, HttpServer};
 use actix_web::middleware::Logger;
+use bytes::Bytes;
 use sqlx::sqlite::SqlitePoolOptions;
-use tokio::sync::Mutex;
+use tokio::sync::{Mutex, mpsc};
 use tracing::{debug, Level};

 use app_state::AppState;
 use database::Database;
 use crate::storage::StorageDriver;
-use crate::storage::filesystem::FilesystemDriver;
+use crate::storage::filesystem::{FilesystemDriver, FilesystemStreamer};

 #[actix_web::main]
 async fn main() -> std::io::Result<()> {

@@ -25,9 +28,18 @@ async fn main() -> std::io::Result<()> {
     pool.create_schema().await.unwrap();

-    //let db_conn: Mutex<dyn Database> = Mutex::new(SqliteConnection::establish("test.db").unwrap());
-    //let db = Mutex::new(Database::new_sqlite_connection("test.db").unwrap());
-    let storage_driver: Mutex<Box<dyn StorageDriver>> = Mutex::new(Box::new(FilesystemDriver::new("registry/blobs")));
+    let storage_path = String::from("registry/blobs");
+    let (send, recv) = mpsc::channel::<(String, Bytes)>(50);
+    let storage_driver: Mutex<Box<dyn StorageDriver>> = Mutex::new(Box::new(FilesystemDriver::new(storage_path.clone(), send)));
+
+    // Spawn the background task that drains the channel and writes chunks to disk.
+    {
+        let path_clone = storage_path.clone();
+        actix_rt::spawn(async {
+            let mut streamer = FilesystemStreamer::new(path_clone, recv);
+            streamer.start_handling_streams().await.unwrap();
+        });
+    }

     let state = web::Data::new(AppState::new(pool, storage_driver));

src/storage/filesystem.rs

@ -1,27 +1,138 @@
use std::{path::Path, io::ErrorKind}; use std::{path::Path, io::ErrorKind, sync::Arc, collections::HashMap};
use anyhow::Context; use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking}; use futures::{executor::block_on, StreamExt};
use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking, sync::{Mutex, mpsc}};
use tracing::debug; use tracing::debug;
use super::StorageDriver; use super::{StorageDriver, StorageDriverStreamer, Streamer};
pub struct FilesystemStreamer {
/* new_streams_channel: mpsc::Receiver<(String, mpsc::Receiver<Bytes>)>,
// (digest, receiver)
streaming_channels: Vec<(String, )>, */
storage_path: String,
chunk_channel: mpsc::Receiver<(String, Bytes)>,
cached_files: HashMap<String, fs::File>,
}
impl FilesystemStreamer {
pub fn new(storage_path: String, chunk_channel: mpsc::Receiver<(String, Bytes)>) -> Self {
Self {
storage_path,
chunk_channel,
cached_files: HashMap::new(),
}
}
pub async fn start_handling_streams(&mut self) -> anyhow::Result<()> {
while let Some((digest, mut bytes)) = self.chunk_channel.recv().await {
let mut temp;
let file = match self.cached_files.get_mut(&digest) {
Some(f) => f,
None => {
let path = format!("{}/{}", self.storage_path, digest);
temp = fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(path).await?;
&mut temp
}
};
file.write_all(&mut bytes).await.unwrap();
}
Ok(())
}
}
impl Streamer for FilesystemStreamer {
fn start(&'static mut self) -> anyhow::Result<()> {
tokio::spawn(self.start_handling_streams());
Ok(())
}
}
pub struct FilesystemDriver { pub struct FilesystemDriver {
storage_path: String, storage_path: String,
// (digest, receiver)
streaming_channels: Vec<(String, mpsc::Receiver<Bytes>)>,
streamer_sender: mpsc::Sender<(String, Bytes)>,
} }
impl FilesystemDriver { impl FilesystemDriver {
pub fn new(storage_path: &str) -> Self { //pub fn new(storage_path: &str) -> FilesystemDriver {
pub fn new(storage_path: String, stream_sender: mpsc::Sender<(String, Bytes)>) -> FilesystemDriver {
/* let (send, recv) = mpsc::channel::<(String, Bytes)>(50);
let streamer = Arc::new(FilesystemStreamer::new(storage_path.to_string(), recv)); */
Self { Self {
storage_path: storage_path.to_string(), storage_path,
streaming_channels: vec![],
streamer_sender: stream_sender,
} }
} }
fn get_digest_path(&self, digest: &str) -> String { fn get_digest_path(&self, digest: &str) -> String {
format!("{}/{}", self.storage_path, digest) format!("{}/{}", self.storage_path, digest)
} }
async fn write_payload(&self, digest: &str, mut payload: actix_web::web::Payload, append: bool) -> anyhow::Result<usize> {
let path = self.get_digest_path(digest);
let mut file = fs::OpenOptions::new()
.write(true)
.append(append)
.create(true)
.open(path).await?;
let mut total_size = 0;
while let Some(item) = payload.next().await {
let item = item?;
total_size += item.len();
file.write_all(&item).await?;
}
Ok(total_size)
}
}
impl StorageDriverStreamer for FilesystemDriver {
fn write_payload(&self, digest: &str, payload: actix_web::web::Payload, append: bool) -> anyhow::Result<usize> {
Ok(tokio::runtime::Handle::current()
.block_on(self.write_payload(digest, payload, append))?)
/* block_on(|| async {
let path = self.get_digest_path(digest);
let mut file = fs::OpenOptions::new()
.write(true)
.append(append)
.create(true)
.open(path).await?;
file.write_all(&bytes).await?;
Ok(())
}); */
}
fn supports_streaming(&self) -> bool {
true
}
fn start_streaming_thread(&self) -> anyhow::Result<()> {
todo!()
}
fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)> {
self.streamer_sender.clone()
}
} }
#[async_trait] #[async_trait]

src/storage/mod.rs

@@ -1,15 +1,37 @@
 pub mod filesystem;

 use async_trait::async_trait;
 use bytes::Bytes;
-use tokio::io::{AsyncWrite, AsyncRead};
+use tokio::sync::mpsc;
+use actix_web::web;
+
+pub trait Streamer {
+    fn start(&'static mut self) -> anyhow::Result<()>;
+}
+
+pub trait StorageDriverStreamer {
+    /// Write an actix-web payload into the StorageDriver.
+    /// Returns the number of bytes written.
+    fn write_payload(&self, digest: &str, payload: web::Payload, append: bool) -> anyhow::Result<usize>;
+
+    fn supports_streaming(&self) -> bool;
+    fn start_streaming_thread(&self) -> anyhow::Result<()>;
+    fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)>;
+}

 #[async_trait]
-pub trait StorageDriver: Send/* : AsyncWrite + AsyncRead */ {
+pub trait StorageDriver: Send + StorageDriverStreamer {
     async fn has_digest(&self, digest: &str) -> anyhow::Result<bool>;
     async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>>;
     async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>>;
     async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()>;
     async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>;
     async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>;
 }
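
For contrast with FilesystemDriver, a driver that cannot stream would return false from supports_streaming(), sending chunked_upload_layer down the buffered save_digest() path and triggering its high-memory-usage warning. A hypothetical sketch (InMemoryDriver is illustrative only and not part of this commit; assumes the traits above are in scope):

```rust
use bytes::Bytes;
use tokio::sync::mpsc;

// Illustrative driver with no streaming support.
struct InMemoryDriver;

impl StorageDriverStreamer for InMemoryDriver {
    fn write_payload(&self, _digest: &str, _payload: actix_web::web::Payload, _append: bool) -> anyhow::Result<usize> {
        anyhow::bail!("streaming writes are not supported by this driver")
    }

    // Returning false makes the upload handler buffer the whole layer and
    // log its high-memory-usage warning.
    fn supports_streaming(&self) -> bool {
        false
    }

    fn start_streaming_thread(&self) -> anyhow::Result<()> {
        anyhow::bail!("this driver has no streaming thread")
    }

    fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)> {
        unimplemented!("this driver has no stream channel")
    }
}
```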