From 8206171216251ceb76b1dc61f5de97ee353305e6 Mon Sep 17 00:00:00 2001 From: SeanOMik Date: Tue, 25 Apr 2023 14:21:31 -0400 Subject: [PATCH] Switch to using axum so bytes can be streamed --- Cargo.lock | 203 ++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 5 ++ src/api/blobs.rs | 60 +++++++------ src/api/catalog.rs | 44 ++++++---- src/api/manifests.rs | 99 ++++++++++----------- src/api/mod.rs | 15 ++-- src/api/tags.rs | 49 ++++++----- src/api/uploads.rs | 106 +++++++++++----------- src/main.rs | 99 ++++++++++----------- src/storage/mod.rs | 2 +- 10 files changed, 446 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 572802d..686e060 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,6 +297,67 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "113713495a32dd0ab52baf5c10044725aa3aec00b31beda84218e469029b72a3" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-macros" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bb524613be645939e280b7279f7b017f98cf7f5ef084ec374df373530e73277" +dependencies = [ + "heck", + "proc-macro2", + "quote 1.0.26", + "syn 2.0.15", +] + [[package]] name = "base64" version = "0.13.1" @@ -646,6 +707,8 @@ dependencies = [ "anyhow", "async-stream", "async-trait", + "axum", + "axum-macros", "bytes", "chrono", "clap", @@ -660,6 +723,8 @@ dependencies = [ "sqlx", "tokio", "tokio-util", + "tower-http", + "tower-layer", "tracing", "tracing-log", "tracing-subscriber", @@ -923,15 +988,32 @@ dependencies = [ [[package]] name = "http" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ "bytes", "fnv", "itoa", ] +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.8.0" @@ -944,6 +1026,29 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "hyper" +version = "0.14.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "iana-time-zone" version = "0.1.53" @@ -1008,9 +1113,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.4" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" [[package]] name = "jobserver" @@ -1120,6 +1225,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + [[package]] name = "memchr" version = "2.5.0" @@ -1494,6 +1605,12 @@ dependencies = [ "base64 0.21.0", ] +[[package]] +name = "rustversion" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" + [[package]] name = "ryu" version = "1.0.11" @@ -1559,6 +1676,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7f05c1d5476066defcdfacce1f52fc3cae3af1d3089727100c02ae92e5abbe0" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1826,6 +1952,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "synom" version = "0.11.3" @@ -1994,6 +2126,53 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d1d42a9b3f3ec46ba828e8d376aec14592ea199f70a06a548587ecd1c4ab658" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.37" @@ -2053,6 +2232,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "try-lock" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" + [[package]] name = "typenum" version = "1.15.0" @@ -2149,6 +2334,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index ab4b278..47dc993 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,3 +36,8 @@ pin-project-lite = "0.2.9" anyhow = "1.0.70" async-stream = "0.3.5" actix-rt = "2.8.0" +axum = "0.6.16" +axum-macros = "0.3.7" + +tower-http = { version = "0.4.0", features = [ "trace", "normalize-path" ] } +tower-layer = { version = "0.3.2" } diff --git a/src/api/blobs.rs b/src/api/blobs.rs index e18188c..6b9c1b0 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -1,51 +1,55 @@ -use actix_web::{HttpResponse, get, HttpRequest, web, head, delete}; -use futures::StreamExt; +use std::sync::Arc; + +use axum::body::StreamBody; +use axum::extract::{State, Path}; +use axum::http::{StatusCode, header, HeaderName}; +use axum::response::{IntoResponse, Response}; +use tokio_util::io::ReaderStream; use crate::app_state::AppState; -use crate::database::Database; -use crate::storage::filesystem::FilesystemDriver; - -#[head("/{digest}")] -pub async fn digest_exists(path: web::Path<(String, String)>, state: web::Data) -> HttpResponse { - let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned()); - +pub async fn digest_exists_head(Path((name, layer_digest)): Path<(String, String)>, state: State>) -> Response { let storage = state.storage.lock().await; if storage.has_digest(&layer_digest).unwrap() { if let Some(size) = storage.digest_length(&layer_digest).await.unwrap() { - return HttpResponse::Ok() - .insert_header(("Content-Length", size)) - .insert_header(("Docker-Content-Digest", layer_digest)) - .finish(); + return ( + StatusCode::OK, + [ + (header::CONTENT_LENGTH, size.to_string()), + (HeaderName::from_static("docker-content-digest"), layer_digest) + ] + ).into_response(); } } - HttpResponse::NotFound() - .finish() + StatusCode::NOT_FOUND.into_response() } -#[get("/{digest}")] -pub async fn pull_digest(path: web::Path<(String, String)>, state: web::Data) -> HttpResponse { - let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned()); - +pub async fn pull_digest_get(Path((name, layer_digest)): Path<(String, String)>, state: State>) -> Response { let storage = state.storage.lock().await; - if let Some(len) = storage.digest_length(&layer_digest).await.unwrap() { let stream = storage.stream_bytes(&layer_digest).await.unwrap().unwrap(); - HttpResponse::Ok() - .insert_header(("Content-Length", len)) - .insert_header(("Docker-Content-Digest", layer_digest)) - .streaming(stream) + // convert the `AsyncRead` into a `Stream` + let stream = ReaderStream::new(stream.into_async_read()); + // convert the `Stream` into an `axum::body::HttpBody` + let body = StreamBody::new(stream); + + ( + StatusCode::OK, + [ + (header::CONTENT_LENGTH, len.to_string()), + (HeaderName::from_static("docker-content-digest"), layer_digest) + ], + body + ).into_response() } else { - HttpResponse::NotFound() - .finish() + StatusCode::NOT_FOUND.into_response() } } -#[delete("/{digest}")] -pub async fn delete_digest(_req: HttpRequest, _state: web::Data) -> HttpResponse { +pub async fn delete_digest(state: State>) -> impl IntoResponse { todo!() } \ No newline at end of file diff --git a/src/api/catalog.rs b/src/api/catalog.rs index b8fd1cc..6905195 100644 --- a/src/api/catalog.rs +++ b/src/api/catalog.rs @@ -1,5 +1,6 @@ -use actix_web::{HttpResponse, web, get, HttpRequest}; -use qstring::QString; +use std::sync::Arc; + +use axum::{extract::{State, Query}, http::{StatusCode, header, HeaderMap, HeaderName}, response::IntoResponse}; use serde::{Serialize, Deserialize}; use crate::{app_state::AppState, database::Database}; @@ -10,30 +11,32 @@ pub struct RepositoryList { repositories: Vec, } -#[get("")] -pub async fn list_repositories(req: HttpRequest, state: web::Data) -> HttpResponse { - // Get limit and last tag from query params if present. - let qs = QString::from(req.query_string()); - let limit = qs.get("n"); - let last_repo = qs.get("last"); +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListRepositoriesParams { + #[serde(rename = "n")] + limit: Option, + #[serde(rename = "last")] + last_repo: Option, +} + +pub async fn list_repositories(Query(params): Query, state: State>) -> impl IntoResponse { let mut link_header = None; // Paginate tag results if n was specified, else just pull everything. let database = &state.database; - let repositories = match limit { + let repositories = match params.limit { Some(limit) => { - let limit: u32 = limit.parse().unwrap(); // Convert the last param to a String, and list all the repos - let last_repo = last_repo.and_then(|t| Some(t.to_string())); + let last_repo = params.last_repo.and_then(|t| Some(t.to_string())); let repos = database.list_repositories(Some(limit), last_repo).await.unwrap(); // Get the new last repository for the response let last_repo = repos.last().and_then(|s| Some(s.clone())); // Construct the link header - let url = req.uri().to_string(); + let url = crate::REGISTRY_URL; let mut url = format!("<{}/v2/_catalog?n={}", url, limit); if let Some(last_repo) = last_repo { url += &format!("&limit={}", last_repo); @@ -54,13 +57,18 @@ pub async fn list_repositories(req: HttpRequest, state: web::Data) -> }; let response_body = serde_json::to_string(&repo_list).unwrap(); - // Construct the response, optionally adding the Link header if it was constructed. - let mut resp = HttpResponse::Ok(); - resp.append_header(("Content-Type", "application/json")); + let mut headers = HeaderMap::new(); + headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); + headers.insert(HeaderName::from_static("docker-distribution-api-version"), "registry/2.0".parse().unwrap()); if let Some(link_header) = link_header { - resp.append_header(("Link", link_header)); + headers.insert(header::LINK, link_header.parse().unwrap()); } - - resp.body(response_body) + + // Construct the response, optionally adding the Link header if it was constructed. + ( + StatusCode::OK, + headers, + response_body + ) } \ No newline at end of file diff --git a/src/api/manifests.rs b/src/api/manifests.rs index b4f337e..9338b8e 100644 --- a/src/api/manifests.rs +++ b/src/api/manifests.rs @@ -1,4 +1,8 @@ -use actix_web::{HttpResponse, web, put, get, head, delete, HttpRequest}; +use std::sync::Arc; + +use axum::extract::{Path, State}; +use axum::response::{Response, IntoResponse}; +use axum::http::{StatusCode, HeaderMap, HeaderName, header}; use tracing::log::warn; use tracing::{debug, info}; @@ -8,10 +12,7 @@ use crate::database::Database; use crate::dto::digest::Digest; use crate::dto::manifest::Manifest; -#[put("/{reference}")] -pub async fn upload_manifest(path: web::Path<(String, String)>, body: String, state: web::Data) -> HttpResponse { - let (name, reference) = (path.0.to_owned(), path.1.to_owned()); - +pub async fn upload_manifest_put(Path((name, reference)): Path<(String, String)>, state: State>, body: String) -> Response { // Calculate the sha256 digest for the manifest. let calculated_hash = sha256::digest(body.clone()); let calculated_digest = format!("sha256:{}", calculated_hash); @@ -40,23 +41,20 @@ pub async fn upload_manifest(path: web::Path<(String, String)>, body: String, st debug!("Linked manifest {} to layer {}", calculated_digest, image.config.digest); } - HttpResponse::Created() - .append_header(("Docker-Content-Digest", calculated_digest)) - .finish() + ( + StatusCode::CREATED, + [ (HeaderName::from_static("docker-content-digest"), calculated_digest) ] + ).into_response() }, Manifest::List(_list) => { warn!("ManifestList request was received!"); - HttpResponse::NotImplemented() - .finish() + StatusCode::NOT_IMPLEMENTED.into_response() } } } -#[get("/{reference}")] -pub async fn pull_manifest(path: web::Path<(String, String)>, state: web::Data) -> HttpResponse { - let (name, reference) = (path.0.to_owned(), path.1.to_owned()); - +pub async fn pull_manifest_get(Path((name, reference)): Path<(String, String)>, state: State>) -> Response { let database = &state.database; let digest = match Digest::is_digest(&reference) { true => reference.clone(), @@ -65,8 +63,7 @@ pub async fn pull_manifest(path: web::Path<(String, String)>, state: web::Data, state: web::Data, state: web::Data) -> HttpResponse { - let (name, reference) = (path.0.to_owned(), path.1.to_owned()); - +pub async fn manifest_exists_head(Path((name, reference)): Path<(String, String)>, state: State>) -> Response { // Get the digest from the reference path. let database = &state.database; let digest = match Digest::is_digest(&reference) { @@ -101,8 +99,7 @@ pub async fn manifest_exists(path: web::Path<(String, String)>, state: web::Data if let Some(tag) = database.get_tag(&name, &reference).await.unwrap() { tag.manifest_digest } else { - return HttpResponse::NotFound() - .finish(); + return StatusCode::NOT_FOUND.into_response(); } } }; @@ -111,33 +108,31 @@ pub async fn manifest_exists(path: web::Path<(String, String)>, state: web::Data if manifest_content.is_none() { // The digest that was provided in the request was invalid. // NOTE: This could also mean that there's a bug and the tag pointed to an invalid manifest. - return HttpResponse::NotFound() - .finish(); + return StatusCode::NOT_FOUND.into_response(); } let manifest_content = manifest_content.unwrap(); - HttpResponse::Ok() - .append_header(("Docker-Content-Digest", digest)) - .append_header(("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")) - .append_header(("Content-Length", manifest_content.len())) - .append_header(("Docker-Distribution-API-Version", "registry/2.0")) - .body(manifest_content) + ( + StatusCode::OK, + [ + (HeaderName::from_static("docker-content-digest"), digest), + (header::CONTENT_TYPE, "application/vnd.docker.distribution.manifest.v2+json".to_string()), + (header::CONTENT_LENGTH, manifest_content.len().to_string()), + (HeaderName::from_static("docker-distribution-api-version"), "registry/2.0".to_string()), + ], + manifest_content + ).into_response() } -#[delete("/{reference}")] -pub async fn delete_manifest(path: web::Path<(String, String)>, req: HttpRequest, state: web::Data) -> HttpResponse { - let (name, reference) = (path.0.to_owned(), path.1.to_owned()); - - let headers = req.headers(); - let _authorization = headers.get("Authorization").unwrap(); // TODO: +pub async fn delete_manifest(Path((name, reference)): Path<(String, String)>, headers: HeaderMap, state: State>) -> Response { + let _authorization = headers.get("Authorization").unwrap(); // TODO: use authorization header let database = &state.database; let digest = match Digest::is_digest(&reference) { true => { // Check if the manifest exists if database.get_manifest(&name, &reference).await.unwrap().is_none() { - return HttpResponse::NotFound() - .finish(); + return StatusCode::NOT_FOUND.into_response(); } reference.clone() @@ -146,15 +141,17 @@ pub async fn delete_manifest(path: web::Path<(String, String)>, req: HttpRequest if let Some(tag) = database.get_tag(&name, &reference).await.unwrap() { tag.manifest_digest } else { - return HttpResponse::NotFound() - .finish(); + return StatusCode::NOT_FOUND.into_response(); } } }; database.delete_manifest(&name, &digest).await.unwrap(); - HttpResponse::Accepted() - .append_header(("Content-Length", "None")) - .finish() + ( + StatusCode::ACCEPTED, + [ + (header::CONTENT_LENGTH, "None"), + ], + ).into_response() } \ No newline at end of file diff --git a/src/api/mod.rs b/src/api/mod.rs index 3ce2296..83a3332 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,16 +1,17 @@ +use axum::response::IntoResponse; +use axum::http::{StatusCode, header, HeaderName}; + pub mod blobs; pub mod uploads; pub mod manifests; pub mod tags; pub mod catalog; -use actix_web::{HttpResponse, get}; - /// https://docs.docker.com/registry/spec/api/#api-version-check /// full endpoint: `/v2/` -#[get("/")] -pub async fn version_check() -> HttpResponse { - HttpResponse::Ok() - .insert_header(("Docker-Distribution-API-Version", "registry/2.0")) - .finish() +pub async fn version_check() -> impl IntoResponse { + ( + StatusCode::OK, + [( HeaderName::from_static("docker-distribution-api-version"), "registry/2.0" )] + ) } \ No newline at end of file diff --git a/src/api/tags.rs b/src/api/tags.rs index ab28f0a..a4dfff5 100644 --- a/src/api/tags.rs +++ b/src/api/tags.rs @@ -1,5 +1,6 @@ -use actix_web::{HttpResponse, web, get, HttpRequest}; -use qstring::QString; +use std::sync::Arc; + +use axum::{extract::{Path, Query, State}, response::IntoResponse, http::{StatusCode, header, HeaderMap, HeaderName}}; use serde::{Serialize, Deserialize}; use crate::{app_state::AppState, database::Database}; @@ -11,32 +12,32 @@ pub struct TagList { tags: Vec, } -#[get("/list")] -pub async fn list_tags(path: web::Path<(String, )>, req: HttpRequest, state: web::Data) -> HttpResponse { - let name = path.0.to_owned(); - - // Get limit and last tag from query params if present. - let qs = QString::from(req.query_string()); - let limit = qs.get("n"); - let last_tag = qs.get("last"); +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListRepositoriesParams { + #[serde(rename = "n")] + limit: Option, + #[serde(rename = "last")] + last_tag: Option, +} + +pub async fn list_tags(Path((name, )): Path<(String, )>, Query(params): Query, state: State>) -> impl IntoResponse { let mut link_header = None; // Paginate tag results if n was specified, else just pull everything. let database = &state.database; - let tags = match limit { + let tags = match params.limit { Some(limit) => { - let limit: u32 = limit.parse().unwrap(); // Convert the last param to a String, and list all the tags - let last_tag = last_tag.and_then(|t| Some(t.to_string())); + let last_tag = params.last_tag.and_then(|t| Some(t.to_string())); let tags = database.list_repository_tags_page(&name, limit, last_tag).await.unwrap(); // Get the new last repository for the response let last_tag = tags.last(); // Construct the link header - let url = req.uri().to_string(); + let url = crate::REGISTRY_URL; let mut url = format!("<{}/v2/{}/tags/list?n={}", url, name, limit); if let Some(last_tag) = last_tag { url += &format!("&limit={}", last_tag.name); @@ -57,14 +58,20 @@ pub async fn list_tags(path: web::Path<(String, )>, req: HttpRequest, state: web tags: tags.into_iter().map(|t| t.name).collect(), }; let response_body = serde_json::to_string(&tag_list).unwrap(); - - // Construct the response, optionally adding the Link header if it was constructed. - let mut resp = HttpResponse::Ok(); - resp.append_header(("Content-Type", "application/json")); + // Create headers + let mut headers = HeaderMap::new(); + headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); + headers.insert(HeaderName::from_static("docker-distribution-api-version"), "registry/2.0".parse().unwrap()); + + // Add the link header if it was constructed if let Some(link_header) = link_header { - resp.append_header(("Link", link_header)); + headers.insert(header::LINK, link_header.parse().unwrap()); } - - resp.body(response_body) + + ( + StatusCode::OK, + headers, + response_body + ) } \ No newline at end of file diff --git a/src/api/uploads.rs b/src/api/uploads.rs index 97c4b30..602fb15 100644 --- a/src/api/uploads.rs +++ b/src/api/uploads.rs @@ -1,20 +1,19 @@ -use actix_web::{HttpResponse, HttpRequest, post, web, patch, put, delete, get}; -use bytes::{BytesMut, Bytes, BufMut}; -use futures::{StreamExt, TryStreamExt}; -use qstring::QString; -use tokio::io::AsyncWriteExt; +use std::collections::HashMap; +use std::sync::Arc; + +use axum::http::{StatusCode, header, HeaderName}; +use axum::extract::{Path, BodyStream, State, Query}; +use axum::response::{IntoResponse, Response}; + +use bytes::{Bytes, BytesMut}; +use futures::StreamExt; use tracing::{debug, warn}; use crate::app_state::AppState; -use crate::database::Database; -use crate::storage::{StorageDriver, StorageDriverStreamer}; - /// Starting an upload -#[post("/")] -pub async fn start_upload(path: web::Path<(String, )>) -> HttpResponse { +pub async fn start_upload_post(Path((name, )): Path<(String, )>) -> impl IntoResponse { debug!("Upload starting"); - let name = path.0.to_owned(); let uuid = uuid::Uuid::new_v4(); debug!("Requesting upload of image {}, generated uuid: {}", name, uuid); @@ -22,24 +21,22 @@ pub async fn start_upload(path: web::Path<(String, )>) -> HttpResponse { let location = format!("/v2/{}/blobs/uploads/{}", name, uuid.to_string()); debug!("Constructed upload url: {}", location); - HttpResponse::Accepted() - .insert_header(("Location", location)) - .finish() + ( + StatusCode::ACCEPTED, + [ (header::LOCATION, location) ] + ) } -#[patch("/{uuid}")] -pub async fn chunked_upload_layer(mut payload: web::Payload, path: web::Path<(String, String)>, req: HttpRequest, state: web::Data) -> HttpResponse { - let full_uri = req.uri().to_string(); - let (_name, layer_uuid) = (path.0.to_owned(), path.1.to_owned()); - +pub async fn chunked_upload_layer_patch(Path((name, layer_uuid)): Path<(String, String)>, state: State>, mut body: BodyStream) -> Response { let storage = state.storage.lock().await; let current_size = storage.digest_length(&layer_uuid).await.unwrap(); - let written_size = match storage.supports_streaming() { + let written_size = match /* storage.supports_streaming() */ false { true => { + // TODO: Make less bad let sender = storage.start_stream_channel(); let mut written_size = 0; - while let Some(item) = payload.next().await { + while let Some(item) = body.next().await { if let Ok(bytes) = item { written_size += bytes.len(); sender.send((layer_uuid.clone(), bytes)).await.unwrap(); @@ -51,8 +48,8 @@ pub async fn chunked_upload_layer(mut payload: web::Payload, path: web::Path<(St false => { warn!("This storage driver does not support streaming! This means high memory usage during image pushes!"); - let mut bytes = web::BytesMut::new(); - while let Some(item) = payload.next().await { + let mut bytes = BytesMut::new(); + while let Some(item) = body.next().await { bytes.extend_from_slice(&item.unwrap()); } @@ -68,20 +65,20 @@ pub async fn chunked_upload_layer(mut payload: web::Payload, path: web::Path<(St (0, written_size) }; - HttpResponse::Accepted() - .insert_header(("Location", full_uri)) - .insert_header(("Range", format!("{}-{}", starting, ending))) - .insert_header(("Content-Length", 0)) - .insert_header(("Docker-Upload-UUID", layer_uuid)) - .finish() + let full_uri = format!("{}/v2/{}/blobs/uploads/{}", crate::REGISTRY_URL, name, layer_uuid); + ( + StatusCode::ACCEPTED, + [ + (header::LOCATION, full_uri), + (header::RANGE, format!("{}-{}", starting, ending)), + (header::CONTENT_LENGTH, "0".to_string()), + (HeaderName::from_static("docker-upload-uuid"), layer_uuid) + ] + ).into_response() } -#[put("/{uuid}")] -pub async fn finish_chunked_upload(body: Bytes, path: web::Path<(String, String)>, req: HttpRequest, state: web::Data) -> HttpResponse { - let (name, layer_uuid) = (path.0.to_owned(), path.1.to_owned()); - - let qs = QString::from(req.query_string()); - let digest = qs.get("digest").unwrap(); +pub async fn finish_chunked_upload_put(Path((name, layer_uuid)): Path<(String, String)>, Query(query): Query>, state: State>, body: Bytes) -> impl IntoResponse { + let digest = query.get("digest").unwrap(); let storage = state.storage.lock().await; if !body.is_empty() { @@ -93,35 +90,34 @@ pub async fn finish_chunked_upload(body: Bytes, path: web::Path<(String, String) storage.replace_digest(&layer_uuid, &digest).await.unwrap(); debug!("Completed upload, finished uuid {} to digest {}", layer_uuid, digest); - HttpResponse::Created() - .insert_header(("Location", format!("/v2/{}/blobs/{}", name, digest))) - .insert_header(("Content-Length", 0)) - .insert_header(("Docker-Upload-Digest", digest)) - .finish() + ( + StatusCode::CREATED, + [ + (header::LOCATION, format!("/v2/{}/blobs/{}", name, digest)), + (header::CONTENT_LENGTH, "0".to_string()), + (HeaderName::from_static("docker-upload-digest"), digest.to_owned()) + ] + ) } -#[delete("/{uuid}")] -pub async fn cancel_upload(path: web::Path<(String, String)>, state: web::Data) -> HttpResponse { - let (_name, layer_uuid) = (path.0.to_owned(), path.1.to_owned()); - +pub async fn cancel_upload_delete(Path((name, layer_uuid)): Path<(String, String)>, state: State>) -> impl IntoResponse { let storage = state.storage.lock().await; storage.delete_digest(&layer_uuid).await.unwrap(); // I'm not sure what this response should be, its not specified in the registry spec. - HttpResponse::Ok() - .finish() + StatusCode::OK } -#[get("/{uuid}")] -pub async fn check_upload_status(path: web::Path<(String, String)>, state: web::Data) -> HttpResponse { - let (name, layer_uuid) = (path.0.to_owned(), path.1.to_owned()); - +pub async fn check_upload_status_get(Path((name, layer_uuid)): Path<(String, String)>, state: State>) -> impl IntoResponse { let storage = state.storage.lock().await; let ending = storage.digest_length(&layer_uuid).await.unwrap().unwrap_or(0); - HttpResponse::Created() - .insert_header(("Location", format!("/v2/{}/blobs/uploads/{}", name, layer_uuid))) - .insert_header(("Range", format!("0-{}", ending))) - .insert_header(("Docker-Upload-Digest", layer_uuid)) - .finish() + ( + StatusCode::CREATED, + [ + (header::LOCATION, format!("/v2/{}/blobs/uploads/{}", name, layer_uuid)), + (header::RANGE, format!("0-{}", ending)), + (HeaderName::from_static("docker-upload-digest"), layer_uuid) + ] + ) } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 30a2ca1..d081b06 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,14 +5,20 @@ mod dto; mod storage; mod byte_stream; +use std::net::SocketAddr; use std::sync::Arc; use actix_web::{web, App, HttpServer}; use actix_web::middleware::Logger; +use axum::{Router, routing}; +use axum::ServiceExt; +use tower_layer::Layer; + use bytes::Bytes; use sqlx::sqlite::SqlitePoolOptions; use tokio::sync::{Mutex, mpsc}; +use tower_http::normalize_path::NormalizePathLayer; use tracing::{debug, Level}; use app_state::AppState; @@ -21,7 +27,12 @@ use database::Database; use crate::storage::StorageDriver; use crate::storage::filesystem::{FilesystemDriver, FilesystemStreamer}; -#[actix_web::main] +use tower_http::trace::TraceLayer; + +pub const REGISTRY_URL: &'static str = "http://localhost:3000"; // TODO: Move into configuration or something (make sure it doesn't end in /) + +//#[actix_web::main] +#[tokio::main] async fn main() -> std::io::Result<()> { let pool = SqlitePoolOptions::new() .max_connections(15) @@ -34,15 +45,16 @@ async fn main() -> std::io::Result<()> { let storage_driver: Mutex> = Mutex::new(Box::new(FilesystemDriver::new(storage_path.clone(), send))); // create the storage streamer - { + /* { let path_clone = storage_path.clone(); actix_rt::spawn(async { let mut streamer = FilesystemStreamer::new(path_clone, recv); streamer.start_handling_streams().await.unwrap(); }); - } + } */ - let state = web::Data::new(AppState::new(pool, storage_driver)); + //let state = web::Data::new(AppState::new(pool, storage_driver)); + let state = Arc::new(AppState::new(pool, storage_driver)); tracing_subscriber::fmt() .with_max_level(Level::DEBUG) @@ -51,53 +63,38 @@ async fn main() -> std::io::Result<()> { // TODO: Make configurable by deployment let payload_config = web::PayloadConfig::new(5 * 1024 * 1024 * 1024); // 5Gb - debug!("Starting http server..."); - - HttpServer::new(move || { - App::new() - .wrap(Logger::default()) - .app_data(state.clone()) - .app_data(payload_config.clone()) - .service( - web::scope("/v2") - .service(api::version_check) - .service( - web::scope("/_catalog") - .service(api::catalog::list_repositories) - ) - .service( - web::scope("/{name}") - .service( - web::scope("/tags") - .service(api::tags::list_tags) - ) - .service( - web::scope("/manifests") - .service(api::manifests::upload_manifest) - .service(api::manifests::pull_manifest) - .service(api::manifests::manifest_exists) - .service(api::manifests::delete_manifest) // delete image - ) - .service( - web::scope("/blobs") - .service(api::blobs::digest_exists) - .service(api::blobs::pull_digest) - .service(api::blobs::delete_digest) - .service( - web::scope("/uploads") - .service(api::uploads::start_upload) - .service(api::uploads::chunked_upload_layer) - .service(api::uploads::finish_chunked_upload) - .service(api::uploads::cancel_upload) - .service(api::uploads::check_upload_status) - // TODO: Cross Repository Blob Mount - ) - ) - + let app = NormalizePathLayer::trim_trailing_slash().layer(Router::new() + .nest("/v2", Router::new() + .route("/", routing::get(api::version_check)) + .route("/_catalog", routing::get(api::catalog::list_repositories)) + .route("/:name/tags/list", routing::get(api::tags::list_tags)) + .nest("/:name/blobs", Router::new() + .route("/:digest", routing::get(api::blobs::pull_digest_get) + .head(api::blobs::digest_exists_head) + .delete(api::blobs::delete_digest)) + .nest("/uploads", Router::new() + .route("/", routing::post(api::uploads::start_upload_post)) + .route("/:uuid", routing::patch(api::uploads::chunked_upload_layer_patch) + .put(api::uploads::finish_chunked_upload_put) + .delete(api::uploads::cancel_upload_delete) + .get(api::uploads::check_upload_status_get) ) + ) ) - }) - .bind(("127.0.0.1", 8080))? - .run() - .await + .route("/:name/manifests/:reference", routing::get(api::manifests::pull_manifest_get) + .put(api::manifests::upload_manifest_put) + .head(api::manifests::manifest_exists_head) + .delete(api::manifests::delete_manifest)) + ) + .with_state(state) + .layer(TraceLayer::new_for_http())); + + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + debug!("Starting http server, listening on {}", addr); + axum::Server::bind(&addr) + .serve(app.into_make_service()) + .await + .unwrap(); + + Ok(()) } \ No newline at end of file diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 974a4ff..324bf58 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -23,7 +23,7 @@ pub trait StorageDriverStreamer { } #[async_trait] -pub trait StorageDriver: Send + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ { +pub trait StorageDriver: Send + Sync + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ { async fn get_digest(&self, digest: &str) -> anyhow::Result>; async fn get_digest_stream(&self, digest: &str) -> anyhow::Result>; async fn digest_length(&self, digest: &str) -> anyhow::Result>;