Allow range requests when pulling blobs

This commit is contained in:
SeanOMik 2023-07-22 01:11:08 -04:00
parent 6f22e84969
commit 83d0e61fde
Signed by: SeanOMik
GPG Key ID: 568F326C7EB33ACB
2 changed files with 54 additions and 16 deletions

View File

@ -2,9 +2,10 @@ use std::sync::Arc;
use axum::body::StreamBody;
use axum::extract::{State, Path};
use axum::http::{StatusCode, header, HeaderName};
use axum::http::{StatusCode, header, HeaderName, HeaderMap, HeaderValue};
use axum::response::{IntoResponse, Response};
use tokio_util::io::ReaderStream;
use tracing::debug;
use crate::app_state::AppState;
use crate::error::AppError;
@ -18,6 +19,7 @@ pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, Strin
StatusCode::OK,
[
(header::CONTENT_LENGTH, size.to_string()),
(header::ACCEPT_RANGES, "true".to_string()),
(HeaderName::from_static("docker-content-digest"), layer_digest)
]
).into_response());
@ -27,30 +29,59 @@ pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, Strin
Ok(StatusCode::NOT_FOUND.into_response())
}
pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, state: State<Arc<AppState>>) -> Result<Response, AppError> {
pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, header_map: HeaderMap, state: State<Arc<AppState>>) -> Result<Response, AppError> {
let storage = state.storage.lock().await;
if let Some(len) = storage.digest_length(&layer_digest).await? {
let stream = match storage.get_digest_stream(&layer_digest).await? {
let mut stream = match storage.get_digest_stream(&layer_digest).await? {
Some(s) => s,
None => {
return Ok(StatusCode::NOT_FOUND.into_response());
}
};
// convert the `AsyncRead` into a `Stream`
let stream = ReaderStream::new(stream.into_async_read());
// convert the `Stream` into an `axum::body::HttpBody`
let body = StreamBody::new(stream);
if let Some(range) = header_map.get(header::CONTENT_RANGE) {
let range = range.to_str().unwrap();
debug!("Range request received: {}", range);
let range = &range[6..];
let (starting, ending) = range.split_once("-").unwrap();
let (starting, ending) = (starting.parse::<i32>().unwrap(), ending.parse::<i32>().unwrap());
// recreate the ByteStream, skipping elements
stream = stream.skip_recreate(starting as usize);
// convert the `AsyncRead` into a `Stream`
let stream = ReaderStream::new(stream.into_async_read());
// convert the `Stream` into an `axum::body::HttpBody`
let body = StreamBody::new(stream);
Ok((
StatusCode::OK,
[
(header::CONTENT_LENGTH, (starting - ending).to_string()),
(header::RANGE, format!("bytes {}-{}/{}", starting, ending, len)),
(HeaderName::from_static("docker-content-digest"), layer_digest)
],
body
).into_response())
} else {
// convert the `AsyncRead` into a `Stream`
let stream = ReaderStream::new(stream.into_async_read());
// convert the `Stream` into an `axum::body::HttpBody`
let body = StreamBody::new(stream);
Ok((
StatusCode::OK,
[
(header::CONTENT_LENGTH, len.to_string()),
(HeaderName::from_static("docker-content-digest"), layer_digest)
],
body
).into_response())
}
Ok((
StatusCode::OK,
[
(header::CONTENT_LENGTH, len.to_string()),
(HeaderName::from_static("docker-content-digest"), layer_digest)
],
body
).into_response())
} else {
Ok(StatusCode::NOT_FOUND.into_response())
}

View File

@ -17,7 +17,7 @@ pin_project! {
#[allow(dead_code)]
impl ByteStream {
/// Create a new `ByteStream` by wrapping a `futures` stream.
pub fn new<S>(stream: S) -> ByteStream
pub fn new<S>(stream: S) -> Self
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
{
@ -27,6 +27,13 @@ impl ByteStream {
}
}
/// Recreate the ByteStream, skipping `n` elements
pub fn skip_recreate(mut self, n: usize) -> Self {
self.inner = Box::pin(self.inner.skip(n));
self
}
pub(crate) fn size_hint(&self) -> Option<usize> {
self.size_hint
}