Implement streaming bytes for pulling images

This commit is contained in:
SeanOMik 2023-04-20 23:56:54 -04:00
parent 72c3aa2ecd
commit a232e59788
Signed by: SeanOMik
GPG Key ID: 568F326C7EB33ACB
7 changed files with 185 additions and 50 deletions

11
Cargo.lock generated
View File

@ -659,6 +659,7 @@ dependencies = [
"sha256", "sha256",
"sqlx", "sqlx",
"tokio", "tokio",
"tokio-util",
"tracing", "tracing",
"tracing-log", "tracing-log",
"tracing-subscriber", "tracing-subscriber",
@ -1928,9 +1929,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.21.2" version = "1.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes",
@ -1943,7 +1944,7 @@ dependencies = [
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",
"tokio-macros", "tokio-macros",
"winapi", "windows-sys",
] ]
[[package]] [[package]]
@ -1981,9 +1982,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.4" version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",

View File

@ -18,6 +18,7 @@ bytes = "1.4.0"
chrono = "0.4.23" chrono = "0.4.23"
actix-web = "4" actix-web = "4"
tokio = { version = "1.21.2", features = [ "fs", "macros" ] } tokio = { version = "1.21.2", features = [ "fs", "macros" ] }
tokio-util = { version = "0.7.7", features = [ "io" ] }
clap = { version = "4.0.23", features = [ "derive" ] } clap = { version = "4.0.23", features = [ "derive" ] }

View File

