From 16da8aa190bdbfbfe07b7e6c61fe9533fe67f7ec Mon Sep 17 00:00:00 2001 From: SeanOMik Date: Mon, 17 Apr 2023 23:18:56 -0400 Subject: [PATCH] Delete manifests, list tags --- src/api/manifests.rs | 23 +++++++++++- src/api/mod.rs | 1 + src/api/tags.rs | 65 ++++++++++++++++++++++++++++++++++ src/database/mod.rs | 83 +++++++++++++++++++++++++++++++++++++------- src/main.rs | 5 +++ 5 files changed, 164 insertions(+), 13 deletions(-) create mode 100644 src/api/tags.rs diff --git a/src/api/manifests.rs b/src/api/manifests.rs index b7f0250..e15e8aa 100644 --- a/src/api/manifests.rs +++ b/src/api/manifests.rs @@ -1,4 +1,4 @@ -use actix_web::{HttpResponse, web, put, get, head}; +use actix_web::{HttpResponse, web, put, get, head, delete, HttpRequest}; use tracing::log::warn; use tracing::{debug, info}; @@ -122,4 +122,25 @@ pub async fn manifest_exists(path: web::Path<(String, String)>, state: web::Data .append_header(("Content-Length", manifest_content.len())) .append_header(("Docker-Distribution-API-Version", "registry/2.0")) .body(manifest_content) +} + +#[delete("/{reference}")] +pub async fn delete_manifest(path: web::Path<(String, String)>, req: HttpRequest, state: web::Data) -> HttpResponse { + let (name, reference) = (path.0.to_owned(), path.1.to_owned()); + + let headers = req.headers(); + let _authorization = headers.get("Authorization").unwrap(); // TODO: + + let database = &state.database; + + // If `reference` is a digest, then we're deleting a manifest, else a tag + if Digest::is_digest(&reference) { + database.delete_manifest(&name, &reference).await.unwrap(); + } else { + database.delete_tag(&name, &reference).await.unwrap(); + } + + HttpResponse::Accepted() + .append_header(("Content-Length", "None")) + .finish() } \ No newline at end of file diff --git a/src/api/mod.rs b/src/api/mod.rs index d7915e5..6bfd15c 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,6 +1,7 @@ pub mod blobs; pub mod uploads; pub mod manifests; +pub mod tags; use actix_web::{HttpResponse, get}; diff --git a/src/api/tags.rs b/src/api/tags.rs new file mode 100644 index 0000000..6f44fa7 --- /dev/null +++ b/src/api/tags.rs @@ -0,0 +1,65 @@ +use actix_web::{HttpResponse, web, get, HttpRequest}; +use qstring::QString; +use serde::{Serialize, Deserialize}; + +use crate::{app_state::AppState, database::Database}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TagList { + name: String, + tags: Vec, +} + +#[get("/list")] +pub async fn list_tags(path: web::Path<(String, )>, req: HttpRequest, state: web::Data) -> HttpResponse { + let name = path.0.to_owned(); + + // Get limit and last tag from query params if present. + let qs = QString::from(req.query_string()); + let limit = qs.get("n"); + let last_tag = qs.get("last"); + + let mut link_header = None; + + // Paginate tag results if n was specified, else just pull everything. + let database = &state.database; + let tags = match limit { + Some(limit) => { + let limit: u32 = limit.parse().unwrap(); + + let last_tag = last_tag.and_then(|t| Some(t.to_string())); + + // Construct the link header + let mut url = format!("/v2/{}/tags/list?n={}", name, limit); + if let Some(last_tag) = last_tag.clone() { + url += &format!("&limit={}", last_tag); + } + url += ";rel=\"next\""; + link_header = Some(url); + + database.list_repository_tags_page(&name, limit, last_tag).await.unwrap() + }, + None => { + let database = &state.database; + database.list_repository_tags(&name).await.unwrap() + } + }; + + // Convert the `Vec` to a `TagList` which will be serialized to json. + let tag_list = TagList { + name, + tags: tags.into_iter().map(|t| t.name).collect(), + }; + let response_body = serde_json::to_string(&tag_list).unwrap(); + + // Construct the response, optionally adding the Link header if it was constructed. + let mut resp = HttpResponse::Ok(); + resp.append_header(("Content-Type", "application/json")); + + if let Some(link_header) = link_header { + resp.append_header(("Link", link_header)); + } + + resp.body(response_body) +} \ No newline at end of file diff --git a/src/database/mod.rs b/src/database/mod.rs index d6a8ddf..ece5768 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -16,12 +16,10 @@ pub trait Database { /// Create the tables in the database async fn create_schema(&self) -> sqlx::Result<()>; - /// Check if the database is storing the digest. - async fn has_digest(&self, digest: &str) -> bool; /// Get the digest bytes async fn get_digest(&self, digest: &str) -> sqlx::Result>; /// Get the length of the digest - async fn digest_length(&self, digest: &str) -> usize; + async fn digest_length(&self, digest: &str) -> sqlx::Result; /// Save digest bytes async fn save_digest(&self, digest: &str, bytes: &Bytes) -> sqlx::Result<()>; /// Delete digest @@ -29,10 +27,13 @@ pub trait Database { /// Replace the uuid with a digest async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()>; async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>; - async fn unlink_manifest_layer(&self, repository: &str, layer_digest: &str); + async fn unlink_manifest_layer(&self, repository: &str, layer_digest: &str) -> sqlx::Result<()>; // Tag related functions + /// Get tags associated with a repository + async fn list_repository_tags(&self, repository: &str,) -> sqlx::Result>; + async fn list_repository_tags_page(&self, repository: &str, limit: u32, last_tag: Option) -> sqlx::Result>; /// Get a manifest digest using the tag name. async fn get_tag(&self, repository: &str, tag: &str) -> sqlx::Result>; /// Save a tag and reference it to the manifest digest. @@ -53,6 +54,8 @@ pub trait Database { /// Create a repository async fn save_repository(&self, repository: &str) -> sqlx::Result<()>; + /// List all repositories + async fn list_repositories(&self) -> sqlx::Result>; } #[async_trait] @@ -66,10 +69,6 @@ impl Database for Pool { Ok(()) } - async fn has_digest(&self, _digest: &str) -> bool { - todo!() - } - async fn get_digest(&self, digest: &str) -> sqlx::Result> { // Handle RowNotFound errors let row: (Vec, ) = match sqlx::query_as("SELECT blob FROM layer_blobs WHERE digest = ?") @@ -93,8 +92,12 @@ impl Database for Pool { Ok(Some(bytes)) } - async fn digest_length(&self, _digest: &str) -> usize { - todo!() + async fn digest_length(&self, digest: &str) -> sqlx::Result { + let row: (i64, ) = sqlx::query_as("SELECT length(blob) FROM layer_blobs WHERE digest = ?") + .bind(digest) + .fetch_one(self).await?; + + Ok(row.0 as usize) } async fn save_digest(&self, digest: &str, bytes: &Bytes) -> sqlx::Result<()> { @@ -143,10 +146,57 @@ impl Database for Pool { Ok(()) } - async fn unlink_manifest_layer(&self, _repository: &str, _layer_digest: &str) { - todo!() + async fn unlink_manifest_layer(&self, repository: &str, layer_digest: &str) -> sqlx::Result<()> { + sqlx::query("DELETE FROM manifest_layers WHERE layer_digest = ? AND manifest IN (SELECT digest FROM image_manifests WHERE repository = ?) RETURNING manifest, layer_digest") + .bind(layer_digest) + .bind(repository) + .execute(self).await?; + + debug!("Removed link of layer {} from manifest in {} repository", layer_digest, repository); + + Ok(()) } + async fn list_repository_tags(&self, repository: &str,) -> sqlx::Result> { + let rows: Vec<(String, String, i64, )> = sqlx::query_as("SELECT name, image_manifest, last_updated FROM image_tags WHERE repository = ?") + .bind(repository) + .fetch_all(self).await?; + + // Convert the rows into `Tag` + let tags: Vec = rows.into_iter().map(|row| { + let last_updated: DateTime = DateTime::from_utc(NaiveDateTime::from_timestamp_opt(row.2, 0).unwrap(), Utc); + Tag::new(row.0, repository.to_string(), last_updated, row.1) + }).collect(); + + Ok(tags) + } + + async fn list_repository_tags_page(&self, repository: &str, limit: u32, last_tag: Option) -> sqlx::Result> { + // Query differently depending on if `last_tag` was specified + let rows: Vec<(String, String, i64, )> = match last_tag { + Some(last_tag) => { + sqlx::query_as("SELECT name, image_manifest, last_updated FROM image_tags WHERE repository = ? AND name > ? ORDER BY name LIMIT ?") + .bind(repository) + .bind(last_tag) + .bind(limit) + .fetch_all(self).await? + }, + None => { + sqlx::query_as("SELECT name, image_manifest, last_updated FROM image_tags WHERE repository = ? ORDER BY name LIMIT ?") + .bind(repository) + .bind(limit) + .fetch_all(self).await? + } + }; + + // Convert the rows into `Tag` + let tags: Vec = rows.into_iter().map(|row| { + let last_updated: DateTime = DateTime::from_utc(NaiveDateTime::from_timestamp_opt(row.2, 0).unwrap(), Utc); + Tag::new(row.0, repository.to_string(), last_updated, row.1) + }).collect(); + + Ok(tags) + } async fn get_tag(&self, repository: &str, tag: &str) -> sqlx::Result> { let row: (String, i64, ) = match sqlx::query_as("SELECT image_manifest, last_updated FROM image_tags WHERE name = ? AND repository = ?") @@ -234,4 +284,13 @@ impl Database for Pool { Ok(()) } + + async fn list_repositories(&self) -> sqlx::Result> { + let repos: Vec<(String, )> = sqlx::query_as("SELECT name FROM repositories") + .fetch_all(self).await?; + // Move out of repos + let repos = repos.into_iter().map(|row| row.0).collect(); + + Ok(repos) + } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index eb6d1fa..8510733 100644 --- a/src/main.rs +++ b/src/main.rs @@ -42,11 +42,16 @@ async fn main() -> std::io::Result<()> { .service(api::version_check) .service( web::scope("/{name}") + .service( + web::scope("/tags") + .service(api::tags::list_tags) + ) .service( web::scope("/manifests") .service(api::manifests::upload_manifest) .service(api::manifests::pull_manifest) .service(api::manifests::manifest_exists) + .service(api::manifests::delete_manifest) ) .service( web::scope("/blobs")