Dont store blobs in database, create StorageProvider

This commit is contained in:
SeanOMik 2023-04-18 18:59:26 -04:00
parent 5431c2d9d3
commit 2d9b4d33d8
Signed by: SeanOMik
GPG Key ID: 568F326C7EB33ACB
13 changed files with 209 additions and 62 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
.env .env
.vscode .vscode
test.db test.db
/registry

8
Cargo.lock generated
View File

@ -231,6 +231,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anyhow"
version = "1.0.70"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4"
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.68" version = "0.1.68"
@ -613,12 +619,14 @@ name = "docker-registry"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"actix-web", "actix-web",
"anyhow",
"async-trait", "async-trait",
"bytes", "bytes",
"chrono", "chrono",
"clap", "clap",
"futures", "futures",
"jws", "jws",
"pin-project-lite",
"qstring", "qstring",
"regex", "regex",
"serde", "serde",

View File

@ -31,3 +31,5 @@ async-trait = "0.1.68"
futures = "0.3.28" futures = "0.3.28"
qstring = "0.7.2" qstring = "0.7.2"
sha256 = "1.1.2" sha256 = "1.1.2"
pin-project-lite = "0.2.9"
anyhow = "1.0.70"

View File

@ -0,0 +1,2 @@
# Filesystem Provider
The filesystem storage provider stores files in a directory. A database is still required when using this provider since the only thing thats stored on the filesystem are the layer's binary data.

5
docs/todo.md Normal file
View File

@ -0,0 +1,5 @@
- [ ] slashes in repo names
- [ ] Simple auth
- [ ] postgresql
- [ ] prometheus metrics
- [ ] streaming layer bytes into providers

2
rust-toolchain.toml Normal file
View File

@ -0,0 +1,2 @@
[toolchain]
channel = "stable"

View File

