Compare commits

..

No commits in common. "5a0cdd271ffeee7a7e5095b49f003d899fcc582f" and "6f22e84969bb02855f802d583bc8168cde3551c7" have entirely different histories.

4 changed files with 19 additions and 77 deletions

View File

@ -2,10 +2,9 @@ use std::sync::Arc;
use axum::body::StreamBody; use axum::body::StreamBody;
use axum::extract::{State, Path}; use axum::extract::{State, Path};
use axum::http::{StatusCode, header, HeaderName, HeaderMap, HeaderValue}; use axum::http::{StatusCode, header, HeaderName};
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
use tracing::debug;
use crate::app_state::AppState; use crate::app_state::AppState;
use crate::error::AppError; use crate::error::AppError;
@ -19,7 +18,6 @@ pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, Strin
StatusCode::OK, StatusCode::OK,
[ [
(header::CONTENT_LENGTH, size.to_string()), (header::CONTENT_LENGTH, size.to_string()),
(header::ACCEPT_RANGES, "true".to_string()),
(HeaderName::from_static("docker-content-digest"), layer_digest) (HeaderName::from_static("docker-content-digest"), layer_digest)
] ]
).into_response()); ).into_response());
@ -29,63 +27,30 @@ pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, Strin
Ok(StatusCode::NOT_FOUND.into_response()) Ok(StatusCode::NOT_FOUND.into_response())
} }
pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, header_map: HeaderMap, state: State<Arc<AppState>>) -> Result<Response, AppError> { pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, state: State<Arc<AppState>>) -> Result<Response, AppError> {
let storage = state.storage.lock().await; let storage = state.storage.lock().await;
if let Some(len) = storage.digest_length(&layer_digest).await? { if let Some(len) = storage.digest_length(&layer_digest).await? {
let mut stream = match storage.get_digest_stream(&layer_digest).await? { let stream = match storage.get_digest_stream(&layer_digest).await? {
Some(s) => s, Some(s) => s,
None => { None => {
return Ok(StatusCode::NOT_FOUND.into_response()); return Ok(StatusCode::NOT_FOUND.into_response());
} }
}; };
if let Some(range) = header_map.get(header::CONTENT_RANGE) { // convert the `AsyncRead` into a `Stream`
let range = range.to_str().unwrap(); let stream = ReaderStream::new(stream.into_async_read());
debug!("Range request received: {}", range); // convert the `Stream` into an `axum::body::HttpBody`
let range = &range[6..]; let body = StreamBody::new(stream);
let (starting, ending) = range.split_once("-").unwrap(); Ok((
let (starting, ending) = (starting.parse::<i32>().unwrap(), ending.parse::<i32>().unwrap()); StatusCode::OK,
[
// recreate the ByteStream, skipping elements (header::CONTENT_LENGTH, len.to_string()),
stream = stream.skip_recreate(starting as usize); (HeaderName::from_static("docker-content-digest"), layer_digest)
],
// convert the `AsyncRead` into a `Stream` body
let stream = ReaderStream::new(stream.into_async_read()); ).into_response())
// convert the `Stream` into an `axum::body::HttpBody`
let body = StreamBody::new(stream);
debug!("length of range request: {}", starting - ending);
Ok((
StatusCode::OK,
[
(header::CONTENT_LENGTH, (starting - ending).to_string()),
(header::RANGE, format!("bytes {}-{}/{}", starting, ending, len)),
(HeaderName::from_static("docker-content-digest"), layer_digest)
],
body
).into_response())
} else {
// convert the `AsyncRead` into a `Stream`
let stream = ReaderStream::new(stream.into_async_read());
// convert the `Stream` into an `axum::body::HttpBody`
let body = StreamBody::new(stream);
debug!("length of streamed request: {}", len);
Ok((
StatusCode::OK,
[
(header::CONTENT_LENGTH, len.to_string()),
(HeaderName::from_static("docker-content-digest"), layer_digest)
],
body
).into_response())
}
} else { } else {
Ok(StatusCode::NOT_FOUND.into_response()) Ok(StatusCode::NOT_FOUND.into_response())
} }

View File

@ -17,7 +17,7 @@ pin_project! {
#[allow(dead_code)] #[allow(dead_code)]
impl ByteStream { impl ByteStream {
/// Create a new `ByteStream` by wrapping a `futures` stream. /// Create a new `ByteStream` by wrapping a `futures` stream.
pub fn new<S>(stream: S) -> Self pub fn new<S>(stream: S) -> ByteStream
where where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static, S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
{ {
@ -27,13 +27,6 @@ impl ByteStream {
} }
} }
/// Recreate the ByteStream, skipping `n` elements
pub fn skip_recreate(mut self, n: usize) -> Self {
self.inner = Box::pin(self.inner.skip(n));
self
}
pub(crate) fn size_hint(&self) -> Option<usize> { pub(crate) fn size_hint(&self) -> Option<usize> {
self.size_hint self.size_hint
} }

View File

@ -64,8 +64,6 @@ pub struct Config {
pub listen_address: String, pub listen_address: String,
pub listen_port: String, pub listen_port: String,
url: Option<String>, url: Option<String>,
#[serde(default)]
pub extra_logging: bool,
#[serde(deserialize_with = "serialize_log_level", default = "default_log_level")] #[serde(deserialize_with = "serialize_log_level", default = "default_log_level")]
pub log_level: Level, pub log_level: Level,
pub ldap: Option<LdapConnectionConfig>, pub ldap: Option<LdapConnectionConfig>,

View File

@ -32,8 +32,6 @@ use tracing::{debug, info};
use app_state::AppState; use app_state::AppState;
use database::Database; use database::Database;
use tracing_subscriber::filter;
use tracing_subscriber::{filter::FilterFn, layer::{Layer as TracingLayer, SubscriberExt}, util::SubscriberInitExt,};
use crate::storage::StorageDriver; use crate::storage::StorageDriver;
use crate::storage::filesystem::FilesystemDriver; use crate::storage::filesystem::FilesystemDriver;
@ -74,21 +72,9 @@ async fn main() -> anyhow::Result<()> {
let mut config = Config::new() let mut config = Config::new()
.expect("Failure to parse config!"); .expect("Failure to parse config!");
// Create a tracing subscriber tracing_subscriber::fmt()
if !config.extra_logging { .with_max_level(config.log_level)
// only allow logs from the registry .init();
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(filter::Targets::new()
.with_target("orca_registry", config.log_level)
)
.init();
} else {
// allow all logs from any crates
tracing_subscriber::fmt()
.with_max_level(config.log_level)
.init();
}
let sqlite_config = match &config.database { let sqlite_config = match &config.database {
DatabaseConfig::Sqlite(sqlite) => sqlite, DatabaseConfig::Sqlite(sqlite) => sqlite,