Implement image pulling and pushing

This commit is contained in:
SeanOMik 2023-04-17 21:47:24 -04:00
parent d7390da019
commit 38c83f4ac7
Signed by: SeanOMik
GPG Key ID: 568F326C7EB33ACB
12 changed files with 470 additions and 53 deletions

62
Cargo.lock generated
View File

@ -286,6 +286,15 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]]
name = "block-buffer"
version = "0.10.3"
@ -579,13 +588,22 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "digest"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c"
dependencies = [
"block-buffer",
"block-buffer 0.10.3",
"crypto-common",
"subtle",
]
@ -605,6 +623,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"sha256",
"sqlx",
"tokio",
"tracing",
@ -865,7 +884,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest",
"digest 0.10.5",
]
[[package]]
@ -989,7 +1008,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"sha2",
"sha2 0.10.6",
]
[[package]]
@ -1161,6 +1180,12 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "os_str_bytes"
version = "6.4.0"
@ -1520,7 +1545,20 @@ checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
"digest 0.10.5",
]
[[package]]
name = "sha2"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
dependencies = [
"block-buffer 0.9.0",
"cfg-if",
"cpufeatures",
"digest 0.9.0",
"opaque-debug",
]
[[package]]
@ -1531,7 +1569,17 @@ checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
"digest 0.10.5",
]
[[package]]
name = "sha256"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "328169f167261957e83d82be47f9e36629e257c62308129033d7f7e7c173d180"
dependencies = [
"hex",
"sha2 0.9.9",
]
[[package]]
@ -1648,7 +1696,7 @@ dependencies = [
"percent-encoding",
"rustls",
"rustls-pemfile",
"sha2",
"sha2 0.10.6",
"smallvec",
"sqlformat",
"sqlx-rt",
@ -1671,7 +1719,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote 1.0.26",
"sha2",
"sha2 0.10.6",
"sqlx-core",
"sqlx-rt",
"syn 1.0.109",

View File

@ -30,3 +30,4 @@ jws = "0.2.7"
async-trait = "0.1.68"
futures = "0.3.28"
qstring = "0.7.2"
sha256 = "1.1.2"

15
manifest.json Normal file
View File

@ -0,0 +1,15 @@
{
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"digest": "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
},
"layers": [
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 3370629,
"digest": "sha256:8921db27df2831fa6eaa85321205a2470c669b855f3ec95d5a3c2b46de0442c9"
}
]
}

View File

