From 83d0e61fde8bc151b7768ab58b276adf3b6031eb Mon Sep 17 00:00:00 2001 From: SeanOMik Date: Sat, 22 Jul 2023 01:11:08 -0400 Subject: [PATCH] Allow range requests when pulling blobs --- src/api/blobs.rs | 61 ++++++++++++++++++++++++++++++++++------------ src/byte_stream.rs | 9 ++++++- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/src/api/blobs.rs b/src/api/blobs.rs index 85a9e6c..8dc78ce 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -2,9 +2,10 @@ use std::sync::Arc; use axum::body::StreamBody; use axum::extract::{State, Path}; -use axum::http::{StatusCode, header, HeaderName}; +use axum::http::{StatusCode, header, HeaderName, HeaderMap, HeaderValue}; use axum::response::{IntoResponse, Response}; use tokio_util::io::ReaderStream; +use tracing::debug; use crate::app_state::AppState; use crate::error::AppError; @@ -18,6 +19,7 @@ pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, Strin StatusCode::OK, [ (header::CONTENT_LENGTH, size.to_string()), + (header::ACCEPT_RANGES, "true".to_string()), (HeaderName::from_static("docker-content-digest"), layer_digest) ] ).into_response()); @@ -27,30 +29,59 @@ pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, Strin Ok(StatusCode::NOT_FOUND.into_response()) } -pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, state: State>) -> Result { +pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, header_map: HeaderMap, state: State>) -> Result { let storage = state.storage.lock().await; if let Some(len) = storage.digest_length(&layer_digest).await? { - let stream = match storage.get_digest_stream(&layer_digest).await? { + let mut stream = match storage.get_digest_stream(&layer_digest).await? { Some(s) => s, None => { return Ok(StatusCode::NOT_FOUND.into_response()); } }; - // convert the `AsyncRead` into a `Stream` - let stream = ReaderStream::new(stream.into_async_read()); - // convert the `Stream` into an `axum::body::HttpBody` - let body = StreamBody::new(stream); + if let Some(range) = header_map.get(header::CONTENT_RANGE) { + let range = range.to_str().unwrap(); + debug!("Range request received: {}", range); + let range = &range[6..]; - Ok(( - StatusCode::OK, - [ - (header::CONTENT_LENGTH, len.to_string()), - (HeaderName::from_static("docker-content-digest"), layer_digest) - ], - body - ).into_response()) + let (starting, ending) = range.split_once("-").unwrap(); + let (starting, ending) = (starting.parse::().unwrap(), ending.parse::().unwrap()); + + // recreate the ByteStream, skipping elements + stream = stream.skip_recreate(starting as usize); + + // convert the `AsyncRead` into a `Stream` + let stream = ReaderStream::new(stream.into_async_read()); + // convert the `Stream` into an `axum::body::HttpBody` + let body = StreamBody::new(stream); + + Ok(( + StatusCode::OK, + [ + (header::CONTENT_LENGTH, (starting - ending).to_string()), + (header::RANGE, format!("bytes {}-{}/{}", starting, ending, len)), + (HeaderName::from_static("docker-content-digest"), layer_digest) + ], + body + ).into_response()) + } else { + // convert the `AsyncRead` into a `Stream` + let stream = ReaderStream::new(stream.into_async_read()); + // convert the `Stream` into an `axum::body::HttpBody` + let body = StreamBody::new(stream); + + Ok(( + StatusCode::OK, + [ + (header::CONTENT_LENGTH, len.to_string()), + (HeaderName::from_static("docker-content-digest"), layer_digest) + ], + body + ).into_response()) + } + + } else { Ok(StatusCode::NOT_FOUND.into_response()) } diff --git a/src/byte_stream.rs b/src/byte_stream.rs index 4313d3a..d4e2cca 100644 --- a/src/byte_stream.rs +++ b/src/byte_stream.rs @@ -17,7 +17,7 @@ pin_project! { #[allow(dead_code)] impl ByteStream { /// Create a new `ByteStream` by wrapping a `futures` stream. - pub fn new(stream: S) -> ByteStream + pub fn new(stream: S) -> Self where S: Stream> + Send + 'static, { @@ -27,6 +27,13 @@ impl ByteStream { } } + /// Recreate the ByteStream, skipping `n` elements + pub fn skip_recreate(mut self, n: usize) -> Self { + self.inner = Box::pin(self.inner.skip(n)); + + self + } + pub(crate) fn size_hint(&self) -> Option { self.size_hint }