@ -3,28 +3,33 @@ use actix_web::{HttpResponse, get, HttpRequest, web, head, delete};
use crate::app_state::AppState; use crate::app_state::AppState;
use crate::database::Database; use crate::database::Database;
use crate::storage::filesystem::FilesystemDriver;
#[head("/{digest}")] #[head("/{digest}")]
pub async fn digest_exists(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse { pub async fn digest_exists(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned()); let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned());
let database = &state.database; let storage = state.storage.lock().await;
if let Some(bytes) = database.get_digest(&layer_digest).await.unwrap() {
HttpResponse::Ok() if storage.has_digest(&layer_digest).await.unwrap() {
.insert_header(("Content-Length", bytes.len())) if let Some(size) = storage.digest_length(&layer_digest).await.unwrap() {
.insert_header(("Docker-Content-Digest", layer_digest)) return HttpResponse::Ok()
.finish() .insert_header(("Content-Length", size))
} else { .insert_header(("Docker-Content-Digest", layer_digest))
HttpResponse::NotFound().finish() .finish();
}
} }
HttpResponse::NotFound()
.finish()
} }
#[get("/{digest}")] #[get("/{digest}")]
pub async fn pull_digest(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse { pub async fn pull_digest(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned()); let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned());
let database = &state.database; let storage = state.storage.lock().await;
if let Some(bytes) = database.get_digest(&layer_digest).await.unwrap() { if let Some(bytes) = storage.get_digest(&layer_digest).await.unwrap() {
HttpResponse::Ok() HttpResponse::Ok()
.insert_header(("Content-Length", bytes.len())) .insert_header(("Content-Length", bytes.len()))
.insert_header(("Docker-Content-Digest", layer_digest)) .insert_header(("Docker-Content-Digest", layer_digest))

View File

@ -1,11 +1,13 @@
use actix_web::{HttpResponse, HttpRequest, post, web, patch, put, delete, get}; use actix_web::{HttpResponse, HttpRequest, post, web, patch, put, delete, get};
use bytes::{BytesMut, Bytes, BufMut}; use bytes::{BytesMut, Bytes, BufMut};
use qstring::QString; use qstring::QString;
use tokio::io::AsyncWriteExt;
use tracing::{debug}; use tracing::{debug};
use crate::app_state::AppState; use crate::app_state::AppState;
use crate::database::Database; use crate::database::Database;
use crate::storage::filesystem::FilesystemDriver;
/// Starting an upload /// Starting an upload
#[post("/")] #[post("/")]
@ -31,25 +33,21 @@ pub async fn chunked_upload_layer(body: Bytes, path: web::Path<(String, String)>
debug!("Read body of size: {}", body.len()); debug!("Read body of size: {}", body.len());
let database = &state.database; let storage = state.storage.lock().await;
let (starting, ending) = match database.get_digest(&layer_uuid).await.unwrap() {
Some(current_bytes) => {
let mut combined = BytesMut::new();
let body_size = body.len();
let current_size = current_bytes.len();
combined.put(current_bytes); let current_size = storage.digest_length(&layer_uuid).await.unwrap();
combined.put(body); let (starting, ending) = if let Some(current_size) = current_size {
let body_size = body.len();
database.save_digest(&layer_uuid, &combined.into()).await.unwrap(); storage.save_digest(&layer_uuid, &body, true).await.unwrap();
(current_size, current_size + body_size) (current_size, current_size + body_size)
}, } else {
None => { let body_size = body.len();
let body_size = body.len();
database.save_digest(&layer_uuid, &body.into()).await.unwrap(); storage.save_digest(&layer_uuid, &body, true).await.unwrap();
(0, body_size)
} (0, body_size)
}; };
debug!("s={}, e={}, uuid={}, uri={}", starting, ending, layer_uuid, full_uri); debug!("s={}, e={}, uuid={}, uri={}", starting, ending, layer_uuid, full_uri);
@ -69,20 +67,15 @@ pub async fn finish_chunked_upload(body: Bytes, path: web::Path<(String, String)
let qs = QString::from(req.query_string()); let qs = QString::from(req.query_string());
let digest = qs.get("digest").unwrap(); let digest = qs.get("digest").unwrap();
let database = &state.database; let storage = state.storage.lock().await;
if !body.is_empty() { if !body.is_empty() {
let current_bytes = database.get_digest(&layer_uuid).await.unwrap().unwrap(); storage.save_digest(&layer_uuid, &body, true).await.unwrap();
let mut combined = BytesMut::new();
combined.put(current_bytes);
combined.put(body);
database.save_digest(&layer_uuid, &combined.into()).await.unwrap();
} else { } else {
// TODO: Validate layer with all digest params // TODO: Validate layer with all digest params
} }
database.replace_digest(&layer_uuid, &digest).await.unwrap(); storage.replace_digest(&layer_uuid, &digest).await.unwrap();
debug!("Completed upload, finished uuid {} to digest {}", layer_uuid, digest);
HttpResponse::Created() HttpResponse::Created()
.insert_header(("Location", format!("/v2/{}/blobs/{}", name, digest))) .insert_header(("Location", format!("/v2/{}/blobs/{}", name, digest)))
@ -95,8 +88,8 @@ pub async fn finish_chunked_upload(body: Bytes, path: web::Path<(String, String)
pub async fn cancel_upload(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse { pub async fn cancel_upload(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (_name, layer_uuid) = (path.0.to_owned(), path.1.to_owned()); let (_name, layer_uuid) = (path.0.to_owned(), path.1.to_owned());
let database = &state.database; let storage = state.storage.lock().await;
database.delete_digest(&layer_uuid).await.unwrap(); storage.delete_digest(&layer_uuid).await.unwrap();
// I'm not sure what this response should be, its not specified in the registry spec. // I'm not sure what this response should be, its not specified in the registry spec.
HttpResponse::Ok() HttpResponse::Ok()
@ -107,15 +100,8 @@ pub async fn cancel_upload(path: web::Path<(String, String)>, state: web::Data<A
pub async fn check_upload_status(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse { pub async fn check_upload_status(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (name, layer_uuid) = (path.0.to_owned(), path.1.to_owned()); let (name, layer_uuid) = (path.0.to_owned(), path.1.to_owned());
let database = &state.database; let storage = state.storage.lock().await;
let ending = match database.get_digest(&layer_uuid).await.unwrap() { let ending = storage.digest_length(&layer_uuid).await.unwrap().unwrap_or(0);
Some(current_bytes) => {
current_bytes.len()
},
None => {
0
}
};
HttpResponse::Created() HttpResponse::Created()
.insert_header(("Location", format!("/v2/{}/blobs/uploads/{}", name, layer_uuid))) .insert_header(("Location", format!("/v2/{}/blobs/uploads/{}", name, layer_uuid)))

View File

@ -1,13 +1,22 @@
use sqlx::{Sqlite, Pool}; use sqlx::{Sqlite, Pool};
use crate::storage::StorageDriver;
use tokio::sync::Mutex;
pub struct AppState { pub struct AppState {
pub database: Pool<Sqlite>, pub database: Pool<Sqlite>,
pub storage: Mutex<Box<dyn StorageDriver>>,
} }
impl AppState { impl AppState {
pub fn new(database: Pool<Sqlite>) -> Self { pub fn new/* <S> */(database: Pool<Sqlite>, storage: Mutex<Box<dyn StorageDriver>>) -> Self
/* where
S: StorageDriver, */
{
Self { Self {
database, database,
storage
} }
} }
} }

View File

@ -17,7 +17,7 @@ pub trait Database {
/// Create the tables in the database /// Create the tables in the database
async fn create_schema(&self) -> sqlx::Result<()>; async fn create_schema(&self) -> sqlx::Result<()>;
/// Get the digest bytes /// Get the digest bytes
async fn get_digest(&self, digest: &str) -> sqlx::Result<Option<Bytes>>; /* async fn get_digest(&self, digest: &str) -> sqlx::Result<Option<Bytes>>;
/// Get the length of the digest /// Get the length of the digest
async fn digest_length(&self, digest: &str) -> sqlx::Result<usize>; async fn digest_length(&self, digest: &str) -> sqlx::Result<usize>;
/// Save digest bytes /// Save digest bytes
@ -25,9 +25,7 @@ pub trait Database {
/// Delete digest /// Delete digest
async fn delete_digest(&self, digest: &str) -> sqlx::Result<()>; async fn delete_digest(&self, digest: &str) -> sqlx::Result<()>;
/// Replace the uuid with a digest /// Replace the uuid with a digest
async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()>; async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()>; */
async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>;
async fn unlink_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>;
// Tag related functions // Tag related functions
@ -48,7 +46,10 @@ pub trait Database {
/// Save a manifest's content. /// Save a manifest's content.
async fn save_manifest(&self, repository: &str, digest: &str, content: &str) -> sqlx::Result<()>; async fn save_manifest(&self, repository: &str, digest: &str, content: &str) -> sqlx::Result<()>;
/// Delete a manifest /// Delete a manifest
async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<()>; /// Returns digests that this manifest pointed to.
async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<Vec<String>>;
async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>;
async fn unlink_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>;
// Repository related functions // Repository related functions
@ -70,7 +71,7 @@ impl Database for Pool<Sqlite> {
Ok(()) Ok(())
} }
async fn get_digest(&self, digest: &str) -> sqlx::Result<Option<Bytes>> { /* async fn get_digest(&self, digest: &str) -> sqlx::Result<Option<Bytes>> {
// Handle RowNotFound errors // Handle RowNotFound errors
let row: (Vec<u8>, ) = match sqlx::query_as("SELECT blob FROM layer_blobs WHERE digest = ?") let row: (Vec<u8>, ) = match sqlx::query_as("SELECT blob FROM layer_blobs WHERE digest = ?")
.bind(digest) .bind(digest)
@ -134,7 +135,7 @@ impl Database for Pool<Sqlite> {
debug!("Replaced digest uuid {} to digest {}", uuid, new_digest); debug!("Replaced digest uuid {} to digest {}", uuid, new_digest);
Ok(()) Ok(())
} } */
async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()> { async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()> {
sqlx::query("INSERT INTO manifest_layers(manifest, layer_digest) VALUES (?, ?)") sqlx::query("INSERT INTO manifest_layers(manifest, layer_digest) VALUES (?, ?)")
@ -269,7 +270,7 @@ impl Database for Pool<Sqlite> {
Ok(()) Ok(())
} }
async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<()> { async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<Vec<String>> {
sqlx::query("DELETE FROM image_manifests where digest = ? AND repository = ?") sqlx::query("DELETE FROM image_manifests where digest = ? AND repository = ?")
.bind(digest) .bind(digest)
.bind(repository) .bind(repository)
@ -283,11 +284,8 @@ impl Database for Pool<Sqlite> {
debug!("Unlinked manifest {} from all linked layers", digest); debug!("Unlinked manifest {} from all linked layers", digest);
for row in rows.into_iter() {
let layer_digest = row.0;
self.delete_digest(&layer_digest).await?; let digests = rows.into_iter().map(|r| r.0).collect();
}
debug!("Deleted all digests for manifest {}", digest); debug!("Deleted all digests for manifest {}", digest);
@ -297,7 +295,7 @@ impl Database for Pool<Sqlite> {
debug!("Deleted all image tags for manifest {}", digest); debug!("Deleted all image tags for manifest {}", digest);
Ok(()) Ok(digests)
} }
async fn save_repository(&self, repository: &str) -> sqlx::Result<()> { async fn save_repository(&self, repository: &str) -> sqlx::Result<()> {

View File

@ -2,16 +2,21 @@ mod api;
mod app_state; mod app_state;
mod database; mod database;
mod dto; mod dto;
mod storage;
use actix_web::{web, App, HttpServer}; use actix_web::{web, App, HttpServer};
use actix_web::middleware::Logger; use actix_web::middleware::Logger;
use sqlx::sqlite::SqlitePoolOptions; use sqlx::sqlite::SqlitePoolOptions;
use tokio::sync::Mutex;
use tracing::{debug, Level}; use tracing::{debug, Level};
use app_state::AppState; use app_state::AppState;
use database::Database; use database::Database;
use crate::storage::StorageDriver;
use crate::storage::filesystem::FilesystemDriver;
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let pool = SqlitePoolOptions::new() let pool = SqlitePoolOptions::new()
@ -22,7 +27,9 @@ async fn main() -> std::io::Result<()> {
//let db_conn: Mutex<dyn Database> = Mutex::new(SqliteConnection::establish("test.db").unwrap()); //let db_conn: Mutex<dyn Database> = Mutex::new(SqliteConnection::establish("test.db").unwrap());
//let db = Mutex::new(Database::new_sqlite_connection("test.db").unwrap()); //let db = Mutex::new(Database::new_sqlite_connection("test.db").unwrap());
let state = web::Data::new(AppState::new(pool)); let storage_driver: Mutex<Box<dyn StorageDriver>> = Mutex::new(Box::new(FilesystemDriver::new("registry/blobs")));
let state = web::Data::new(AppState::new(pool, storage_driver));
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_max_level(Level::DEBUG) .with_max_level(Level::DEBUG)

107
src/storage/filesystem.rs Normal file
View File

@ -0,0 +1,107 @@
use std::{path::Path, io::ErrorKind};
use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking};
use tracing::debug;
use super::StorageDriver;
pub struct FilesystemDriver {
storage_path: String,
}
impl FilesystemDriver {
pub fn new(storage_path: &str) -> Self {
Self {
storage_path: storage_path.to_string(),
}
}
fn get_digest_path(&self, digest: &str) -> String {
format!("{}/{}", self.storage_path, digest)
}
}
#[async_trait]
impl StorageDriver for FilesystemDriver {
async fn has_digest(&self, digest: &str) -> anyhow::Result<bool> {
let path = self.get_digest_path(digest);
spawn_blocking(move || {
return Path::new(&path).exists()
}).await.context("FilesystemDriver: Failure to spawn blocking thread to check digest")
}
async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>> {
let mut file = match fs::File::open(self.get_digest_path(digest))
.await {
Ok(f) => f,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
return Ok(None)
},
_ => {
return Err(e)
.context("FilesystemDriver: Failure to open digest file");
}
}
};
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
Ok(Some(Bytes::from_iter(buf)))
}
async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>> {
let file = match fs::File::open(self.get_digest_path(digest))
.await {
Ok(f) => f,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
return Ok(None)
},
_ => {
return Err(e)
.context("FilesystemDriver: Failure to open digest file");
}
}
};
Ok(Some(file.metadata().await?.len() as usize))
}
async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()> {
let path = self.get_digest_path(digest);
let mut file = fs::OpenOptions::new()
.write(true)
.append(append)
.create(true)
.open(path).await?;
file.write_all(&bytes).await?;
Ok(())
}
async fn delete_digest(&self, digest: &str) -> anyhow::Result<()> {
let path = self.get_digest_path(digest);
fs::remove_file(path).await?;
Ok(())
}
async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()> {
let path = self.get_digest_path(uuid);
let path = Path::new(&path);
let parent = path.clone().parent().unwrap();
fs::rename(path, format!("{}/{}", parent.as_os_str().to_str().unwrap(), digest)).await?;
Ok(())
}
}

15
src/storage/mod.rs Normal file
View File

@ -0,0 +1,15 @@
pub mod filesystem;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::io::{AsyncWrite, AsyncRead};
#[async_trait]
pub trait StorageDriver: Send/* : AsyncWrite + AsyncRead */ {
async fn has_digest(&self, digest: &str) -> anyhow::Result<bool>;
async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>>;
async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>>;
async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()>;
async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>;
async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>;
}