From 2d9b4d33d82e72e374f3c36cb82eea86830b8ffb Mon Sep 17 00:00:00 2001 From: SeanOMik Date: Tue, 18 Apr 2023 18:59:26 -0400 Subject: [PATCH] Dont store blobs in database, create StorageProvider --- .gitignore | 3 +- Cargo.lock | 8 ++ Cargo.toml | 2 + docs/storage-providers/filesystem-provider.md | 2 + docs/todo.md | 5 + rust-toolchain.toml | 2 + src/api/blobs.rs | 25 ++-- src/api/uploads.rs | 58 ++++------ src/app_state.rs | 11 +- src/database/mod.rs | 24 ++-- src/main.rs | 9 +- src/storage/filesystem.rs | 107 ++++++++++++++++++ src/storage/mod.rs | 15 +++ 13 files changed, 209 insertions(+), 62 deletions(-) create mode 100644 docs/storage-providers/filesystem-provider.md create mode 100644 docs/todo.md create mode 100644 rust-toolchain.toml create mode 100644 src/storage/filesystem.rs create mode 100644 src/storage/mod.rs diff --git a/.gitignore b/.gitignore index 5c41111..b96bb21 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target .env .vscode -test.db \ No newline at end of file +test.db +/registry \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 3257285..8ecdbf0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -231,6 +231,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4" + [[package]] name = "async-trait" version = "0.1.68" @@ -613,12 +619,14 @@ name = "docker-registry" version = "0.1.0" dependencies = [ "actix-web", + "anyhow", "async-trait", "bytes", "chrono", "clap", "futures", "jws", + "pin-project-lite", "qstring", "regex", "serde", diff --git a/Cargo.toml b/Cargo.toml index 0c47c10..5789f0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,5 @@ async-trait = "0.1.68" futures = "0.3.28" qstring = "0.7.2" sha256 = "1.1.2" +pin-project-lite = "0.2.9" +anyhow = "1.0.70" diff --git a/docs/storage-providers/filesystem-provider.md b/docs/storage-providers/filesystem-provider.md new file mode 100644 index 0000000..a59213e --- /dev/null +++ b/docs/storage-providers/filesystem-provider.md @@ -0,0 +1,2 @@ +# Filesystem Provider +The filesystem storage provider stores files in a directory. A database is still required when using this provider since the only thing thats stored on the filesystem are the layer's binary data. \ No newline at end of file diff --git a/docs/todo.md b/docs/todo.md new file mode 100644 index 0000000..77e2d95 --- /dev/null +++ b/docs/todo.md @@ -0,0 +1,5 @@ +- [ ] slashes in repo names +- [ ] Simple auth +- [ ] postgresql +- [ ] prometheus metrics +- [ ] streaming layer bytes into providers \ No newline at end of file diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..31578d3 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "stable" \ No newline at end of file diff --git a/src/api/blobs.rs b/src/api/blobs.rs index c3978a4..e958d9d 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -3,28 +3,33 @@ use actix_web::{HttpResponse, get, HttpRequest, web, head, delete}; use crate::app_state::AppState; use crate::database::Database; +use crate::storage::filesystem::FilesystemDriver; #[head("/{digest}")] pub async fn digest_exists(path: web::Path<(String, String)>, state: web::Data) -> HttpResponse { let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned()); - let database = &state.database; - if let Some(bytes) = database.get_digest(&layer_digest).await.unwrap() { - HttpResponse::Ok() - .insert_header(("Content-Length", bytes.len())) - .insert_header(("Docker-Content-Digest", layer_digest)) - .finish() - } else { - HttpResponse::NotFound().finish() + let storage = state.storage.lock().await; + + if storage.has_digest(&layer_digest).await.unwrap() { + if let Some(size) = storage.digest_length(&layer_digest).await.unwrap() { + return HttpResponse::Ok() + .insert_header(("Content-Length", size)) + .insert_header(("Docker-Content-Digest", layer_digest)) + .finish(); + } } + + HttpResponse::NotFound() + .finish() } #[get("/{digest}")] pub async fn pull_digest(path: web::Path<(String, String)>, state: web::Data) -> HttpResponse { let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned()); - let database = &state.database; - if let Some(bytes) = database.get_digest(&layer_digest).await.unwrap() { + let storage = state.storage.lock().await; + if let Some(bytes) = storage.get_digest(&layer_digest).await.unwrap() { HttpResponse::Ok() .insert_header(("Content-Length", bytes.len())) .insert_header(("Docker-Content-Digest", layer_digest)) diff --git a/src/api/uploads.rs b/src/api/uploads.rs index e8a0362..86473cd 100644 --- a/src/api/uploads.rs +++ b/src/api/uploads.rs @@ -1,11 +1,13 @@ use actix_web::{HttpResponse, HttpRequest, post, web, patch, put, delete, get}; use bytes::{BytesMut, Bytes, BufMut}; use qstring::QString; +use tokio::io::AsyncWriteExt; use tracing::{debug}; use crate::app_state::AppState; use crate::database::Database; +use crate::storage::filesystem::FilesystemDriver; /// Starting an upload #[post("/")] @@ -31,25 +33,21 @@ pub async fn chunked_upload_layer(body: Bytes, path: web::Path<(String, String)> debug!("Read body of size: {}", body.len()); - let database = &state.database; - let (starting, ending) = match database.get_digest(&layer_uuid).await.unwrap() { - Some(current_bytes) => { - let mut combined = BytesMut::new(); - let body_size = body.len(); - let current_size = current_bytes.len(); + let storage = state.storage.lock().await; - combined.put(current_bytes); - combined.put(body); + let current_size = storage.digest_length(&layer_uuid).await.unwrap(); + let (starting, ending) = if let Some(current_size) = current_size { + let body_size = body.len(); - database.save_digest(&layer_uuid, &combined.into()).await.unwrap(); + storage.save_digest(&layer_uuid, &body, true).await.unwrap(); - (current_size, current_size + body_size) - }, - None => { - let body_size = body.len(); - database.save_digest(&layer_uuid, &body.into()).await.unwrap(); - (0, body_size) - } + (current_size, current_size + body_size) + } else { + let body_size = body.len(); + + storage.save_digest(&layer_uuid, &body, true).await.unwrap(); + + (0, body_size) }; debug!("s={}, e={}, uuid={}, uri={}", starting, ending, layer_uuid, full_uri); @@ -69,20 +67,15 @@ pub async fn finish_chunked_upload(body: Bytes, path: web::Path<(String, String) let qs = QString::from(req.query_string()); let digest = qs.get("digest").unwrap(); - let database = &state.database; + let storage = state.storage.lock().await; if !body.is_empty() { - let current_bytes = database.get_digest(&layer_uuid).await.unwrap().unwrap(); - let mut combined = BytesMut::new(); - - combined.put(current_bytes); - combined.put(body); - - database.save_digest(&layer_uuid, &combined.into()).await.unwrap(); + storage.save_digest(&layer_uuid, &body, true).await.unwrap(); } else { // TODO: Validate layer with all digest params } - database.replace_digest(&layer_uuid, &digest).await.unwrap(); + storage.replace_digest(&layer_uuid, &digest).await.unwrap(); + debug!("Completed upload, finished uuid {} to digest {}", layer_uuid, digest); HttpResponse::Created() .insert_header(("Location", format!("/v2/{}/blobs/{}", name, digest))) @@ -95,8 +88,8 @@ pub async fn finish_chunked_upload(body: Bytes, path: web::Path<(String, String) pub async fn cancel_upload(path: web::Path<(String, String)>, state: web::Data) -> HttpResponse { let (_name, layer_uuid) = (path.0.to_owned(), path.1.to_owned()); - let database = &state.database; - database.delete_digest(&layer_uuid).await.unwrap(); + let storage = state.storage.lock().await; + storage.delete_digest(&layer_uuid).await.unwrap(); // I'm not sure what this response should be, its not specified in the registry spec. HttpResponse::Ok() @@ -107,15 +100,8 @@ pub async fn cancel_upload(path: web::Path<(String, String)>, state: web::Data, state: web::Data) -> HttpResponse { let (name, layer_uuid) = (path.0.to_owned(), path.1.to_owned()); - let database = &state.database; - let ending = match database.get_digest(&layer_uuid).await.unwrap() { - Some(current_bytes) => { - current_bytes.len() - }, - None => { - 0 - } - }; + let storage = state.storage.lock().await; + let ending = storage.digest_length(&layer_uuid).await.unwrap().unwrap_or(0); HttpResponse::Created() .insert_header(("Location", format!("/v2/{}/blobs/uploads/{}", name, layer_uuid))) diff --git a/src/app_state.rs b/src/app_state.rs index 1b1f9ca..42220c8 100644 --- a/src/app_state.rs +++ b/src/app_state.rs @@ -1,13 +1,22 @@ use sqlx::{Sqlite, Pool}; +use crate::storage::StorageDriver; + +use tokio::sync::Mutex; + pub struct AppState { pub database: Pool, + pub storage: Mutex>, } impl AppState { - pub fn new(database: Pool) -> Self { + pub fn new/* */(database: Pool, storage: Mutex>) -> Self + /* where + S: StorageDriver, */ + { Self { database, + storage } } } \ No newline at end of file diff --git a/src/database/mod.rs b/src/database/mod.rs index 71c5f04..17caa48 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -17,7 +17,7 @@ pub trait Database { /// Create the tables in the database async fn create_schema(&self) -> sqlx::Result<()>; /// Get the digest bytes - async fn get_digest(&self, digest: &str) -> sqlx::Result>; + /* async fn get_digest(&self, digest: &str) -> sqlx::Result>; /// Get the length of the digest async fn digest_length(&self, digest: &str) -> sqlx::Result; /// Save digest bytes @@ -25,9 +25,7 @@ pub trait Database { /// Delete digest async fn delete_digest(&self, digest: &str) -> sqlx::Result<()>; /// Replace the uuid with a digest - async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()>; - async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>; - async fn unlink_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>; + async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()>; */ // Tag related functions @@ -48,7 +46,10 @@ pub trait Database { /// Save a manifest's content. async fn save_manifest(&self, repository: &str, digest: &str, content: &str) -> sqlx::Result<()>; /// Delete a manifest - async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<()>; + /// Returns digests that this manifest pointed to. + async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result>; + async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>; + async fn unlink_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>; // Repository related functions @@ -70,7 +71,7 @@ impl Database for Pool { Ok(()) } - async fn get_digest(&self, digest: &str) -> sqlx::Result> { + /* async fn get_digest(&self, digest: &str) -> sqlx::Result> { // Handle RowNotFound errors let row: (Vec, ) = match sqlx::query_as("SELECT blob FROM layer_blobs WHERE digest = ?") .bind(digest) @@ -134,7 +135,7 @@ impl Database for Pool { debug!("Replaced digest uuid {} to digest {}", uuid, new_digest); Ok(()) - } + } */ async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()> { sqlx::query("INSERT INTO manifest_layers(manifest, layer_digest) VALUES (?, ?)") @@ -269,7 +270,7 @@ impl Database for Pool { Ok(()) } - async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<()> { + async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result> { sqlx::query("DELETE FROM image_manifests where digest = ? AND repository = ?") .bind(digest) .bind(repository) @@ -283,11 +284,8 @@ impl Database for Pool { debug!("Unlinked manifest {} from all linked layers", digest); - for row in rows.into_iter() { - let layer_digest = row.0; - self.delete_digest(&layer_digest).await?; - } + let digests = rows.into_iter().map(|r| r.0).collect(); debug!("Deleted all digests for manifest {}", digest); @@ -297,7 +295,7 @@ impl Database for Pool { debug!("Deleted all image tags for manifest {}", digest); - Ok(()) + Ok(digests) } async fn save_repository(&self, repository: &str) -> sqlx::Result<()> { diff --git a/src/main.rs b/src/main.rs index fde28b3..f7994af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,16 +2,21 @@ mod api; mod app_state; mod database; mod dto; +mod storage; use actix_web::{web, App, HttpServer}; use actix_web::middleware::Logger; use sqlx::sqlite::SqlitePoolOptions; +use tokio::sync::Mutex; use tracing::{debug, Level}; use app_state::AppState; use database::Database; +use crate::storage::StorageDriver; +use crate::storage::filesystem::FilesystemDriver; + #[actix_web::main] async fn main() -> std::io::Result<()> { let pool = SqlitePoolOptions::new() @@ -22,7 +27,9 @@ async fn main() -> std::io::Result<()> { //let db_conn: Mutex = Mutex::new(SqliteConnection::establish("test.db").unwrap()); //let db = Mutex::new(Database::new_sqlite_connection("test.db").unwrap()); - let state = web::Data::new(AppState::new(pool)); + let storage_driver: Mutex> = Mutex::new(Box::new(FilesystemDriver::new("registry/blobs"))); + + let state = web::Data::new(AppState::new(pool, storage_driver)); tracing_subscriber::fmt() .with_max_level(Level::DEBUG) diff --git a/src/storage/filesystem.rs b/src/storage/filesystem.rs new file mode 100644 index 0000000..8a7083a --- /dev/null +++ b/src/storage/filesystem.rs @@ -0,0 +1,107 @@ +use std::{path::Path, io::ErrorKind}; + +use anyhow::Context; +use async_trait::async_trait; +use bytes::Bytes; +use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking}; +use tracing::debug; + +use super::StorageDriver; + +pub struct FilesystemDriver { + storage_path: String, +} + +impl FilesystemDriver { + pub fn new(storage_path: &str) -> Self { + Self { + storage_path: storage_path.to_string(), + } + } + + fn get_digest_path(&self, digest: &str) -> String { + format!("{}/{}", self.storage_path, digest) + } +} + +#[async_trait] +impl StorageDriver for FilesystemDriver { + async fn has_digest(&self, digest: &str) -> anyhow::Result { + let path = self.get_digest_path(digest); + + spawn_blocking(move || { + return Path::new(&path).exists() + }).await.context("FilesystemDriver: Failure to spawn blocking thread to check digest") + } + + async fn get_digest(&self, digest: &str) -> anyhow::Result> { + let mut file = match fs::File::open(self.get_digest_path(digest)) + .await { + + Ok(f) => f, + Err(e) => match e.kind() { + ErrorKind::NotFound => { + return Ok(None) + }, + _ => { + return Err(e) + .context("FilesystemDriver: Failure to open digest file"); + } + } + }; + + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await?; + + Ok(Some(Bytes::from_iter(buf))) + } + + async fn digest_length(&self, digest: &str) -> anyhow::Result> { + let file = match fs::File::open(self.get_digest_path(digest)) + .await { + + Ok(f) => f, + Err(e) => match e.kind() { + ErrorKind::NotFound => { + return Ok(None) + }, + _ => { + return Err(e) + .context("FilesystemDriver: Failure to open digest file"); + } + } + }; + + Ok(Some(file.metadata().await?.len() as usize)) + } + + async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()> { + let path = self.get_digest_path(digest); + let mut file = fs::OpenOptions::new() + .write(true) + .append(append) + .create(true) + .open(path).await?; + + file.write_all(&bytes).await?; + + Ok(()) + } + + async fn delete_digest(&self, digest: &str) -> anyhow::Result<()> { + let path = self.get_digest_path(digest); + fs::remove_file(path).await?; + + Ok(()) + } + + async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()> { + let path = self.get_digest_path(uuid); + let path = Path::new(&path); + let parent = path.clone().parent().unwrap(); + + fs::rename(path, format!("{}/{}", parent.as_os_str().to_str().unwrap(), digest)).await?; + + Ok(()) + } +} \ No newline at end of file diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 0000000..4c8b604 --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,15 @@ +pub mod filesystem; + +use async_trait::async_trait; +use bytes::Bytes; +use tokio::io::{AsyncWrite, AsyncRead}; + +#[async_trait] +pub trait StorageDriver: Send/* : AsyncWrite + AsyncRead */ { + async fn has_digest(&self, digest: &str) -> anyhow::Result; + async fn get_digest(&self, digest: &str) -> anyhow::Result>; + async fn digest_length(&self, digest: &str) -> anyhow::Result>; + async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()>; + async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>; + async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>; +} \ No newline at end of file