From dfb91a9cd890042d788d26976f955138afb31728 Mon Sep 17 00:00:00 2001 From: SeanOMik Date: Tue, 25 Apr 2023 14:57:01 -0400 Subject: [PATCH] Implement a simpler method of request byte streaming to StorageProviders --- Cargo.lock | 488 +------------------------------------- Cargo.toml | 3 - src/api/blobs.rs | 10 +- src/api/mod.rs | 2 +- src/api/uploads.rs | 27 ++- src/byte_stream.rs | 1 + src/database/mod.rs | 79 ------ src/main.rs | 25 +- src/storage/filesystem.rs | 129 ++++------ src/storage/mod.rs | 24 +- 10 files changed, 73 insertions(+), 715 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 686e060..308e503 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,192 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "actix-codec" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe" -dependencies = [ - "bitflags", - "bytes", - "futures-core", - "futures-sink", - "log", - "memchr", - "pin-project-lite", - "tokio", - "tokio-util", -] - -[[package]] -name = "actix-http" -version = "3.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c83abf9903e1f0ad9973cc4f7b9767fd5a03a583f51a5b7a339e07987cd2724" -dependencies = [ - "actix-codec", - "actix-rt", - "actix-service", - "actix-utils", - "ahash", - "base64 0.13.1", - "bitflags", - "brotli", - "bytes", - "bytestring", - "derive_more", - "encoding_rs", - "flate2", - "futures-core", - "h2", - "http", - "httparse", - "httpdate", - "itoa", - "language-tags", - "local-channel", - "mime", - "percent-encoding", - "pin-project-lite", - "rand", - "sha1", - "smallvec", - "tracing", - "zstd", -] - -[[package]] -name = "actix-macros" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "465a6172cf69b960917811022d8f29bc0b7fa1398bc4f78b3c466673db1213b6" -dependencies = [ - "quote 1.0.26", - "syn 1.0.109", -] - -[[package]] -name = "actix-router" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d66ff4d247d2b160861fa2866457e85706833527840e4133f8f49aa423a38799" -dependencies = [ - "bytestring", - "http", - "regex", - "serde", - "tracing", -] - -[[package]] -name = "actix-rt" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15265b6b8e2347670eb363c47fc8c75208b4a4994b27192f345fcbe707804f3e" -dependencies = [ - "actix-macros", - "futures-core", - "tokio", -] - -[[package]] -name = "actix-server" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0da34f8e659ea1b077bb4637948b815cd3768ad5a188fdcd74ff4d84240cd824" -dependencies = [ - "actix-rt", - "actix-service", - "actix-utils", - "futures-core", - "futures-util", - "mio", - "num_cpus", - "socket2", - "tokio", - "tracing", -] - -[[package]] -name = "actix-service" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a" -dependencies = [ - "futures-core", - "paste", - "pin-project-lite", -] - -[[package]] -name = "actix-utils" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8" -dependencies = [ - "local-waker", - "pin-project-lite", -] - -[[package]] -name = "actix-web" -version = "4.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d48f7b6534e06c7bfc72ee91db7917d4af6afe23e7d223b51e68fffbb21e96b9" -dependencies = [ - "actix-codec", - "actix-http", - "actix-macros", - "actix-router", - "actix-rt", - "actix-server", - "actix-service", - "actix-utils", - "actix-web-codegen", - "ahash", - "bytes", - "bytestring", - "cfg-if", - "cookie", - "derive_more", - "encoding_rs", - "futures-core", - "futures-util", - "http", - "itoa", - "language-tags", - "log", - "mime", - "once_cell", - "pin-project-lite", - "regex", - "serde", - "serde_json", - "serde_urlencoded", - "smallvec", - "socket2", - "time 0.3.17", - "url", -] - -[[package]] -name = "actix-web-codegen" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fa9362663c8643d67b2d5eafba49e4cb2c8a053a29ed00a0bea121f17c76b13" -dependencies = [ - "actix-router", - "proc-macro2", - "quote 1.0.26", - "syn 1.0.109", -] - -[[package]] -name = "adler" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" - [[package]] name = "ahash" version = "0.7.6" @@ -208,21 +22,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "alloc-no-stdlib" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" - -[[package]] -name = "alloc-stdlib" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" -dependencies = [ - "alloc-no-stdlib", -] - [[package]] name = "android_system_properties" version = "0.1.5" @@ -394,27 +193,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "brotli" -version = "3.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", - "brotli-decompressor", -] - -[[package]] -name = "brotli-decompressor" -version = "2.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", -] - [[package]] name = "bumpalo" version = "3.11.1" @@ -433,15 +211,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" -[[package]] -name = "bytestring" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7f83e57d9154148e355404702e2694463241880b939570d7c97c014da7a69a1" -dependencies = [ - "bytes", -] - [[package]] name = "case" version = "0.1.0" @@ -453,9 +222,6 @@ name = "cc" version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76a284da2e6fe2092f2353e51713435363112dfd60030e22add80be333fb928f" -dependencies = [ - "jobserver", -] [[package]] name = "cfg-if" @@ -473,7 +239,7 @@ dependencies = [ "js-sys", "num-integer", "num-traits", - "time 0.1.45", + "time", "wasm-bindgen", "winapi", ] @@ -525,23 +291,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "convert_case" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" - -[[package]] -name = "cookie" -version = "0.16.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "344adc371239ef32293cb1c4fe519592fcf21206c79c02854320afcdf3ab4917" -dependencies = [ - "percent-encoding", - "time 0.3.17", - "version_check", -] - [[package]] name = "core-foundation-sys" version = "0.8.3" @@ -572,15 +321,6 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" -[[package]] -name = "crc32fast" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" -dependencies = [ - "cfg-if", -] - [[package]] name = "crossbeam-queue" version = "0.3.8" @@ -665,19 +405,6 @@ dependencies = [ "syn 0.11.11", ] -[[package]] -name = "derive_more" -version = "0.99.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" -dependencies = [ - "convert_case", - "proc-macro2", - "quote 1.0.26", - "rustc_version", - "syn 1.0.109", -] - [[package]] name = "digest" version = "0.9.0" @@ -702,8 +429,6 @@ dependencies = [ name = "docker-registry" version = "0.1.0" dependencies = [ - "actix-rt", - "actix-web", "anyhow", "async-stream", "async-trait", @@ -715,7 +440,6 @@ dependencies = [ "futures", "jws", "pin-project-lite", - "qstring", "regex", "serde", "serde_json", @@ -743,31 +467,12 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" -[[package]] -name = "encoding_rs" -version = "0.8.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" -dependencies = [ - "cfg-if", -] - [[package]] name = "event-listener" version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "flate2" -version = "1.0.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" -dependencies = [ - "crc32fast", - "miniz_oxide", -] - [[package]] name = "flume" version = "0.10.14" @@ -845,7 +550,7 @@ checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" dependencies = [ "futures-core", "lock_api", - "parking_lot 0.11.2", + "parking_lot", ] [[package]] @@ -916,25 +621,6 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] -[[package]] -name = "h2" -version = "0.3.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -1117,15 +803,6 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" -[[package]] -name = "jobserver" -version = "0.1.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "068b1ee6743e4d11fb9c6a1e6064b3693a1b600e7f5f5988047d98b3dc9fb90b" -dependencies = [ - "libc", -] - [[package]] name = "js-sys" version = "0.3.60" @@ -1150,12 +827,6 @@ dependencies = [ "sha2 0.10.6", ] -[[package]] -name = "language-tags" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" - [[package]] name = "lazy_static" version = "1.4.0" @@ -1188,24 +859,6 @@ dependencies = [ "cc", ] -[[package]] -name = "local-channel" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f303ec0e94c6c54447f84f3b0ef7af769858a9c4ef56ef2a986d3dcd4c3fc9c" -dependencies = [ - "futures-core", - "futures-sink", - "futures-util", - "local-waker", -] - -[[package]] -name = "local-waker" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e34f76eb3611940e0e7d53a9aaa4e6a3151f69541a282fd0dad5571420c53ff1" - [[package]] name = "lock_api" version = "0.4.9" @@ -1249,15 +902,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" -[[package]] -name = "miniz_oxide" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96590ba8f175222643a85693f33d26e9c8a015f599c216509b1a6894af675d34" -dependencies = [ - "adler", -] - [[package]] name = "mio" version = "0.8.5" @@ -1351,17 +995,7 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", "lock_api", - "parking_lot_core 0.8.6", -] - -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core 0.9.4", + "parking_lot_core", ] [[package]] @@ -1378,19 +1012,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "parking_lot_core" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-sys", -] - [[package]] name = "paste" version = "1.0.9" @@ -1480,15 +1101,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "qstring" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d464fae65fff2680baf48019211ce37aaec0c78e9264c84a3e484717f965104e" -dependencies = [ - "percent-encoding", -] - [[package]] name = "quote" version = "0.3.15" @@ -1575,15 +1187,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "rustc_version" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" -dependencies = [ - "semver", -] - [[package]] name = "rustls" version = "0.20.8" @@ -1639,12 +1242,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "semver" -version = "1.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" - [[package]] name = "serde" version = "1.0.147" @@ -1697,17 +1294,6 @@ dependencies = [ "serde", ] -[[package]] -name = "sha1" -version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.5", -] - [[package]] name = "sha2" version = "0.9.9" @@ -1751,15 +1337,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "signal-hook-registry" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" -dependencies = [ - "libc", -] - [[package]] name = "slab" version = "0.4.7" @@ -2017,33 +1594,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "time" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" -dependencies = [ - "itoa", - "serde", - "time-core", - "time-macros", -] - -[[package]] -name = "time-core" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" - -[[package]] -name = "time-macros" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" -dependencies = [ - "time-core", -] - [[package]] name = "tinyvec" version = "1.6.0" @@ -2071,9 +1621,7 @@ dependencies = [ "memchr", "mio", "num_cpus", - "parking_lot 0.12.1", "pin-project-lite", - "signal-hook-registry", "socket2", "tokio-macros", "windows-sys", @@ -2123,7 +1671,6 @@ dependencies = [ "futures-sink", "pin-project-lite", "tokio", - "tracing", ] [[package]] @@ -2526,32 +2073,3 @@ name = "windows_x86_64_msvc" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" - -[[package]] -name = "zstd" -version = "0.11.2+zstd.1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" -dependencies = [ - "zstd-safe", -] - -[[package]] -name = "zstd-safe" -version = "5.0.2+zstd.1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db" -dependencies = [ - "libc", - "zstd-sys", -] - -[[package]] -name = "zstd-sys" -version = "2.0.1+zstd.1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" -dependencies = [ - "cc", - "libc", -] diff --git a/Cargo.toml b/Cargo.toml index 47dc993..fa12c12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ sqlx = { version = "0.6.3", features = [ "runtime-tokio-rustls", "sqlite" ] } bytes = "1.4.0" chrono = "0.4.23" -actix-web = "4" tokio = { version = "1.21.2", features = [ "fs", "macros" ] } tokio-util = { version = "0.7.7", features = [ "io" ] } @@ -30,12 +29,10 @@ regex = "1.7.1" jws = "0.2.7" async-trait = "0.1.68" futures = "0.3.28" -qstring = "0.7.2" sha256 = "1.1.2" pin-project-lite = "0.2.9" anyhow = "1.0.70" async-stream = "0.3.5" -actix-rt = "2.8.0" axum = "0.6.16" axum-macros = "0.3.7" diff --git a/src/api/blobs.rs b/src/api/blobs.rs index 6b9c1b0..4997cc7 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -8,10 +8,10 @@ use tokio_util::io::ReaderStream; use crate::app_state::AppState; -pub async fn digest_exists_head(Path((name, layer_digest)): Path<(String, String)>, state: State>) -> Response { +pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, String)>, state: State>) -> Response { let storage = state.storage.lock().await; - if storage.has_digest(&layer_digest).unwrap() { + if storage.has_digest(&layer_digest).await.unwrap() { if let Some(size) = storage.digest_length(&layer_digest).await.unwrap() { return ( StatusCode::OK, @@ -26,11 +26,11 @@ pub async fn digest_exists_head(Path((name, layer_digest)): Path<(String, String StatusCode::NOT_FOUND.into_response() } -pub async fn pull_digest_get(Path((name, layer_digest)): Path<(String, String)>, state: State>) -> Response { +pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, state: State>) -> Response { let storage = state.storage.lock().await; if let Some(len) = storage.digest_length(&layer_digest).await.unwrap() { - let stream = storage.stream_bytes(&layer_digest).await.unwrap().unwrap(); + let stream = storage.get_digest_stream(&layer_digest).await.unwrap().unwrap(); // convert the `AsyncRead` into a `Stream` let stream = ReaderStream::new(stream.into_async_read()); @@ -50,6 +50,6 @@ pub async fn pull_digest_get(Path((name, layer_digest)): Path<(String, String)>, } } -pub async fn delete_digest(state: State>) -> impl IntoResponse { +pub async fn delete_digest(_state: State>) -> impl IntoResponse { todo!() } \ No newline at end of file diff --git a/src/api/mod.rs b/src/api/mod.rs index 83a3332..50ead62 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,5 +1,5 @@ use axum::response::IntoResponse; -use axum::http::{StatusCode, header, HeaderName}; +use axum::http::{StatusCode, HeaderName}; pub mod blobs; pub mod uploads; diff --git a/src/api/uploads.rs b/src/api/uploads.rs index 602fb15..89fbbae 100644 --- a/src/api/uploads.rs +++ b/src/api/uploads.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::io::ErrorKind; use std::sync::Arc; use axum::http::{StatusCode, header, HeaderName}; @@ -10,6 +11,7 @@ use futures::StreamExt; use tracing::{debug, warn}; use crate::app_state::AppState; +use crate::byte_stream::ByteStream; /// Starting an upload pub async fn start_upload_post(Path((name, )): Path<(String, )>) -> impl IntoResponse { @@ -31,19 +33,22 @@ pub async fn chunked_upload_layer_patch(Path((name, layer_uuid)): Path<(String, let storage = state.storage.lock().await; let current_size = storage.digest_length(&layer_uuid).await.unwrap(); - let written_size = match /* storage.supports_streaming() */ false { + let written_size = match storage.supports_streaming().await { true => { - // TODO: Make less bad - let sender = storage.start_stream_channel(); - let mut written_size = 0; - while let Some(item) = body.next().await { - if let Ok(bytes) = item { - written_size += bytes.len(); - sender.send((layer_uuid.clone(), bytes)).await.unwrap(); + // ByteStream takes a stream of Item, io::Error so this stream needs to be converted to that + let io_stream = async_stream::stream! { + while let Some(bytes) = body.next().await { + yield match bytes { + Ok(b) => Ok(b), + Err(e) => Err(std::io::Error::new(ErrorKind::Other, e)) + }; } - } + }; - written_size + let byte_stream = ByteStream::new(io_stream); + let len = storage.save_digest_stream(&layer_uuid, byte_stream, true).await.unwrap(); + + len }, false => { warn!("This storage driver does not support streaming! This means high memory usage during image pushes!"); @@ -100,7 +105,7 @@ pub async fn finish_chunked_upload_put(Path((name, layer_uuid)): Path<(String, S ) } -pub async fn cancel_upload_delete(Path((name, layer_uuid)): Path<(String, String)>, state: State>) -> impl IntoResponse { +pub async fn cancel_upload_delete(Path((_name, layer_uuid)): Path<(String, String)>, state: State>) -> impl IntoResponse { let storage = state.storage.lock().await; storage.delete_digest(&layer_uuid).await.unwrap(); diff --git a/src/byte_stream.rs b/src/byte_stream.rs index 323dd1e..4313d3a 100644 --- a/src/byte_stream.rs +++ b/src/byte_stream.rs @@ -14,6 +14,7 @@ pin_project! { } } +#[allow(dead_code)] impl ByteStream { /// Create a new `ByteStream` by wrapping a `futures` stream. pub fn new(stream: S) -> ByteStream diff --git a/src/database/mod.rs b/src/database/mod.rs index 17caa48..ae53743 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,7 +1,4 @@ -use std::io::Read; - use async_trait::async_trait; -use bytes::Bytes; use sqlx::{Sqlite, Pool}; use tracing::debug; @@ -16,16 +13,6 @@ pub trait Database { /// Create the tables in the database async fn create_schema(&self) -> sqlx::Result<()>; - /// Get the digest bytes - /* async fn get_digest(&self, digest: &str) -> sqlx::Result>; - /// Get the length of the digest - async fn digest_length(&self, digest: &str) -> sqlx::Result; - /// Save digest bytes - async fn save_digest(&self, digest: &str, bytes: &Bytes) -> sqlx::Result<()>; - /// Delete digest - async fn delete_digest(&self, digest: &str) -> sqlx::Result<()>; - /// Replace the uuid with a digest - async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()>; */ // Tag related functions @@ -71,72 +58,6 @@ impl Database for Pool { Ok(()) } - /* async fn get_digest(&self, digest: &str) -> sqlx::Result> { - // Handle RowNotFound errors - let row: (Vec, ) = match sqlx::query_as("SELECT blob FROM layer_blobs WHERE digest = ?") - .bind(digest) - .fetch_one(self).await { - Ok(row) => row, - Err(e) => match e { - sqlx::Error::RowNotFound => { - return Ok(None) - }, - _ => { - return Err(e); - } - } - }; - - let bytes = Bytes::from(row.0); - - debug!("Got digest {}, {} bytes", digest, bytes.len()); - - Ok(Some(bytes)) - } - - async fn digest_length(&self, digest: &str) -> sqlx::Result { - let row: (i64, ) = sqlx::query_as("SELECT length(blob) FROM layer_blobs WHERE digest = ?") - .bind(digest) - .fetch_one(self).await?; - - Ok(row.0 as usize) - } - - async fn save_digest(&self, digest: &str, bytes: &Bytes) -> sqlx::Result<()> { - let bytes_len = bytes.len(); - let bytes = bytes.bytes().map(|b| b.unwrap()).collect::>(); - - sqlx::query("INSERT INTO layer_blobs (digest, blob) VALUES (?, ?)") - .bind(digest) - .bind(bytes) - .execute(self).await?; - - debug!("Saved digest {}, {} bytes", digest, bytes_len); - - Ok(()) - } - - async fn delete_digest(&self, digest: &str) -> sqlx::Result<()> { - sqlx::query("DELETE FROM layer_blobs WHERE digest = ?") - .bind(digest) - .execute(self).await?; - - debug!("Deleted digest {}", digest); - - Ok(()) - } - - async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()> { - sqlx::query("UPDATE layer_blobs SET digest = ? WHERE digest = ?") - .bind(new_digest) - .bind(uuid) - .execute(self).await?; - - debug!("Replaced digest uuid {} to digest {}", uuid, new_digest); - - Ok(()) - } */ - async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()> { sqlx::query("INSERT INTO manifest_layers(manifest, layer_digest) VALUES (?, ?)") .bind(manifest_digest) diff --git a/src/main.rs b/src/main.rs index d081b06..635938f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,16 +8,12 @@ mod byte_stream; use std::net::SocketAddr; use std::sync::Arc; -use actix_web::{web, App, HttpServer}; -use actix_web::middleware::Logger; - use axum::{Router, routing}; use axum::ServiceExt; use tower_layer::Layer; -use bytes::Bytes; use sqlx::sqlite::SqlitePoolOptions; -use tokio::sync::{Mutex, mpsc}; +use tokio::sync::Mutex; use tower_http::normalize_path::NormalizePathLayer; use tracing::{debug, Level}; @@ -25,7 +21,7 @@ use app_state::AppState; use database::Database; use crate::storage::StorageDriver; -use crate::storage::filesystem::{FilesystemDriver, FilesystemStreamer}; +use crate::storage::filesystem::FilesystemDriver; use tower_http::trace::TraceLayer; @@ -40,29 +36,14 @@ async fn main() -> std::io::Result<()> { pool.create_schema().await.unwrap(); - let storage_path = String::from("registry/blobs"); - let (send, recv) = mpsc::channel::<(String, Bytes)>(50); - let storage_driver: Mutex> = Mutex::new(Box::new(FilesystemDriver::new(storage_path.clone(), send))); + let storage_driver: Mutex> = Mutex::new(Box::new(FilesystemDriver::new("registry/blobs"))); - // create the storage streamer - /* { - let path_clone = storage_path.clone(); - actix_rt::spawn(async { - let mut streamer = FilesystemStreamer::new(path_clone, recv); - streamer.start_handling_streams().await.unwrap(); - }); - } */ - - //let state = web::Data::new(AppState::new(pool, storage_driver)); let state = Arc::new(AppState::new(pool, storage_driver)); tracing_subscriber::fmt() .with_max_level(Level::DEBUG) .init(); - // TODO: Make configurable by deployment - let payload_config = web::PayloadConfig::new(5 * 1024 * 1024 * 1024); // 5Gb - let app = NormalizePathLayer::trim_trailing_slash().layer(Router::new() .nest("/v2", Router::new() .route("/", routing::get(api::version_check)) diff --git a/src/storage/filesystem.rs b/src/storage/filesystem.rs index 49ad72d..920762b 100644 --- a/src/storage/filesystem.rs +++ b/src/storage/filesystem.rs @@ -1,73 +1,24 @@ -use std::{path::Path, io::{ErrorKind, BufRead, BufReader, Read}, sync::Arc, collections::HashMap}; +use std::{path::Path, io::ErrorKind}; use anyhow::Context; use async_trait::async_trait; use bytes::Bytes; -use futures::{executor::block_on, StreamExt, Stream}; -use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking, sync::{Mutex, mpsc}}; +use futures::StreamExt; +use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}}; use tokio_util::io::ReaderStream; -use tracing::debug; use crate::byte_stream::ByteStream; -use super::{StorageDriver, StorageDriverStreamer, Streamer}; - -pub struct FilesystemStreamer { - storage_path: String, - chunk_channel: mpsc::Receiver<(String, Bytes)>, - cached_files: HashMap, -} - -impl FilesystemStreamer { - pub fn new(storage_path: String, chunk_channel: mpsc::Receiver<(String, Bytes)>) -> Self { - Self { - storage_path, - chunk_channel, - cached_files: HashMap::new(), - } - } - - pub async fn start_handling_streams(&mut self) -> anyhow::Result<()> { - while let Some((digest, mut bytes)) = self.chunk_channel.recv().await { - let mut temp; - let file = match self.cached_files.get_mut(&digest) { - Some(f) => f, - None => { - let path = format!("{}/{}", self.storage_path, digest); - temp = fs::OpenOptions::new() - .write(true) - .append(true) - .create(true) - .open(path).await?; - &mut temp - } - }; - - file.write_all(&mut bytes).await.unwrap(); - } - - Ok(()) - } -} - -impl Streamer for FilesystemStreamer { - fn start(&'static mut self) -> anyhow::Result<()> { - tokio::spawn(self.start_handling_streams()); - - Ok(()) - } -} +use super::StorageDriver; pub struct FilesystemDriver { storage_path: String, - streamer_sender: mpsc::Sender<(String, Bytes)>, } impl FilesystemDriver { - pub fn new(storage_path: String, stream_sender: mpsc::Sender<(String, Bytes)>) -> FilesystemDriver { + pub fn new(storage_path: &str) -> FilesystemDriver { Self { - storage_path, - streamer_sender: stream_sender, + storage_path: storage_path.to_string(), } } @@ -76,39 +27,36 @@ impl FilesystemDriver { } } -impl StorageDriverStreamer for FilesystemDriver { - fn supports_streaming(&self) -> bool { +#[async_trait] +impl StorageDriver for FilesystemDriver { + async fn supports_streaming(&self) -> bool { true } - fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)> { - self.streamer_sender.clone() - } - - fn has_digest(&self, digest: &str) -> anyhow::Result { + async fn has_digest(&self, digest: &str) -> anyhow::Result { let path = self.get_digest_path(digest); Ok(Path::new(&path).exists()) } -} -#[async_trait] -impl StorageDriver for FilesystemDriver { - async fn stream_bytes(&self, digest: &str) -> anyhow::Result> { - let file = match fs::File::open(self.get_digest_path(digest)).await { - Ok(f) => f, - Err(e) => match e.kind() { - ErrorKind::NotFound => { - return Ok(None) - }, - _ => { - return Err(e).context("FilesystemDriver: Failure to open digest file"); - } - } - }; + async fn save_digest_stream(&self, digest: &str, mut stream: ByteStream, append: bool) -> anyhow::Result { + let path = self.get_digest_path(digest); + let mut file = fs::OpenOptions::new() + .write(true) + .append(append) + .create(true) + .open(path).await?; - let s = ReaderStream::new(file); - Ok(Some(ByteStream::new(s))) + let mut len = 0; + while let Some(bytes) = stream.next().await { + let bytes = bytes?; + + len += bytes.len(); + + file.write_all(&bytes).await.unwrap(); + } + + Ok(len) } async fn get_digest(&self, digest: &str) -> anyhow::Result> { @@ -134,17 +82,20 @@ impl StorageDriver for FilesystemDriver { } async fn get_digest_stream(&self, digest: &str) -> anyhow::Result> { - let path = self.get_digest_path(digest); + let file = match fs::File::open(self.get_digest_path(digest)).await { + Ok(f) => f, + Err(e) => match e.kind() { + ErrorKind::NotFound => { + return Ok(None) + }, + _ => { + return Err(e).context("FilesystemDriver: Failure to open digest file"); + } + } + }; - /* if self.has_digest(digest) { - let s = async_stream::stream! { - - }; - } else { - Ok(None) - } */ - - todo!() + let s = ReaderStream::new(file); + Ok(Some(ByteStream::new(s))) } async fn digest_length(&self, digest: &str) -> anyhow::Result> { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 324bf58..973d8da 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,36 +1,20 @@ pub mod filesystem; -use std::{pin::Pin, sync::Arc}; - use async_trait::async_trait; use bytes::Bytes; -use futures::Stream; -use tokio::{io::{AsyncWrite, AsyncRead}, sync::{Mutex, mpsc}}; - -use actix_web::web; use crate::byte_stream::ByteStream; -pub trait Streamer { - fn start(&'static mut self) -> anyhow::Result<()>; -} - -pub trait StorageDriverStreamer { - fn supports_streaming(&self) -> bool; - fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)>; - fn has_digest(&self, digest: &str) -> anyhow::Result; - //fn stream_bytes(&self, digest: &str) -> anyhow::Result>; -} - #[async_trait] -pub trait StorageDriver: Send + Sync + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ { +pub trait StorageDriver: Send + Sync { async fn get_digest(&self, digest: &str) -> anyhow::Result>; async fn get_digest_stream(&self, digest: &str) -> anyhow::Result>; async fn digest_length(&self, digest: &str) -> anyhow::Result>; async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()>; + async fn save_digest_stream(&self, digest: &str, stream: ByteStream, append: bool) -> anyhow::Result; async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>; async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>; - async fn stream_bytes(&self, digest: &str) -> anyhow::Result>; - //async fn stream_bytes(&self, stream: Box>) -> anyhow::Result<()>; + async fn supports_streaming(&self) -> bool; + async fn has_digest(&self, digest: &str) -> anyhow::Result; } \ No newline at end of file