Implement a simpler method of request byte streaming to StorageProviders

This commit is contained in:
SeanOMik 2023-04-25 14:57:01 -04:00
parent 8206171216
commit dfb91a9cd8
Signed by: SeanOMik
GPG Key ID: 568F326C7EB33ACB
10 changed files with 73 additions and 715 deletions

488
Cargo.lock generated
View File

@ -2,192 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "actix-codec"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe"
dependencies = [
"bitflags",
"bytes",
"futures-core",
"futures-sink",
"log",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "actix-http"
version = "3.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c83abf9903e1f0ad9973cc4f7b9767fd5a03a583f51a5b7a339e07987cd2724"
dependencies = [
"actix-codec",
"actix-rt",
"actix-service",
"actix-utils",
"ahash",
"base64 0.13.1",
"bitflags",
"brotli",
"bytes",
"bytestring",
"derive_more",
"encoding_rs",
"flate2",
"futures-core",
"h2",
"http",
"httparse",
"httpdate",
"itoa",
"language-tags",
"local-channel",
"mime",
"percent-encoding",
"pin-project-lite",
"rand",
"sha1",
"smallvec",
"tracing",
"zstd",
]
[[package]]
name = "actix-macros"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "465a6172cf69b960917811022d8f29bc0b7fa1398bc4f78b3c466673db1213b6"
dependencies = [
"quote 1.0.26",
"syn 1.0.109",
]
[[package]]
name = "actix-router"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d66ff4d247d2b160861fa2866457e85706833527840e4133f8f49aa423a38799"
dependencies = [
"bytestring",
"http",
"regex",
"serde",
"tracing",
]
[[package]]
name = "actix-rt"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15265b6b8e2347670eb363c47fc8c75208b4a4994b27192f345fcbe707804f3e"
dependencies = [
"actix-macros",
"futures-core",
"tokio",
]
[[package]]
name = "actix-server"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0da34f8e659ea1b077bb4637948b815cd3768ad5a188fdcd74ff4d84240cd824"
dependencies = [
"actix-rt",
"actix-service",
"actix-utils",
"futures-core",
"futures-util",
"mio",
"num_cpus",
"socket2",
"tokio",
"tracing",
]
[[package]]
name = "actix-service"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a"
dependencies = [
"futures-core",
"paste",
"pin-project-lite",
]
[[package]]
name = "actix-utils"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8"
dependencies = [
"local-waker",
"pin-project-lite",
]
[[package]]
name = "actix-web"
version = "4.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d48f7b6534e06c7bfc72ee91db7917d4af6afe23e7d223b51e68fffbb21e96b9"
dependencies = [
"actix-codec",
"actix-http",
"actix-macros",
"actix-router",
"actix-rt",
"actix-server",
"actix-service",
"actix-utils",
"actix-web-codegen",
"ahash",
"bytes",
"bytestring",
"cfg-if",
"cookie",
"derive_more",
"encoding_rs",
"futures-core",
"futures-util",
"http",
"itoa",
"language-tags",
"log",
"mime",
"once_cell",
"pin-project-lite",
"regex",
"serde",
"serde_json",
"serde_urlencoded",
"smallvec",
"socket2",
"time 0.3.17",
"url",
]
[[package]]
name = "actix-web-codegen"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fa9362663c8643d67b2d5eafba49e4cb2c8a053a29ed00a0bea121f17c76b13"
dependencies = [
"actix-router",
"proc-macro2",
"quote 1.0.26",
"syn 1.0.109",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.7.6"
@ -208,21 +22,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
[[package]]
name = "alloc-stdlib"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
@ -394,27 +193,6 @@ dependencies = [
"generic-array",
]
[[package]]
name = "brotli"
version = "3.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor",
]
[[package]]
name = "brotli-decompressor"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
]
[[package]]
name = "bumpalo"
version = "3.11.1"
@ -433,15 +211,6 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "bytestring"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7f83e57d9154148e355404702e2694463241880b939570d7c97c014da7a69a1"
dependencies = [
"bytes",
]
[[package]]
name = "case"
version = "0.1.0"
@ -453,9 +222,6 @@ name = "cc"
version = "1.0.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76a284da2e6fe2092f2353e51713435363112dfd60030e22add80be333fb928f"
dependencies = [
"jobserver",
]
[[package]]
name = "cfg-if"
@ -473,7 +239,7 @@ dependencies = [
"js-sys",
"num-integer",
"num-traits",
"time 0.1.45",
"time",
"wasm-bindgen",
"winapi",
]
@ -525,23 +291,6 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "convert_case"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "cookie"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "344adc371239ef32293cb1c4fe519592fcf21206c79c02854320afcdf3ab4917"
dependencies = [
"percent-encoding",
"time 0.3.17",
"version_check",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.3"
@ -572,15 +321,6 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
@ -665,19 +405,6 @@ dependencies = [
"syn 0.11.11",
]
[[package]]
name = "derive_more"
version = "0.99.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
dependencies = [
"convert_case",
"proc-macro2",
"quote 1.0.26",
"rustc_version",
"syn 1.0.109",
]
[[package]]
name = "digest"
version = "0.9.0"
@ -702,8 +429,6 @@ dependencies = [
name = "docker-registry"
version = "0.1.0"
dependencies = [
"actix-rt",
"actix-web",
"anyhow",
"async-stream",
"async-trait",
@ -715,7 +440,6 @@ dependencies = [
"futures",
"jws",
"pin-project-lite",
"qstring",
"regex",
"serde",
"serde_json",
@ -743,31 +467,12 @@ version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
[[package]]
name = "encoding_rs"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
dependencies = [
"cfg-if",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "flate2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.10.14"
@ -845,7 +550,7 @@ checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5"
dependencies = [
"futures-core",
"lock_api",
"parking_lot 0.11.2",
"parking_lot",
]
[[package]]
@ -916,25 +621,6 @@ dependencies = [
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
name = "h2"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f9f29bc9dda355256b2916cf526ab02ce0aeaaaf2bad60d65ef3f12f11dd0f4"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -1117,15 +803,6 @@ version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "jobserver"
version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "068b1ee6743e4d11fb9c6a1e6064b3693a1b600e7f5f5988047d98b3dc9fb90b"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.60"
@ -1150,12 +827,6 @@ dependencies = [
"sha2 0.10.6",
]
[[package]]
name = "language-tags"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388"
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -1188,24 +859,6 @@ dependencies = [
"cc",
]
[[package]]
name = "local-channel"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f303ec0e94c6c54447f84f3b0ef7af769858a9c4ef56ef2a986d3dcd4c3fc9c"
dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"local-waker",
]
[[package]]
name = "local-waker"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e34f76eb3611940e0e7d53a9aaa4e6a3151f69541a282fd0dad5571420c53ff1"
[[package]]
name = "lock_api"
version = "0.4.9"
@ -1249,15 +902,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96590ba8f175222643a85693f33d26e9c8a015f599c216509b1a6894af675d34"
dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.8.5"
@ -1351,17 +995,7 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.4",
"parking_lot_core",
]
[[package]]
@ -1378,19 +1012,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "parking_lot_core"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
]
[[package]]
name = "paste"
version = "1.0.9"
@ -1480,15 +1101,6 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "qstring"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d464fae65fff2680baf48019211ce37aaec0c78e9264c84a3e484717f965104e"
dependencies = [
"percent-encoding",
]
[[package]]
name = "quote"
version = "0.3.15"
@ -1575,15 +1187,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "rustls"
version = "0.20.8"
@ -1639,12 +1242,6 @@ dependencies = [
"untrusted",
]
[[package]]
name = "semver"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4"
[[package]]
name = "serde"
version = "1.0.147"
@ -1697,17 +1294,6 @@ dependencies = [
"serde",
]
[[package]]
name = "sha1"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.5",
]
[[package]]
name = "sha2"
version = "0.9.9"
@ -1751,15 +1337,6 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.7"
@ -2017,33 +1594,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "time"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376"
dependencies = [
"itoa",
"serde",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd"
[[package]]
name = "time-macros"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2"
dependencies = [
"time-core",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
@ -2071,9 +1621,7 @@ dependencies = [
"memchr",
"mio",
"num_cpus",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",
@ -2123,7 +1671,6 @@ dependencies = [
"futures-sink",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
@ -2526,32 +2073,3 @@ name = "windows_x86_64_msvc"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5"
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "5.0.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.1+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b"
dependencies = [
"cc",
"libc",
]

View File

@ -16,7 +16,6 @@ sqlx = { version = "0.6.3", features = [ "runtime-tokio-rustls", "sqlite" ] }
bytes = "1.4.0"
chrono = "0.4.23"
actix-web = "4"
tokio = { version = "1.21.2", features = [ "fs", "macros" ] }
tokio-util = { version = "0.7.7", features = [ "io" ] }
@ -30,12 +29,10 @@ regex = "1.7.1"
jws = "0.2.7"
async-trait = "0.1.68"
futures = "0.3.28"
qstring = "0.7.2"
sha256 = "1.1.2"
pin-project-lite = "0.2.9"
anyhow = "1.0.70"
async-stream = "0.3.5"
actix-rt = "2.8.0"
axum = "0.6.16"
axum-macros = "0.3.7"

View File

@ -8,10 +8,10 @@ use tokio_util::io::ReaderStream;
use crate::app_state::AppState;
pub async fn digest_exists_head(Path((name, layer_digest)): Path<(String, String)>, state: State<Arc<AppState>>) -> Response {
pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, String)>, state: State<Arc<AppState>>) -> Response {
let storage = state.storage.lock().await;
if storage.has_digest(&layer_digest).unwrap() {
if storage.has_digest(&layer_digest).await.unwrap() {
if let Some(size) = storage.digest_length(&layer_digest).await.unwrap() {
return (
StatusCode::OK,
@ -26,11 +26,11 @@ pub async fn digest_exists_head(Path((name, layer_digest)): Path<(String, String
StatusCode::NOT_FOUND.into_response()
}
pub async fn pull_digest_get(Path((name, layer_digest)): Path<(String, String)>, state: State<Arc<AppState>>) -> Response {
pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, state: State<Arc<AppState>>) -> Response {
let storage = state.storage.lock().await;
if let Some(len) = storage.digest_length(&layer_digest).await.unwrap() {
let stream = storage.stream_bytes(&layer_digest).await.unwrap().unwrap();
let stream = storage.get_digest_stream(&layer_digest).await.unwrap().unwrap();
// convert the `AsyncRead` into a `Stream`
let stream = ReaderStream::new(stream.into_async_read());
@ -50,6 +50,6 @@ pub async fn pull_digest_get(Path((name, layer_digest)): Path<(String, String)>,
}
}
pub async fn delete_digest(state: State<Arc<AppState>>) -> impl IntoResponse {
pub async fn delete_digest(_state: State<Arc<AppState>>) -> impl IntoResponse {
todo!()
}

View File

@ -1,5 +1,5 @@
use axum::response::IntoResponse;
use axum::http::{StatusCode, header, HeaderName};
use axum::http::{StatusCode, HeaderName};
pub mod blobs;
pub mod uploads;

View File

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::io::ErrorKind;
use std::sync::Arc;
use axum::http::{StatusCode, header, HeaderName};
@ -10,6 +11,7 @@ use futures::StreamExt;
use tracing::{debug, warn};
use crate::app_state::AppState;
use crate::byte_stream::ByteStream;
/// Starting an upload
pub async fn start_upload_post(Path((name, )): Path<(String, )>) -> impl IntoResponse {
@ -31,19 +33,22 @@ pub async fn chunked_upload_layer_patch(Path((name, layer_uuid)): Path<(String,
let storage = state.storage.lock().await;
let current_size = storage.digest_length(&layer_uuid).await.unwrap();
let written_size = match /* storage.supports_streaming() */ false {
let written_size = match storage.supports_streaming().await {
true => {
// TODO: Make less bad
let sender = storage.start_stream_channel();
let mut written_size = 0;
while let Some(item) = body.next().await {
if let Ok(bytes) = item {
written_size += bytes.len();
sender.send((layer_uuid.clone(), bytes)).await.unwrap();
}
// ByteStream takes a stream of Item, io::Error so this stream needs to be converted to that
let io_stream = async_stream::stream! {
while let Some(bytes) = body.next().await {
yield match bytes {
Ok(b) => Ok(b),
Err(e) => Err(std::io::Error::new(ErrorKind::Other, e))
};
}
};
written_size
let byte_stream = ByteStream::new(io_stream);
let len = storage.save_digest_stream(&layer_uuid, byte_stream, true).await.unwrap();
len
},
false => {
warn!("This storage driver does not support streaming! This means high memory usage during image pushes!");
@ -100,7 +105,7 @@ pub async fn finish_chunked_upload_put(Path((name, layer_uuid)): Path<(String, S
)
}
pub async fn cancel_upload_delete(Path((name, layer_uuid)): Path<(String, String)>, state: State<Arc<AppState>>) -> impl IntoResponse {
pub async fn cancel_upload_delete(Path((_name, layer_uuid)): Path<(String, String)>, state: State<Arc<AppState>>) -> impl IntoResponse {
let storage = state.storage.lock().await;
storage.delete_digest(&layer_uuid).await.unwrap();

View File

@ -14,6 +14,7 @@ pin_project! {
}
}
#[allow(dead_code)]
impl ByteStream {
/// Create a new `ByteStream` by wrapping a `futures` stream.
pub fn new<S>(stream: S) -> ByteStream

View File

@ -1,7 +1,4 @@
use std::io::Read;
use async_trait::async_trait;
use bytes::Bytes;
use sqlx::{Sqlite, Pool};
use tracing::debug;
@ -16,16 +13,6 @@ pub trait Database {
/// Create the tables in the database
async fn create_schema(&self) -> sqlx::Result<()>;
/// Get the digest bytes
/* async fn get_digest(&self, digest: &str) -> sqlx::Result<Option<Bytes>>;
/// Get the length of the digest
async fn digest_length(&self, digest: &str) -> sqlx::Result<usize>;
/// Save digest bytes
async fn save_digest(&self, digest: &str, bytes: &Bytes) -> sqlx::Result<()>;
/// Delete digest
async fn delete_digest(&self, digest: &str) -> sqlx::Result<()>;
/// Replace the uuid with a digest
async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()>; */
// Tag related functions
@ -71,72 +58,6 @@ impl Database for Pool<Sqlite> {
Ok(())
}
/* async fn get_digest(&self, digest: &str) -> sqlx::Result<Option<Bytes>> {
// Handle RowNotFound errors
let row: (Vec<u8>, ) = match sqlx::query_as("SELECT blob FROM layer_blobs WHERE digest = ?")
.bind(digest)
.fetch_one(self).await {
Ok(row) => row,
Err(e) => match e {
sqlx::Error::RowNotFound => {
return Ok(None)
},
_ => {
return Err(e);
}
}
};
let bytes = Bytes::from(row.0);
debug!("Got digest {}, {} bytes", digest, bytes.len());
Ok(Some(bytes))
}
async fn digest_length(&self, digest: &str) -> sqlx::Result<usize> {
let row: (i64, ) = sqlx::query_as("SELECT length(blob) FROM layer_blobs WHERE digest = ?")
.bind(digest)
.fetch_one(self).await?;
Ok(row.0 as usize)
}
async fn save_digest(&self, digest: &str, bytes: &Bytes) -> sqlx::Result<()> {
let bytes_len = bytes.len();
let bytes = bytes.bytes().map(|b| b.unwrap()).collect::<Vec<u8>>();
sqlx::query("INSERT INTO layer_blobs (digest, blob) VALUES (?, ?)")
.bind(digest)
.bind(bytes)
.execute(self).await?;
debug!("Saved digest {}, {} bytes", digest, bytes_len);
Ok(())
}
async fn delete_digest(&self, digest: &str) -> sqlx::Result<()> {
sqlx::query("DELETE FROM layer_blobs WHERE digest = ?")
.bind(digest)
.execute(self).await?;
debug!("Deleted digest {}", digest);
Ok(())
}
async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()> {
sqlx::query("UPDATE layer_blobs SET digest = ? WHERE digest = ?")
.bind(new_digest)
.bind(uuid)
.execute(self).await?;
debug!("Replaced digest uuid {} to digest {}", uuid, new_digest);
Ok(())
} */
async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()> {
sqlx::query("INSERT INTO manifest_layers(manifest, layer_digest) VALUES (?, ?)")
.bind(manifest_digest)

View File

@ -8,16 +8,12 @@ mod byte_stream;
use std::net::SocketAddr;
use std::sync::Arc;
use actix_web::{web, App, HttpServer};
use actix_web::middleware::Logger;
use axum::{Router, routing};
use axum::ServiceExt;
use tower_layer::Layer;
use bytes::Bytes;
use sqlx::sqlite::SqlitePoolOptions;
use tokio::sync::{Mutex, mpsc};
use tokio::sync::Mutex;
use tower_http::normalize_path::NormalizePathLayer;
use tracing::{debug, Level};
@ -25,7 +21,7 @@ use app_state::AppState;
use database::Database;
use crate::storage::StorageDriver;
use crate::storage::filesystem::{FilesystemDriver, FilesystemStreamer};
use crate::storage::filesystem::FilesystemDriver;
use tower_http::trace::TraceLayer;
@ -40,29 +36,14 @@ async fn main() -> std::io::Result<()> {
pool.create_schema().await.unwrap();
let storage_path = String::from("registry/blobs");
let (send, recv) = mpsc::channel::<(String, Bytes)>(50);
let storage_driver: Mutex<Box<dyn StorageDriver>> = Mutex::new(Box::new(FilesystemDriver::new(storage_path.clone(), send)));
let storage_driver: Mutex<Box<dyn StorageDriver>> = Mutex::new(Box::new(FilesystemDriver::new("registry/blobs")));
// create the storage streamer
/* {
let path_clone = storage_path.clone();
actix_rt::spawn(async {
let mut streamer = FilesystemStreamer::new(path_clone, recv);
streamer.start_handling_streams().await.unwrap();
});
} */
//let state = web::Data::new(AppState::new(pool, storage_driver));
let state = Arc::new(AppState::new(pool, storage_driver));
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();
// TODO: Make configurable by deployment
let payload_config = web::PayloadConfig::new(5 * 1024 * 1024 * 1024); // 5Gb
let app = NormalizePathLayer::trim_trailing_slash().layer(Router::new()
.nest("/v2", Router::new()
.route("/", routing::get(api::version_check))

View File

@ -1,73 +1,24 @@
use std::{path::Path, io::{ErrorKind, BufRead, BufReader, Read}, sync::Arc, collections::HashMap};
use std::{path::Path, io::ErrorKind};
use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{executor::block_on, StreamExt, Stream};
use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking, sync::{Mutex, mpsc}};
use futures::StreamExt;
use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}};
use tokio_util::io::ReaderStream;
use tracing::debug;
use crate::byte_stream::ByteStream;
use super::{StorageDriver, StorageDriverStreamer, Streamer};
pub struct FilesystemStreamer {
storage_path: String,
chunk_channel: mpsc::Receiver<(String, Bytes)>,
cached_files: HashMap<String, fs::File>,
}
impl FilesystemStreamer {
pub fn new(storage_path: String, chunk_channel: mpsc::Receiver<(String, Bytes)>) -> Self {
Self {
storage_path,
chunk_channel,
cached_files: HashMap::new(),
}
}
pub async fn start_handling_streams(&mut self) -> anyhow::Result<()> {
while let Some((digest, mut bytes)) = self.chunk_channel.recv().await {
let mut temp;
let file = match self.cached_files.get_mut(&digest) {
Some(f) => f,
None => {
let path = format!("{}/{}", self.storage_path, digest);
temp = fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(path).await?;
&mut temp
}
};
file.write_all(&mut bytes).await.unwrap();
}
Ok(())
}
}
impl Streamer for FilesystemStreamer {
fn start(&'static mut self) -> anyhow::Result<()> {
tokio::spawn(self.start_handling_streams());
Ok(())
}
}
use super::StorageDriver;
pub struct FilesystemDriver {
storage_path: String,
streamer_sender: mpsc::Sender<(String, Bytes)>,
}
impl FilesystemDriver {
pub fn new(storage_path: String, stream_sender: mpsc::Sender<(String, Bytes)>) -> FilesystemDriver {
pub fn new(storage_path: &str) -> FilesystemDriver {
Self {
storage_path,
streamer_sender: stream_sender,
storage_path: storage_path.to_string(),
}
}
@ -76,39 +27,36 @@ impl FilesystemDriver {
}
}
impl StorageDriverStreamer for FilesystemDriver {
fn supports_streaming(&self) -> bool {
#[async_trait]
impl StorageDriver for FilesystemDriver {
async fn supports_streaming(&self) -> bool {
true
}
fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)> {
self.streamer_sender.clone()
}
fn has_digest(&self, digest: &str) -> anyhow::Result<bool> {
async fn has_digest(&self, digest: &str) -> anyhow::Result<bool> {
let path = self.get_digest_path(digest);
Ok(Path::new(&path).exists())
}
async fn save_digest_stream(&self, digest: &str, mut stream: ByteStream, append: bool) -> anyhow::Result<usize> {
let path = self.get_digest_path(digest);
let mut file = fs::OpenOptions::new()
.write(true)
.append(append)
.create(true)
.open(path).await?;
let mut len = 0;
while let Some(bytes) = stream.next().await {
let bytes = bytes?;
len += bytes.len();
file.write_all(&bytes).await.unwrap();
}
#[async_trait]
impl StorageDriver for FilesystemDriver {
async fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>> {
let file = match fs::File::open(self.get_digest_path(digest)).await {
Ok(f) => f,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
return Ok(None)
},
_ => {
return Err(e).context("FilesystemDriver: Failure to open digest file");
}
}
};
let s = ReaderStream::new(file);
Ok(Some(ByteStream::new(s)))
Ok(len)
}
async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>> {
@ -134,17 +82,20 @@ impl StorageDriver for FilesystemDriver {
}
async fn get_digest_stream(&self, digest: &str) -> anyhow::Result<Option<ByteStream>> {
let path = self.get_digest_path(digest);
/* if self.has_digest(digest) {
let s = async_stream::stream! {
let file = match fs::File::open(self.get_digest_path(digest)).await {
Ok(f) => f,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
return Ok(None)
},
_ => {
return Err(e).context("FilesystemDriver: Failure to open digest file");
}
}
};
} else {
Ok(None)
} */
todo!()
let s = ReaderStream::new(file);
Ok(Some(ByteStream::new(s)))
}
async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>> {

View File

@ -1,36 +1,20 @@
pub mod filesystem;
use std::{pin::Pin, sync::Arc};
use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use tokio::{io::{AsyncWrite, AsyncRead}, sync::{Mutex, mpsc}};
use actix_web::web;
use crate::byte_stream::ByteStream;
pub trait Streamer {
fn start(&'static mut self) -> anyhow::Result<()>;
}
pub trait StorageDriverStreamer {
fn supports_streaming(&self) -> bool;
fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)>;
fn has_digest(&self, digest: &str) -> anyhow::Result<bool>;
//fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>>;
}
#[async_trait]
pub trait StorageDriver: Send + Sync + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ {
pub trait StorageDriver: Send + Sync {
async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>>;
async fn get_digest_stream(&self, digest: &str) -> anyhow::Result<Option<ByteStream>>;
async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>>;
async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()>;
async fn save_digest_stream(&self, digest: &str, stream: ByteStream, append: bool) -> anyhow::Result<usize>;
async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>;
async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>;
async fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>>;
//async fn stream_bytes(&self, stream: Box<dyn Stream<Item = Bytes>>) -> anyhow::Result<()>;
async fn supports_streaming(&self) -> bool;
async fn has_digest(&self, digest: &str) -> anyhow::Result<bool>;
}