@ -20,7 +20,23 @@ pub async fn digest_exists(path: web::Path<(String, String)>, state: web::Data<A
}
}
#[get("/{digest}")]
pub async fn pull_digest(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned());
let database = &state.database;
if let Some(bytes) = database.get_digest(&layer_digest).await.unwrap() {
HttpResponse::Ok()
.insert_header(("Content-Length", bytes.len()))
.insert_header(("Docker-Content-Digest", layer_digest))
.body(bytes)
} else {
HttpResponse::NotFound()
.finish()
}
}
#[delete("/{digest}")]
pub async fn delete_layer(req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
pub async fn delete_digest(req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
todo!()
}

View File

@ -1,16 +1,125 @@
use actix_web::{HttpResponse, HttpRequest, web, put};
use tracing::{debug, trace};
use actix_web::{HttpResponse, HttpRequest, web, put, get, head};
use tracing::log::warn;
use tracing::{debug, trace, info};
use crate::app_state::AppState;
use crate::database::Database;
use crate::dto::digest::Digest;
use crate::dto::manifest::{Manifest, ImageManifest};
#[put("/{reference}")]
pub async fn upload_manifest(path: web::Path<(String, String)>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned());
pub async fn upload_manifest(path: web::Path<(String, String)>, body: String, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
let (name, reference) = (path.0.to_owned(), path.1.to_owned());
// Calculate the sha256 digest for the manifest.
let calculated_hash = sha256::digest(body.clone());
let calculated_digest = format!("sha256:{}", calculated_hash);
let database = &state.database;
todo!()
// Create the image repository and save the image manifest.
database.save_repository(&name).await.unwrap();
database.save_manifest(&name, &calculated_digest, &body).await.unwrap();
// If the reference is not a digest, then it must be a tag name.
if !Digest::is_digest(&reference) {
database.save_tag(&name, &reference, &calculated_digest).await.unwrap();
}
info!("Saved manifest {}", calculated_digest);
match serde_json::from_str(&body).unwrap() {
Manifest::Image(image) => {
// Link the manifest to the image layer
database.link_manifest_layer(&calculated_digest, &image.config.digest).await.unwrap();
debug!("Linked manifest {} to layer {}", calculated_digest, image.config.digest);
for layer in image.layers {
database.link_manifest_layer(&calculated_digest, &layer.digest).await.unwrap();
debug!("Linked manifest {} to layer {}", calculated_digest, image.config.digest);
}
HttpResponse::Created()
.append_header(("Docker-Content-Digest", calculated_digest))
.finish()
},
Manifest::List(_list) => {
warn!("ManifestList request was received!");
HttpResponse::NotImplemented()
.finish()
}
}
}
#[get("/{reference}")]
pub async fn pull_manifest(path: web::Path<(String, String)>, req: HttpRequest, state: web::Data<AppState>) -> HttpResponse {
let (name, reference) = (path.0.to_owned(), path.1.to_owned());
let database = &state.database;
let digest = match Digest::is_digest(&reference) {
true => reference.clone(),
false => {
debug!("Attempting to get manifest digest using tag (name={}, reference={})", name, reference);
if let Some(tag) = database.get_tag(&name, &reference).await.unwrap() {
tag.manifest_digest
} else {
return HttpResponse::NotFound()
.finish();
}
}
};
let manifest_content = database.get_manifest(&name, &digest).await.unwrap();
if manifest_content.is_none() {
debug!("Failed to get manifest in repo {}, for digest {}", name, digest);
// The digest that was provided in the request was invalid.
// NOTE: This could also mean that there's a bug and the tag pointed to an invalid manifest.
return HttpResponse::NotFound()
.finish();
}
let manifest_content = manifest_content.unwrap();
HttpResponse::Ok()
.append_header(("Docker-Content-Digest", digest))
.append_header(("Content-Type", "application/vnd.docker.distribution.manifest.v2+json"))
.append_header(("Accept", "application/vnd.docker.distribution.manifest.v2+json"))
.append_header(("Docker-Distribution-API-Version", "registry/2.0"))
.body(manifest_content)
}
#[head("/{reference}")]
pub async fn manifest_exists(path: web::Path<(String, String)>, state: web::Data<AppState>) -> HttpResponse {
let (name, reference) = (path.0.to_owned(), path.1.to_owned());
// Get the digest from the reference path.
let database = &state.database;
let digest = match Digest::is_digest(&reference) {
true => reference.clone(),
false => {
if let Some(tag) = database.get_tag(&name, &reference).await.unwrap() {
tag.manifest_digest
} else {
return HttpResponse::NotFound()
.finish();
}
}
};
let manifest_content = database.get_manifest(&name, &digest).await.unwrap();
if manifest_content.is_none() {
// The digest that was provided in the request was invalid.
// NOTE: This could also mean that there's a bug and the tag pointed to an invalid manifest.
return HttpResponse::NotFound()
.finish();
}
let manifest_content = manifest_content.unwrap();
HttpResponse::Ok()
.append_header(("Docker-Content-Digest", digest))
.append_header(("Content-Type", "application/vnd.docker.distribution.manifest.v2+json"))
.append_header(("Content-Length", manifest_content.len()))
.append_header(("Docker-Distribution-API-Version", "registry/2.0"))
.body(manifest_content)
}

View File

@ -1,9 +1,7 @@
use actix_web::{HttpResponse, get, HttpRequest, post, web, patch, put, delete};
use actix_web::{HttpResponse, HttpRequest, post, web, patch, put, delete};
use bytes::{BytesMut, Bytes, BufMut};
use qstring::QString;
use tracing::{debug, trace};
use serde::Deserialize;
use futures::StreamExt;
use tracing::{debug};
use crate::app_state::AppState;

View File

@ -4,9 +4,17 @@ use async_trait::async_trait;
use bytes::Bytes;
use sqlx::{sqlite::SqliteConnection, Sqlite, Pool};
use tokio::sync::Mutex;
use tracing::debug;
use chrono::{DateTime, Utc, NaiveDateTime};
use crate::dto::Tag;
#[async_trait]
pub trait Database {
// Digest related functions
/// Create the tables in the database
async fn create_schema(&self) -> sqlx::Result<()>;
/// Check if the database is storing the digest.
@ -21,8 +29,31 @@ pub trait Database {
async fn delete_digest(&self, digest: &str) -> sqlx::Result<()>;
/// Replace the uuid with a digest
async fn replace_digest(&self, uuid: &str, new_digest: &str) -> sqlx::Result<()>;
async fn associate_manifest_blob(&self, manifest_digest: &str, layer_digest: &str);
async fn disassociate_manifest_blob(&self, repository: &str, layer_digest: &str);
async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()>;
async fn unlink_manifest_layer(&self, repository: &str, layer_digest: &str);
// Tag related functions
/// Get a manifest digest using the tag name.
async fn get_tag(&self, repository: &str, tag: &str) -> sqlx::Result<Option<Tag>>;
/// Save a tag and reference it to the manifest digest.
async fn save_tag(&self, repository: &str, tag: &str, manifest_digest: &str) -> sqlx::Result<()>;
/// Delete a tag.
async fn delete_tag(&self, repository: &str, tag: &str) -> sqlx::Result<()>;
// Manifest related functions
/// Get a manifest's content.
async fn get_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<Option<String>>;
/// Save a manifest's content.
async fn save_manifest(&self, repository: &str, digest: &str, content: &str) -> sqlx::Result<()>;
/// Delete a manifest
async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<()>;
// Repository related functions
/// Create a repository
async fn save_repository(&self, repository: &str) -> sqlx::Result<()>;
}
#[async_trait]
@ -31,6 +62,8 @@ impl Database for Pool<Sqlite> {
sqlx::query(include_str!("schemas/schema.sql"))
.execute(self).await?;
debug!("Created database schema");
Ok(())
}
@ -56,6 +89,8 @@ impl Database for Pool<Sqlite> {
let bytes = Bytes::from(row.0);
debug!("Got digest {}, {} bytes", digest, bytes.len());
Ok(Some(bytes))
}
@ -64,6 +99,7 @@ impl Database for Pool<Sqlite> {
}
async fn save_digest(&self, digest: &str, bytes: &Bytes) -> sqlx::Result<()> {
let bytes_len = bytes.len();
let bytes = bytes.bytes().map(|b| b.unwrap()).collect::<Vec<u8>>();
sqlx::query("INSERT INTO layer_blobs (digest, blob) VALUES (?, ?)")
@ -71,6 +107,8 @@ impl Database for Pool<Sqlite> {
.bind(bytes)
.execute(self).await?;
debug!("Saved digest {}, {} bytes", digest, bytes_len);
Ok(())
}
@ -79,6 +117,8 @@ impl Database for Pool<Sqlite> {
.bind(digest)
.execute(self).await?;
debug!("Deleted digest {}", digest);
Ok(())
}
@ -88,47 +128,111 @@ impl Database for Pool<Sqlite> {
.bind(uuid)
.execute(self).await?;
debug!("Replaced digest uuid {} to digest {}", uuid, new_digest);
Ok(())
}
async fn associate_manifest_blob(&self, manifest_digest: &str, layer_digest: &str) {
async fn link_manifest_layer(&self, manifest_digest: &str, layer_digest: &str) -> sqlx::Result<()> {
sqlx::query("INSERT INTO manifest_layers(manifest, layer_digest) VALUES (?, ?)")
.bind(manifest_digest)
.bind(layer_digest)
.execute(self).await?;
debug!("Linked manifest {} to layer {}", manifest_digest, layer_digest);
Ok(())
}
async fn unlink_manifest_layer(&self, repository: &str, layer_digest: &str) {
todo!()
}
async fn disassociate_manifest_blob(&self, repository: &str, layer_digest: &str) {
todo!()
async fn get_tag(&self, repository: &str, tag: &str) -> sqlx::Result<Option<Tag>> {
let row: (String, i64, ) = match sqlx::query_as("SELECT image_manifest, last_updated FROM image_tags WHERE name = ? AND repository = ?")
.bind(tag)
.bind(repository)
.fetch_one(self).await {
Ok(row) => row,
Err(e) => match e {
sqlx::Error::RowNotFound => {
return Ok(None)
},
_ => {
return Err(e);
}
}
};
let last_updated: DateTime<Utc> = DateTime::from_utc(NaiveDateTime::from_timestamp_opt(row.1, 0).unwrap(), Utc);
Ok(Some(Tag::new(tag.to_string(), repository.to_string(), last_updated, row.0)))
}
}
async fn save_tag(&self, repository: &str, tag: &str, digest: &str) -> sqlx::Result<()> {
sqlx::query("INSERT INTO image_tags (name, repository, image_manifest, last_updated) VALUES (?, ?, ?, ?)")
.bind(tag)
.bind(repository)
.bind(digest)
.bind(chrono::Utc::now().timestamp())
.execute(self).await?;
/* pub enum DatabaseConnection {
Sqlite(SqliteConnection),
Postgres(PgConnection),
}
pub struct Database {
connection: DatabaseConnection,
}
impl Database {
pub fn from_connection(connection: DatabaseConnection) -> Self {
Self {
connection,
}
Ok(())
}
pub fn new_sqlite_connection(url: &str) -> ConnectionResult<Self> {
let connection = DatabaseConnection::Sqlite(SqliteConnection::establish(url)?);
async fn delete_tag(&self, repository: &str, tag: &str) -> sqlx::Result<()> {
sqlx::query("DELETE FROM image_tags WHERE name = ? AND repository = ?")
.bind(tag)
.bind(repository)
.execute(self).await?;
Ok(Self {
connection,
})
Ok(())
}
pub fn new_postgres_connection(url: &str) -> ConnectionResult<Self> {
let connection = DatabaseConnection::Postgres(PgConnection::establish(url)?);
async fn get_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<Option<String>> {
let row: (String, ) = match sqlx::query_as("SELECT content FROM image_manifests where digest = ? AND repository = ?")
.bind(digest)
.bind(repository)
.fetch_one(self).await {
Ok(row) => row,
Err(e) => match e {
sqlx::Error::RowNotFound => {
return Ok(None)
},
_ => {
return Err(e);
}
}
};
Ok(Self {
connection,
})
Ok(Some(row.0))
}
} */
async fn save_manifest(&self, repository: &str, digest: &str, manifest: &str) -> sqlx::Result<()> {
sqlx::query("INSERT INTO image_manifests (digest, repository, content) VALUES (?, ?, ?)")
.bind(digest)
.bind(repository)
.bind(manifest)
.execute(self).await?;
Ok(())
}
async fn delete_manifest(&self, repository: &str, digest: &str) -> sqlx::Result<()> {
sqlx::query("DELETE FROM image_manifests where digest = ? AND repository = ?")
.bind(digest)
.bind(repository)
.execute(self).await?;
Ok(())
}
async fn save_repository(&self, repository: &str) -> sqlx::Result<()> {
sqlx::query("INSERT INTO repositories (name) VALUES (?)")
.bind(repository)
.execute(self).await?;
Ok(())
}
}

View File

@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS repositories (
CREATE TABLE IF NOT EXISTS image_manifests (
digest TEXT NOT NULL PRIMARY KEY,
repository TEXT NOT NULL,
value TEXT NOT NULL
content TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS layer_blobs (
@ -21,8 +21,8 @@ CREATE TABLE IF NOT EXISTS image_tags (
PRIMARY KEY (name, repository)
);
CREATE TABLE IF NOT EXISTS manifest_blobs (
CREATE TABLE IF NOT EXISTS manifest_layers (
manifest TEXT NOT NULL,
blob TEXT NOT NULL,
PRIMARY KEY (manifest, blob)
layer_digest TEXT NOT NULL,
PRIMARY KEY (manifest, layer_digest)
);

34
src/dto/digest.rs Normal file
View File

@ -0,0 +1,34 @@
pub struct Digest {
algorithm: String,
hex: String,
}
pub enum DigestError {
InvalidDigestString(String),
}
impl Digest {
/// Check if a string is a digest
pub fn is_digest(s: &str) -> bool {
if let Some(idx) = s.find(":") {
let (algo, hex) = s.split_at(idx);
return !algo.is_empty() && !hex.is_empty();
}
false
}
pub fn from_string(s: &str) -> Result<Self, DigestError> {
if let Some(idx) = s.find(":") {
let (algo, hex) = s.split_at(idx);
return Ok(Self {
algorithm: algo.to_string(),
hex: hex.to_string(),
})
}
Err(DigestError::InvalidDigestString(String::from(s)))
}
}

64
src/dto/manifest.rs Normal file
View File

@ -0,0 +1,64 @@
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ContainerConfig {
pub media_type: String,
pub size: Option<u32>,
pub digest: String
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Layer {
pub media_type: String,
pub size: u32,
pub digest: String,
pub urls: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ImageManifest {
pub schema_version: u32,
pub media_type: String,
pub config: ContainerConfig,
pub layers: Vec<Layer>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Platform {
pub architecture: String,
pub os: String,
#[serde(rename = "os.version")]
pub os_version: Option<String>,
#[serde(rename = "os.features")]
pub os_features: Option<Vec<String>>,
pub variant: Option<String>,
pub features: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestListItem {
pub media_type: String,
pub size: u32,
pub digest: String,
pub platform: Platform,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestList {
pub schema_version: u32,
pub media_type: String,
pub manifests: Vec<ManifestListItem>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Manifest {
Image(ImageManifest),
List(ManifestList)
}

23
src/dto/mod.rs Normal file
View File

@ -0,0 +1,23 @@
use chrono::{DateTime, Utc};
pub mod manifest;
pub mod digest;
#[derive(Debug)]
pub struct Tag {
pub name: String,
pub repository: String,
pub last_updated: DateTime<Utc>,
pub manifest_digest: String,
}
impl Tag {
pub fn new(name: String, repository: String, last_updated: DateTime<Utc>, manifest_digest: String) -> Self {
Self {
name,
repository,
last_updated,
manifest_digest,
}
}
}

View File

@ -1,6 +1,7 @@
mod api;
mod app_state;
mod database;
mod dto;
use std::sync::Arc;
@ -47,10 +48,14 @@ async fn main() -> std::io::Result<()> {
.service(
web::scope("/manifests")
.service(api::manifests::upload_manifest)
.service(api::manifests::pull_manifest)
.service(api::manifests::manifest_exists)
)
.service(
web::scope("/blobs")
.service(api::blobs::digest_exists)
.service(api::blobs::pull_digest)
.service(api::blobs::delete_digest)
.service(
web::scope("/uploads")
.service(api::uploads::start_upload)