Switch to using axum so bytes can be streamed

This commit is contained in:
SeanOMik 2023-04-25 14:21:31 -04:00
parent 6107a49b44
commit 8206171216
Signed by: SeanOMik
GPG Key ID: 568F326C7EB33ACB
10 changed files with 446 additions and 236 deletions

203
Cargo.lock generated
View File

@ -297,6 +297,67 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "113713495a32dd0ab52baf5c10044725aa3aec00b31beda84218e469029b72a3"
dependencies = [
"async-trait",
"axum-core",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-macros"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bb524613be645939e280b7279f7b017f98cf7f5ef084ec374df373530e73277"
dependencies = [
"heck",
"proc-macro2",
"quote 1.0.26",
"syn 2.0.15",
]
[[package]]
name = "base64"
version = "0.13.1"
@ -646,6 +707,8 @@ dependencies = [
"anyhow",
"async-stream",
"async-trait",
"axum",
"axum-macros",
"bytes",
"chrono",
"clap",
@ -660,6 +723,8 @@ dependencies = [
"sqlx",
"tokio",
"tokio-util",
"tower-http",
"tower-layer",
"tracing",
"tracing-log",
"tracing-subscriber",
@ -923,15 +988,32 @@ dependencies = [
[[package]]
name = "http"
version = "0.2.8"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "http-range-header"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
[[package]]
name = "httparse"
version = "1.8.0"
@ -944,6 +1026,29 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "0.14.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "iana-time-zone"
version = "0.1.53"
@ -1008,9 +1113,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.4"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]]
name = "jobserver"
@ -1120,6 +1225,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "matchit"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
[[package]]
name = "memchr"
version = "2.5.0"
@ -1494,6 +1605,12 @@ dependencies = [
"base64 0.21.0",
]
[[package]]
name = "rustversion"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06"
[[package]]
name = "ryu"
version = "1.0.11"
@ -1559,6 +1676,15 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7f05c1d5476066defcdfacce1f52fc3cae3af1d3089727100c02ae92e5abbe0"
dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@ -1826,6 +1952,12 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "synom"
version = "0.11.3"
@ -1994,6 +2126,53 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-http"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d1d42a9b3f3ec46ba828e8d376aec14592ea199f70a06a548587ecd1c4ab658"
dependencies = [
"bitflags",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-range-header",
"pin-project-lite",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
[[package]]
name = "tower-service"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]]
name = "tracing"
version = "0.1.37"
@ -2053,6 +2232,12 @@ dependencies = [
"tracing-log",
]
[[package]]
name = "try-lock"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "typenum"
version = "1.15.0"
@ -2149,6 +2334,16 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"

View File

@ -36,3 +36,8 @@ pin-project-lite = "0.2.9"
anyhow = "1.0.70"
async-stream = "0.3.5"
actix-rt = "2.8.0"
axum = "0.6.16"
axum-macros = "0.3.7"
tower-http = { version = "0.4.0", features = [ "trace", "normalize-path" ] }
tower-layer = { version = "0.3.2" }

View File

@ -1,51 +1,55 @@
use actix_web::{HttpResponse, get, HttpRequest, web, head, delete};
use futures::StreamExt;
use std::sync::Arc;
use axum::body::StreamBody;
use axum::extract::{State, Path};
use axum::http::{StatusCode, header, HeaderName};
use axum::response::{IntoResponse, Response};
use tokio_util::io::ReaderStream;
use crate::app_state::AppState;
use crate::database::Database;
use crate::storage::filesystem::FilesystemDriver;
#[head("/{digest}")]
pub async fn digest_exists(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned());
pub async fn digest_exists_head(Path((name, layer_digest)): Path<(String, String)>, state: State<Arc<AppState>>) -> Response {
let storage = state.storage.lock().await;
if storage.has_digest(&layer_digest).unwrap() {
if let Some(size) = storage.digest_length(&layer_digest).await.unwrap() {
return HttpResponse::Ok()
.insert_header(("Content-Length", size))
.insert_header(("Docker-Content-Digest", layer_digest))
.finish();
return (
StatusCode::OK,
[
(header::CONTENT_LENGTH, size.to_string()),
(HeaderName::from_static("docker-content-digest"), layer_digest)
]
).into_response();
}
}
HttpResponse::NotFound()
.finish()
StatusCode::NOT_FOUND.into_response()
}
#[get("/{digest}")]
pub async fn pull_digest(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned());
pub async fn pull_digest_get(Path((name, layer_digest)): Path<(String, String)>, state: State<Arc<AppState>>) -> Response {
let storage = state.storage.lock().await;
if let Some(len) = storage.digest_length(&layer_digest).await.unwrap() {
let stream = storage.stream_bytes(&layer_digest).await.unwrap().unwrap();
HttpResponse::Ok()
.insert_header(("Content-Length", len))
.insert_header(("Docker-Content-Digest", layer_digest))
.streaming(stream)
// convert the `AsyncRead` into a `Stream`
let stream = ReaderStream::new(stream.into_async_read());
// convert the `Stream` into an `axum::body::HttpBody`
let body = StreamBody::new(stream);
(
StatusCode::OK,
[
(header::CONTENT_LENGTH, len.to_string()),
(HeaderName::from_static("docker-content-digest"), layer_digest)
],
body
).into_response()
} else {
HttpResponse::NotFound()
.finish()
StatusCode::NOT_FOUND.into_response()
}
}
#[delete("/{digest}")]
pub async fn delete_digest(_req: HttpRequest, _state: web::Data<AppState>) -> HttpResponse {
pub async fn delete_digest(state: State<Arc<AppState>>) -> impl IntoResponse {
todo!()
}

View File

@ -1,5 +1,6 @@
use actix_web::{HttpResponse, web, get, HttpRequest};
use qstring::QString;
use std::sync::Arc;
use axum::{extract::{State, Query}, http::{StatusCode, header, HeaderMap, HeaderName}, response::IntoResponse};
use serde::{Serialize, Deserialize};
use crate::{app_state::AppState, database::Database};
@ -10,30 +11,32 @@ pub struct RepositoryList {
repositories: Vec<String>,
}
#[get("")]
pub async fn list_repositories(req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
// Get limit and last tag from query params if present.
let qs = QString::from(req.query_string());
let limit = qs.get("n");
let last_repo = qs.get("last");
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListRepositoriesParams {
#[serde(rename = "n")]
limit: Option<u32>,
#[serde(rename = "last")]
last_repo: Option<String>,
}
pub async fn list_repositories(Query(params): Query<ListRepositoriesParams>, state: State<Arc<AppState>>) -> impl IntoResponse {
let mut link_header = None;
// Paginate tag results if n was specified, else just pull everything.
let database = &state.database;
let repositories = match limit {
let repositories = match params.limit {
Some(limit) => {
let limit: u32 = limit.parse().unwrap();
// Convert the last param to a String, and list all the repos
let last_repo = last_repo.and_then(|t| Some(t.to_string()));
let last_repo = params.last_repo.and_then(|t| Some(t.to_string()));
let repos = database.list_repositories(Some(limit), last_repo).await.unwrap();
// Get the new last repository for the response
let last_repo = repos.last().and_then(|s| Some(s.clone()));
// Construct the link header
let url = req.uri().to_string();
let url = crate::REGISTRY_URL;
let mut url = format!("<{}/v2/_catalog?n={}", url, limit);
if let Some(last_repo) = last_repo {
url += &format!("&limit={}", last_repo);
@ -54,13 +57,18 @@ pub async fn list_repositories(req: HttpRequest, state: web::Data<AppState>) ->
};
let response_body = serde_json::to_string(&repo_list).unwrap();
// Construct the response, optionally adding the Link header if it was constructed.
let mut resp = HttpResponse::Ok();
resp.append_header(("Content-Type", "application/json"));
let mut headers = HeaderMap::new();
headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
headers.insert(HeaderName::from_static("docker-distribution-api-version"), "registry/2.0".parse().unwrap());
if let Some(link_header) = link_header {
resp.append_header(("Link", link_header));
headers.insert(header::LINK, link_header.parse().unwrap());
}
resp.body(response_body)
// Construct the response, optionally adding the Link header if it was constructed.
(
StatusCode::OK,
headers,
response_body
)
}

View File

@ -1,4 +1,8 @@
use actix_web::{HttpResponse, web, put, get, head, delete, HttpRequest};
use std::sync::Arc;
use axum::extract::{Path, State};
use axum::response::{Response, IntoResponse};
use axum::http::{StatusCode, HeaderMap, HeaderName, header};
use tracing::log::warn;
use tracing::{debug, info};
@ -8,10 +12,7 @@ use crate::database::Database;
use crate::dto::digest::Digest;
use crate::dto::manifest::Manifest;
#[put("/{reference}")]
pub async fn upload_manifest(path: web::Path<(String, String)>, body: String, state: web::Data<AppState>) -> HttpResponse {
let (name, reference) = (path.0.to_owned(), path.1.to_owned());
pub async fn upload_manifest_put(Path((name, reference)): Path<(String, String)>, state: State<Arc<AppState>>, body: String) -> Response {
// Calculate the sha256 digest for the manifest.
let calculated_hash = sha256::digest(body.clone());
let calculated_digest = format!("sha256:{}", calculated_hash);
@ -40,23 +41,20 @@ pub async fn upload_manifest(path: web::Path<(String, String)>, body: String, st
debug!("Linked manifest {} to layer {}", calculated_digest, image.config.digest);
}
HttpResponse::Created()
.append_header(("Docker-Content-Digest", calculated_digest))
.finish()
(
StatusCode::CREATED,
[ (HeaderName::from_static("docker-content-digest"), calculated_digest) ]
).into_response()
},
Manifest::List(_list) => {
warn!("ManifestList request was received!");
HttpResponse::NotImplemented()
.finish()
StatusCode::NOT_IMPLEMENTED.into_response()
}
}
}
#[get("/{reference}")]
pub async fn pull_manifest(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (name, reference) = (path.0.to_owned(), path.1.to_owned());
pub async fn pull_manifest_get(Path((name, reference)): Path<(String, String)>, state: State<Arc<AppState>>) -> Response {
let database = &state.database;
let digest = match Digest::is_digest(&reference) {
true => reference.clone(),
@ -65,8 +63,7 @@ pub async fn pull_manifest(path: web::Path<(String, String)>, state: web::Data<A
if let Some(tag) = database.get_tag(&name, &reference).await.unwrap() {
tag.manifest_digest
} else {
return HttpResponse::NotFound()
.finish();
return StatusCode::NOT_FOUND.into_response();
}
}
};
@ -76,23 +73,24 @@ pub async fn pull_manifest(path: web::Path<(String, String)>, state: web::Data<A
debug!("Failed to get manifest in repo {}, for digest {}", name, digest);
// The digest that was provided in the request was invalid.
// NOTE: This could also mean that there's a bug and the tag pointed to an invalid manifest.
return HttpResponse::NotFound()
.finish();
return StatusCode::NOT_FOUND.into_response();
}
let manifest_content = manifest_content.unwrap();
HttpResponse::Ok()
.append_header(("Docker-Content-Digest", digest))
.append_header(("Content-Type", "application/vnd.docker.distribution.manifest.v2+json"))
.append_header(("Accept", "application/vnd.docker.distribution.manifest.v2+json"))
.append_header(("Docker-Distribution-API-Version", "registry/2.0"))
.body(manifest_content)
(
StatusCode::OK,
[
(HeaderName::from_static("docker-content-digest"), digest),
(header::CONTENT_TYPE, "application/vnd.docker.distribution.manifest.v2+json".to_string()),
(header::CONTENT_LENGTH, manifest_content.len().to_string()),
(header::ACCEPT, "application/vnd.docker.distribution.manifest.v2+json".to_string()),
(HeaderName::from_static("docker-distribution-api-version"), "registry/2.0".to_string()),
],
manifest_content
).into_response()
}
#[head("/{reference}")]
pub async fn manifest_exists(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (name, reference) = (path.0.to_owned(), path.1.to_owned());
pub async fn manifest_exists_head(Path((name, reference)): Path<(String, String)>, state: State<Arc<AppState>>) -> Response {
// Get the digest from the reference path.
let database = &state.database;
let digest = match Digest::is_digest(&reference) {
@ -101,8 +99,7 @@ pub async fn manifest_exists(path: web::Path<(String, String)>, state: web::Data
if let Some(tag) = database.get_tag(&name, &reference).await.unwrap() {
tag.manifest_digest
} else {
return HttpResponse::NotFound()
.finish();
return StatusCode::NOT_FOUND.into_response();
}
}
};
@ -111,33 +108,31 @@ pub async fn manifest_exists(path: web::Path<(String, String)>, state: web::Data
if manifest_content.is_none() {
// The digest that was provided in the request was invalid.
// NOTE: This could also mean that there's a bug and the tag pointed to an invalid manifest.
return HttpResponse::NotFound()
.finish();
return StatusCode::NOT_FOUND.into_response();
}
let manifest_content = manifest_content.unwrap();
HttpResponse::Ok()
.append_header(("Docker-Content-Digest", digest))
.append_header(("Content-Type", "application/vnd.docker.distribution.manifest.v2+json"))
.append_header(("Content-Length", manifest_content.len()))
.append_header(("Docker-Distribution-API-Version", "registry/2.0"))
.body(manifest_content)
(
StatusCode::OK,
[
(HeaderName::from_static("docker-content-digest"), digest),
(header::CONTENT_TYPE, "application/vnd.docker.distribution.manifest.v2+json".to_string()),
(header::CONTENT_LENGTH, manifest_content.len().to_string()),
(HeaderName::from_static("docker-distribution-api-version"), "registry/2.0".to_string()),
],
manifest_content
).into_response()
}
#[delete("/{reference}")]
pub async fn delete_manifest(path: web::Path<(String, String)>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
let (name, reference) = (path.0.to_owned(), path.1.to_owned());
let headers = req.headers();
let _authorization = headers.get("Authorization").unwrap(); // TODO:
pub async fn delete_manifest(Path((name, reference)): Path<(String, String)>, headers: HeaderMap, state: State<Arc<AppState>>) -> Response {
let _authorization = headers.get("Authorization").unwrap(); // TODO: use authorization header
let database = &state.database;
let digest = match Digest::is_digest(&reference) {
true => {
// Check if the manifest exists
if database.get_manifest(&name, &reference).await.unwrap().is_none() {
return HttpResponse::NotFound()
.finish();
return StatusCode::NOT_FOUND.into_response();
}
reference.clone()
@ -146,15 +141,17 @@ pub async fn delete_manifest(path: web::Path<(String, String)>, req: HttpRequest
if let Some(tag) = database.get_tag(&name, &reference).await.unwrap() {
tag.manifest_digest
} else {
return HttpResponse::NotFound()
.finish();
return StatusCode::NOT_FOUND.into_response();
}
}
};
database.delete_manifest(&name, &digest).await.unwrap();
HttpResponse::Accepted()
.append_header(("Content-Length", "None"))
.finish()
(
StatusCode::ACCEPTED,
[
(header::CONTENT_LENGTH, "None"),
],
).into_response()
}

View File

@ -1,16 +1,17 @@
use axum::response::IntoResponse;
use axum::http::{StatusCode, header, HeaderName};
pub mod blobs;
pub mod uploads;
pub mod manifests;
pub mod tags;
pub mod catalog;
use actix_web::{HttpResponse, get};
/// https://docs.docker.com/registry/spec/api/#api-version-check
/// full endpoint: `/v2/`
#[get("/")]
pub async fn version_check() -> HttpResponse {
HttpResponse::Ok()
.insert_header(("Docker-Distribution-API-Version", "registry/2.0"))
.finish()
pub async fn version_check() -> impl IntoResponse {
(
StatusCode::OK,
[( HeaderName::from_static("docker-distribution-api-version"), "registry/2.0" )]
)
}

View File

@ -1,5 +1,6 @@
use actix_web::{HttpResponse, web, get, HttpRequest};
use qstring::QString;
use std::sync::Arc;
use axum::{extract::{Path, Query, State}, response::IntoResponse, http::{StatusCode, header, HeaderMap, HeaderName}};
use serde::{Serialize, Deserialize};
use crate::{app_state::AppState, database::Database};
@ -11,32 +12,32 @@ pub struct TagList {
tags: Vec<String>,
}
#[get("/list")]
pub async fn list_tags(path: web::Path<(String, )>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
let name = path.0.to_owned();
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListRepositoriesParams {
#[serde(rename = "n")]
limit: Option<u32>,
// Get limit and last tag from query params if present.
let qs = QString::from(req.query_string());
let limit = qs.get("n");
let last_tag = qs.get("last");
#[serde(rename = "last")]
last_tag: Option<String>,
}
pub async fn list_tags(Path((name, )): Path<(String, )>, Query(params): Query<ListRepositoriesParams>, state: State<Arc<AppState>>) -> impl IntoResponse {
let mut link_header = None;
// Paginate tag results if n was specified, else just pull everything.
let database = &state.database;
let tags = match limit {
let tags = match params.limit {
Some(limit) => {
let limit: u32 = limit.parse().unwrap();
// Convert the last param to a String, and list all the tags
let last_tag = last_tag.and_then(|t| Some(t.to_string()));
let last_tag = params.last_tag.and_then(|t| Some(t.to_string()));
let tags = database.list_repository_tags_page(&name, limit, last_tag).await.unwrap();
// Get the new last repository for the response
let last_tag = tags.last();
// Construct the link header
let url = req.uri().to_string();
let url = crate::REGISTRY_URL;
let mut url = format!("<{}/v2/{}/tags/list?n={}", url, name, limit);
if let Some(last_tag) = last_tag {
url += &format!("&limit={}", last_tag.name);
@ -58,13 +59,19 @@ pub async fn list_tags(path: web::Path<(String, )>, req: HttpRequest, state: web
};
let response_body = serde_json::to_string(&tag_list).unwrap();
// Construct the response, optionally adding the Link header if it was constructed.
let mut resp = HttpResponse::Ok();
resp.append_header(("Content-Type", "application/json"));
// Create headers
let mut headers = HeaderMap::new();
headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
headers.insert(HeaderName::from_static("docker-distribution-api-version"), "registry/2.0".parse().unwrap());
// Add the link header if it was constructed
if let Some(link_header) = link_header {
resp.append_header(("Link", link_header));
headers.insert(header::LINK, link_header.parse().unwrap());
}
resp.body(response_body)
(
StatusCode::OK,
headers,
response_body
)
}

View File

@ -1,20 +1,19 @@
use actix_web::{HttpResponse, HttpRequest, post, web, patch, put, delete, get};
use bytes::{BytesMut, Bytes, BufMut};
use futures::{StreamExt, TryStreamExt};
use qstring::QString;
use tokio::io::AsyncWriteExt;
use std::collections::HashMap;
use std::sync::Arc;
use axum::http::{StatusCode, header, HeaderName};
use axum::extract::{Path, BodyStream, State, Query};
use axum::response::{IntoResponse, Response};
use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use tracing::{debug, warn};
use crate::app_state::AppState;
use crate::database::Database;
use crate::storage::{StorageDriver, StorageDriverStreamer};
/// Starting an upload
#[post("/")]
pub async fn start_upload(path: web::Path<(String, )>) -> HttpResponse {
pub async fn start_upload_post(Path((name, )): Path<(String, )>) -> impl IntoResponse {
debug!("Upload starting");
let name = path.0.to_owned();
let uuid = uuid::Uuid::new_v4();
debug!("Requesting upload of image {}, generated uuid: {}", name, uuid);
@ -22,24 +21,22 @@ pub async fn start_upload(path: web::Path<(String, )>) -> HttpResponse {
let location = format!("/v2/{}/blobs/uploads/{}", name, uuid.to_string());
debug!("Constructed upload url: {}", location);
HttpResponse::Accepted()
.insert_header(("Location", location))
.finish()
(
StatusCode::ACCEPTED,
[ (header::LOCATION, location) ]
)
}
#[patch("/{uuid}")]
pub async fn chunked_upload_layer(mut payload: web::Payload, path: web::Path<(String, String)>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
let full_uri = req.uri().to_string();
let (_name, layer_uuid) = (path.0.to_owned(), path.1.to_owned());
pub async fn chunked_upload_layer_patch(Path((name, layer_uuid)): Path<(String, String)>, state: State<Arc<AppState>>, mut body: BodyStream) -> Response {
let storage = state.storage.lock().await;
let current_size = storage.digest_length(&layer_uuid).await.unwrap();
let written_size = match storage.supports_streaming() {
let written_size = match /* storage.supports_streaming() */ false {
true => {
// TODO: Make less bad
let sender = storage.start_stream_channel();
let mut written_size = 0;
while let Some(item) = payload.next().await {
while let Some(item) = body.next().await {
if let Ok(bytes) = item {
written_size += bytes.len();
sender.send((layer_uuid.clone(), bytes)).await.unwrap();
@ -51,8 +48,8 @@ pub async fn chunked_upload_layer(mut payload: web::Payload, path: web::Path<(St
false => {
warn!("This storage driver does not support streaming! This means high memory usage during image pushes!");
let mut bytes = web::BytesMut::new();
while let Some(item) = payload.next().await {
let mut bytes = BytesMut::new();
while let Some(item) = body.next().await {
bytes.extend_from_slice(&item.unwrap());
}
@ -68,20 +65,20 @@ pub async fn chunked_upload_layer(mut payload: web::Payload, path: web::Path<(St
(0, written_size)
};
HttpResponse::Accepted()
.insert_header(("Location", full_uri))
.insert_header(("Range", format!("{}-{}", starting, ending)))
.insert_header(("Content-Length", 0))
.insert_header(("Docker-Upload-UUID", layer_uuid))
.finish()
let full_uri = format!("{}/v2/{}/blobs/uploads/{}", crate::REGISTRY_URL, name, layer_uuid);
(
StatusCode::ACCEPTED,
[
(header::LOCATION, full_uri),
(header::RANGE, format!("{}-{}", starting, ending)),
(header::CONTENT_LENGTH, "0".to_string()),
(HeaderName::from_static("docker-upload-uuid"), layer_uuid)
]
).into_response()
}
#[put("/{uuid}")]
pub async fn finish_chunked_upload(body: Bytes, path: web::Path<(String, String)>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
let (name, layer_uuid) = (path.0.to_owned(), path.1.to_owned());
let qs = QString::from(req.query_string());
let digest = qs.get("digest").unwrap();
pub async fn finish_chunked_upload_put(Path((name, layer_uuid)): Path<(String, String)>, Query(query): Query<HashMap<String, String>>, state: State<Arc<AppState>>, body: Bytes) -> impl IntoResponse {
let digest = query.get("digest").unwrap();
let storage = state.storage.lock().await;
if !body.is_empty() {
@ -93,35 +90,34 @@ pub async fn finish_chunked_upload(body: Bytes, path: web::Path<(String, String)
storage.replace_digest(&layer_uuid, &digest).await.unwrap();
debug!("Completed upload, finished uuid {} to digest {}", layer_uuid, digest);
HttpResponse::Created()
.insert_header(("Location", format!("/v2/{}/blobs/{}", name, digest)))
.insert_header(("Content-Length", 0))
.insert_header(("Docker-Upload-Digest", digest))
.finish()
(
StatusCode::CREATED,
[
(header::LOCATION, format!("/v2/{}/blobs/{}", name, digest)),
(header::CONTENT_LENGTH, "0".to_string()),
(HeaderName::from_static("docker-upload-digest"), digest.to_owned())
]
)
}
#[delete("/{uuid}")]
pub async fn cancel_upload(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (_name, layer_uuid) = (path.0.to_owned(), path.1.to_owned());
pub async fn cancel_upload_delete(Path((name, layer_uuid)): Path<(String, String)>, state: State<Arc<AppState>>) -> impl IntoResponse {
let storage = state.storage.lock().await;
storage.delete_digest(&layer_uuid).await.unwrap();
// I'm not sure what this response should be, its not specified in the registry spec.
HttpResponse::Ok()
.finish()
StatusCode::OK
}
#[get("/{uuid}")]
pub async fn check_upload_status(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (name, layer_uuid) = (path.0.to_owned(), path.1.to_owned());
pub async fn check_upload_status_get(Path((name, layer_uuid)): Path<(String, String)>, state: State<Arc<AppState>>) -> impl IntoResponse {
let storage = state.storage.lock().await;
let ending = storage.digest_length(&layer_uuid).await.unwrap().unwrap_or(0);
HttpResponse::Created()
.insert_header(("Location", format!("/v2/{}/blobs/uploads/{}", name, layer_uuid)))
.insert_header(("Range", format!("0-{}", ending)))
.insert_header(("Docker-Upload-Digest", layer_uuid))
.finish()
(
StatusCode::CREATED,
[
(header::LOCATION, format!("/v2/{}/blobs/uploads/{}", name, layer_uuid)),
(header::RANGE, format!("0-{}", ending)),
(HeaderName::from_static("docker-upload-digest"), layer_uuid)
]
)
}

View File

@ -5,14 +5,20 @@ mod dto;
mod storage;
mod byte_stream;
use std::net::SocketAddr;
use std::sync::Arc;
use actix_web::{web, App, HttpServer};
use actix_web::middleware::Logger;
use axum::{Router, routing};
use axum::ServiceExt;
use tower_layer::Layer;
use bytes::Bytes;
use sqlx::sqlite::SqlitePoolOptions;
use tokio::sync::{Mutex, mpsc};
use tower_http::normalize_path::NormalizePathLayer;
use tracing::{debug, Level};
use app_state::AppState;
@ -21,7 +27,12 @@ use database::Database;
use crate::storage::StorageDriver;
use crate::storage::filesystem::{FilesystemDriver, FilesystemStreamer};
#[actix_web::main]
use tower_http::trace::TraceLayer;
pub const REGISTRY_URL: &'static str = "http://localhost:3000"; // TODO: Move into configuration or something (make sure it doesn't end in /)
//#[actix_web::main]
#[tokio::main]
async fn main() -> std::io::Result<()> {
let pool = SqlitePoolOptions::new()
.max_connections(15)
@ -34,15 +45,16 @@ async fn main() -> std::io::Result<()> {
let storage_driver: Mutex<Box<dyn StorageDriver>> = Mutex::new(Box::new(FilesystemDriver::new(storage_path.clone(), send)));
// create the storage streamer
{
/* {
let path_clone = storage_path.clone();
actix_rt::spawn(async {
let mut streamer = FilesystemStreamer::new(path_clone, recv);
streamer.start_handling_streams().await.unwrap();
});
}
} */
let state = web::Data::new(AppState::new(pool, storage_driver));
//let state = web::Data::new(AppState::new(pool, storage_driver));
let state = Arc::new(AppState::new(pool, storage_driver));
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
@ -51,53 +63,38 @@ async fn main() -> std::io::Result<()> {
// TODO: Make configurable by deployment
let payload_config = web::PayloadConfig::new(5 * 1024 * 1024 * 1024); // 5Gb
debug!("Starting http server...");
HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.app_data(state.clone())
.app_data(payload_config.clone())
.service(
web::scope("/v2")
.service(api::version_check)
.service(
web::scope("/_catalog")
.service(api::catalog::list_repositories)
)
.service(
web::scope("/{name}")
.service(
web::scope("/tags")
.service(api::tags::list_tags)
)
.service(
web::scope("/manifests")
.service(api::manifests::upload_manifest)
.service(api::manifests::pull_manifest)
.service(api::manifests::manifest_exists)
.service(api::manifests::delete_manifest) // delete image
)
.service(
web::scope("/blobs")
.service(api::blobs::digest_exists)
.service(api::blobs::pull_digest)
.service(api::blobs::delete_digest)
.service(
web::scope("/uploads")
.service(api::uploads::start_upload)
.service(api::uploads::chunked_upload_layer)
.service(api::uploads::finish_chunked_upload)
.service(api::uploads::cancel_upload)
.service(api::uploads::check_upload_status)
// TODO: Cross Repository Blob Mount
)
)
let app = NormalizePathLayer::trim_trailing_slash().layer(Router::new()
.nest("/v2", Router::new()
.route("/", routing::get(api::version_check))
.route("/_catalog", routing::get(api::catalog::list_repositories))
.route("/:name/tags/list", routing::get(api::tags::list_tags))
.nest("/:name/blobs", Router::new()
.route("/:digest", routing::get(api::blobs::pull_digest_get)
.head(api::blobs::digest_exists_head)
.delete(api::blobs::delete_digest))
.nest("/uploads", Router::new()
.route("/", routing::post(api::uploads::start_upload_post))
.route("/:uuid", routing::patch(api::uploads::chunked_upload_layer_patch)
.put(api::uploads::finish_chunked_upload_put)
.delete(api::uploads::cancel_upload_delete)
.get(api::uploads::check_upload_status_get)
)
)
)
})
.bind(("127.0.0.1", 8080))?
.run()
.await
.route("/:name/manifests/:reference", routing::get(api::manifests::pull_manifest_get)
.put(api::manifests::upload_manifest_put)
.head(api::manifests::manifest_exists_head)
.delete(api::manifests::delete_manifest))
)
.with_state(state)
.layer(TraceLayer::new_for_http()));
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
debug!("Starting http server, listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
Ok(())
}

View File

@ -23,7 +23,7 @@ pub trait StorageDriverStreamer {
}
#[async_trait]
pub trait StorageDriver: Send + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ {
pub trait StorageDriver: Send + Sync + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ {
async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>>;
async fn get_digest_stream(&self, digest: &str) -> anyhow::Result<Option<ByteStream>>;
async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>>;