@ -1,4 +1,5 @@
use actix_web::{HttpResponse, get, HttpRequest, web, head, delete}; use actix_web::{HttpResponse, get, HttpRequest, web, head, delete};
use futures::StreamExt;
use crate::app_state::AppState; use crate::app_state::AppState;
@ -11,7 +12,7 @@ pub async fn digest_exists(path: web::Path<(String, String)>, state: web::Data<A
let storage = state.storage.lock().await; let storage = state.storage.lock().await;
if storage.has_digest(&layer_digest).await.unwrap() { if storage.has_digest(&layer_digest).unwrap() {
if let Some(size) = storage.digest_length(&layer_digest).await.unwrap() { if let Some(size) = storage.digest_length(&layer_digest).await.unwrap() {
return HttpResponse::Ok() return HttpResponse::Ok()
.insert_header(("Content-Length", size)) .insert_header(("Content-Length", size))
@ -29,11 +30,22 @@ pub async fn pull_digest(path: web::Path<(String, String)>, state: web::Data<App
let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned()); let (_name, layer_digest) = (path.0.to_owned(), path.1.to_owned());
let storage = state.storage.lock().await; let storage = state.storage.lock().await;
if let Some(bytes) = storage.get_digest(&layer_digest).await.unwrap() {
if let Some(len) = storage.digest_length(&layer_digest).await.unwrap() {
let stream = storage.stream_bytes(&layer_digest).unwrap().unwrap();
/* let s = async_stream::stream! {
let stream = storage.stream_bytes(&layer_digest).unwrap().unwrap().as_ref();
while let Some(item) = stream.next().await {
}
}; */
HttpResponse::Ok() HttpResponse::Ok()
.insert_header(("Content-Length", bytes.len())) .insert_header(("Content-Length", len))
.insert_header(("Docker-Content-Digest", layer_digest)) .insert_header(("Docker-Content-Digest", layer_digest))
.body(bytes) .streaming(stream)
} else { } else {
HttpResponse::NotFound() HttpResponse::NotFound()
.finish() .finish()

100
src/byte_stream.rs Normal file
View File

@ -0,0 +1,100 @@
use std::{pin::Pin, task::{Context, Poll}};
use tokio::io::{AsyncRead, ReadBuf};
use bytes::{Bytes, BytesMut, BufMut};
use futures::{Stream, stream, StreamExt};
use pin_project_lite::pin_project;
pin_project! {
/// Stream of bytes.
pub struct ByteStream {
size_hint: Option<usize>,
#[pin]
inner: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static>>,
}
}
impl ByteStream {
/// Create a new `ByteStream` by wrapping a `futures` stream.
pub fn new<S>(stream: S) -> ByteStream
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
{
ByteStream {
size_hint: None,
inner: Box::pin(stream),
}
}
pub(crate) fn size_hint(&self) -> Option<usize> {
self.size_hint
}
pub fn into_async_read(self) -> impl AsyncRead + Send + 'static {
ImplAsyncRead::new(self.inner)
}
}
impl From<Vec<u8>> for ByteStream {
fn from(buf: Vec<u8>) -> ByteStream {
ByteStream {
size_hint: Some(buf.len()),
inner: Box::pin(stream::once(async move { Ok(Bytes::from(buf)) })),
}
}
}
impl std::fmt::Debug for ByteStream {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "<ByteStream size_hint={:?}>", self.size_hint)
}
}
impl Stream for ByteStream {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.inner.poll_next(cx)
}
}
pin_project! {
struct ImplAsyncRead {
buffer: BytesMut,
#[pin]
stream: futures::stream::Fuse<Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>>,
}
}
impl ImplAsyncRead {
fn new(stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>) -> Self {
ImplAsyncRead {
buffer: BytesMut::new(),
stream: stream.fuse(),
}
}
}
impl AsyncRead for ImplAsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf,
) -> Poll<std::io::Result<()>> {
let this = self.project();
if this.buffer.is_empty() {
match futures::ready!(this.stream.poll_next(cx)) {
None => return Poll::Ready(Ok(())),
Some(Err(e)) => return Poll::Ready(Err(e)),
Some(Ok(bytes)) => {
this.buffer.put(bytes);
}
}
}
let available = std::cmp::min(buf.remaining(), this.buffer.len());
let bytes = this.buffer.split_to(available);
buf.put_slice(&bytes);
Poll::Ready(Ok(()))
}
}

View File

@ -3,6 +3,7 @@ mod app_state;
mod database; mod database;
mod dto; mod dto;
mod storage; mod storage;
mod byte_stream;
use std::sync::Arc; use std::sync::Arc;

View File

@ -1,19 +1,18 @@
use std::{path::Path, io::ErrorKind, sync::Arc, collections::HashMap}; use std::{path::Path, io::{ErrorKind, BufRead, BufReader, Read}, sync::Arc, collections::HashMap};
use anyhow::Context; use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use futures::{executor::block_on, StreamExt}; use futures::{executor::block_on, StreamExt, Stream};
use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking, sync::{Mutex, mpsc}}; use tokio::{fs, io::{AsyncWriteExt, AsyncReadExt}, task::spawn_blocking, sync::{Mutex, mpsc}};
use tokio_util::io::ReaderStream;
use tracing::debug; use tracing::debug;
use crate::byte_stream::ByteStream;
use super::{StorageDriver, StorageDriverStreamer, Streamer}; use super::{StorageDriver, StorageDriverStreamer, Streamer};
pub struct FilesystemStreamer { pub struct FilesystemStreamer {
/* new_streams_channel: mpsc::Receiver<(String, mpsc::Receiver<Bytes>)>,
// (digest, receiver)
streaming_channels: Vec<(String, )>, */
storage_path: String, storage_path: String,
chunk_channel: mpsc::Receiver<(String, Bytes)>, chunk_channel: mpsc::Receiver<(String, Bytes)>,
cached_files: HashMap<String, fs::File>, cached_files: HashMap<String, fs::File>,
@ -61,20 +60,13 @@ impl Streamer for FilesystemStreamer {
pub struct FilesystemDriver { pub struct FilesystemDriver {
storage_path: String, storage_path: String,
// (digest, receiver)
streaming_channels: Vec<(String, mpsc::Receiver<Bytes>)>,
streamer_sender: mpsc::Sender<(String, Bytes)>, streamer_sender: mpsc::Sender<(String, Bytes)>,
} }
impl FilesystemDriver { impl FilesystemDriver {
//pub fn new(storage_path: &str) -> FilesystemDriver {
pub fn new(storage_path: String, stream_sender: mpsc::Sender<(String, Bytes)>) -> FilesystemDriver { pub fn new(storage_path: String, stream_sender: mpsc::Sender<(String, Bytes)>) -> FilesystemDriver {
/* let (send, recv) = mpsc::channel::<(String, Bytes)>(50);
let streamer = Arc::new(FilesystemStreamer::new(storage_path.to_string(), recv)); */
Self { Self {
storage_path, storage_path,
streaming_channels: vec![],
streamer_sender: stream_sender, streamer_sender: stream_sender,
} }
} }
@ -82,25 +74,6 @@ impl FilesystemDriver {
fn get_digest_path(&self, digest: &str) -> String { fn get_digest_path(&self, digest: &str) -> String {
format!("{}/{}", self.storage_path, digest) format!("{}/{}", self.storage_path, digest)
} }
async fn write_payload(&self, digest: &str, mut payload: actix_web::web::Payload, append: bool) -> anyhow::Result<usize> {
let path = self.get_digest_path(digest);
let mut file = fs::OpenOptions::new()
.write(true)
.append(append)
.create(true)
.open(path).await?;
let mut total_size = 0;
while let Some(item) = payload.next().await {
let item = item?;
total_size += item.len();
file.write_all(&item).await?;
}
Ok(total_size)
}
} }
impl StorageDriverStreamer for FilesystemDriver { impl StorageDriverStreamer for FilesystemDriver {
@ -111,17 +84,44 @@ impl StorageDriverStreamer for FilesystemDriver {
fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)> { fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)> {
self.streamer_sender.clone() self.streamer_sender.clone()
} }
fn has_digest(&self, digest: &str) -> anyhow::Result<bool> {
let path = self.get_digest_path(digest);
Ok(Path::new(&path).exists())
}
fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>> {
let path = self.get_digest_path(digest);
if self.has_digest(digest)? {
//tokio::spawn(async {
let s = async_stream::try_stream! {
let file = fs::File::open(&path).await.unwrap();
let mut s = ReaderStream::new(file);
while let Some(item) = s.next().await {
if let Ok(bytes) = item {
yield bytes;
//sender.send((layer_uuid.clone(), bytes)).await.unwrap();
}
}
//file.re
};
//});
Ok(Some(ByteStream::new(s)))
} else {
Ok(None)
}
}
} }
#[async_trait] #[async_trait]
impl StorageDriver for FilesystemDriver { impl StorageDriver for FilesystemDriver {
async fn has_digest(&self, digest: &str) -> anyhow::Result<bool> {
let path = self.get_digest_path(digest);
spawn_blocking(move || {
return Path::new(&path).exists()
}).await.context("FilesystemDriver: Failure to spawn blocking thread to check digest")
}
async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>> { async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>> {
let mut file = match fs::File::open(self.get_digest_path(digest)) let mut file = match fs::File::open(self.get_digest_path(digest))
@ -137,7 +137,7 @@ impl StorageDriver for FilesystemDriver {
.context("FilesystemDriver: Failure to open digest file"); .context("FilesystemDriver: Failure to open digest file");
} }
} }
}; };
let mut buf = Vec::new(); let mut buf = Vec::new();
file.read_to_end(&mut buf).await?; file.read_to_end(&mut buf).await?;
@ -145,6 +145,20 @@ impl StorageDriver for FilesystemDriver {
Ok(Some(Bytes::from_iter(buf))) Ok(Some(Bytes::from_iter(buf)))
} }
async fn get_digest_stream(&self, digest: &str) -> anyhow::Result<Option<ByteStream>> {
let path = self.get_digest_path(digest);
/* if self.has_digest(digest) {
let s = async_stream::stream! {
};
} else {
Ok(None)
} */
todo!()
}
async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>> { async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>> {
let file = match fs::File::open(self.get_digest_path(digest)) let file = match fs::File::open(self.get_digest_path(digest))
.await { .await {

View File

@ -9,6 +9,8 @@ use tokio::{io::{AsyncWrite, AsyncRead}, sync::{Mutex, mpsc}};
use actix_web::web; use actix_web::web;
use crate::byte_stream::ByteStream;
pub trait Streamer { pub trait Streamer {
fn start(&'static mut self) -> anyhow::Result<()>; fn start(&'static mut self) -> anyhow::Result<()>;
} }
@ -16,14 +18,18 @@ pub trait Streamer {
pub trait StorageDriverStreamer { pub trait StorageDriverStreamer {
fn supports_streaming(&self) -> bool; fn supports_streaming(&self) -> bool;
fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)>; fn start_stream_channel(&self) -> mpsc::Sender<(String, Bytes)>;
fn has_digest(&self, digest: &str) -> anyhow::Result<bool>;
fn stream_bytes(&self, digest: &str) -> anyhow::Result<Option<ByteStream>>;
} }
#[async_trait] #[async_trait]
pub trait StorageDriver: Send + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ { pub trait StorageDriver: Send + StorageDriverStreamer/* : AsyncWrite + AsyncRead */ {
async fn has_digest(&self, digest: &str) -> anyhow::Result<bool>;
async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>>; async fn get_digest(&self, digest: &str) -> anyhow::Result<Option<Bytes>>;
async fn get_digest_stream(&self, digest: &str) -> anyhow::Result<Option<ByteStream>>;
async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>>; async fn digest_length(&self, digest: &str) -> anyhow::Result<Option<usize>>;
async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()>; async fn save_digest(&self, digest: &str, bytes: &Bytes, append: bool) -> anyhow::Result<()>;
async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>; async fn delete_digest(&self, digest: &str) -> anyhow::Result<()>;
async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>; async fn replace_digest(&self, uuid: &str, digest: &str) -> anyhow::Result<()>;
//async fn stream_bytes(&self, stream: Box<dyn Stream<Item = Bytes>>) -> anyhow::Result<()>;
} }