Compare commits
2 Commits
6f22e84969
...
5a0cdd271f
Author | SHA1 | Date |
---|---|---|
SeanOMik | 5a0cdd271f | |
SeanOMik | 83d0e61fde |
|
@ -2,9 +2,10 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use axum::body::StreamBody;
|
use axum::body::StreamBody;
|
||||||
use axum::extract::{State, Path};
|
use axum::extract::{State, Path};
|
||||||
use axum::http::{StatusCode, header, HeaderName};
|
use axum::http::{StatusCode, header, HeaderName, HeaderMap, HeaderValue};
|
||||||
use axum::response::{IntoResponse, Response};
|
use axum::response::{IntoResponse, Response};
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
|
use tracing::debug;
|
||||||
|
|
||||||
use crate::app_state::AppState;
|
use crate::app_state::AppState;
|
||||||
use crate::error::AppError;
|
use crate::error::AppError;
|
||||||
|
@ -18,6 +19,7 @@ pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, Strin
|
||||||
StatusCode::OK,
|
StatusCode::OK,
|
||||||
[
|
[
|
||||||
(header::CONTENT_LENGTH, size.to_string()),
|
(header::CONTENT_LENGTH, size.to_string()),
|
||||||
|
(header::ACCEPT_RANGES, "true".to_string()),
|
||||||
(HeaderName::from_static("docker-content-digest"), layer_digest)
|
(HeaderName::from_static("docker-content-digest"), layer_digest)
|
||||||
]
|
]
|
||||||
).into_response());
|
).into_response());
|
||||||
|
@ -27,22 +29,52 @@ pub async fn digest_exists_head(Path((_name, layer_digest)): Path<(String, Strin
|
||||||
Ok(StatusCode::NOT_FOUND.into_response())
|
Ok(StatusCode::NOT_FOUND.into_response())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, state: State<Arc<AppState>>) -> Result<Response, AppError> {
|
pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>, header_map: HeaderMap, state: State<Arc<AppState>>) -> Result<Response, AppError> {
|
||||||
let storage = state.storage.lock().await;
|
let storage = state.storage.lock().await;
|
||||||
|
|
||||||
if let Some(len) = storage.digest_length(&layer_digest).await? {
|
if let Some(len) = storage.digest_length(&layer_digest).await? {
|
||||||
let stream = match storage.get_digest_stream(&layer_digest).await? {
|
let mut stream = match storage.get_digest_stream(&layer_digest).await? {
|
||||||
Some(s) => s,
|
Some(s) => s,
|
||||||
None => {
|
None => {
|
||||||
return Ok(StatusCode::NOT_FOUND.into_response());
|
return Ok(StatusCode::NOT_FOUND.into_response());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Some(range) = header_map.get(header::CONTENT_RANGE) {
|
||||||
|
let range = range.to_str().unwrap();
|
||||||
|
debug!("Range request received: {}", range);
|
||||||
|
let range = &range[6..];
|
||||||
|
|
||||||
|
let (starting, ending) = range.split_once("-").unwrap();
|
||||||
|
let (starting, ending) = (starting.parse::<i32>().unwrap(), ending.parse::<i32>().unwrap());
|
||||||
|
|
||||||
|
// recreate the ByteStream, skipping elements
|
||||||
|
stream = stream.skip_recreate(starting as usize);
|
||||||
|
|
||||||
// convert the `AsyncRead` into a `Stream`
|
// convert the `AsyncRead` into a `Stream`
|
||||||
let stream = ReaderStream::new(stream.into_async_read());
|
let stream = ReaderStream::new(stream.into_async_read());
|
||||||
// convert the `Stream` into an `axum::body::HttpBody`
|
// convert the `Stream` into an `axum::body::HttpBody`
|
||||||
let body = StreamBody::new(stream);
|
let body = StreamBody::new(stream);
|
||||||
|
|
||||||
|
debug!("length of range request: {}", starting - ending);
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
StatusCode::OK,
|
||||||
|
[
|
||||||
|
(header::CONTENT_LENGTH, (starting - ending).to_string()),
|
||||||
|
(header::RANGE, format!("bytes {}-{}/{}", starting, ending, len)),
|
||||||
|
(HeaderName::from_static("docker-content-digest"), layer_digest)
|
||||||
|
],
|
||||||
|
body
|
||||||
|
).into_response())
|
||||||
|
} else {
|
||||||
|
// convert the `AsyncRead` into a `Stream`
|
||||||
|
let stream = ReaderStream::new(stream.into_async_read());
|
||||||
|
// convert the `Stream` into an `axum::body::HttpBody`
|
||||||
|
let body = StreamBody::new(stream);
|
||||||
|
|
||||||
|
debug!("length of streamed request: {}", len);
|
||||||
|
|
||||||
Ok((
|
Ok((
|
||||||
StatusCode::OK,
|
StatusCode::OK,
|
||||||
[
|
[
|
||||||
|
@ -51,6 +83,9 @@ pub async fn pull_digest_get(Path((_name, layer_digest)): Path<(String, String)>
|
||||||
],
|
],
|
||||||
body
|
body
|
||||||
).into_response())
|
).into_response())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
Ok(StatusCode::NOT_FOUND.into_response())
|
Ok(StatusCode::NOT_FOUND.into_response())
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ pin_project! {
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
impl ByteStream {
|
impl ByteStream {
|
||||||
/// Create a new `ByteStream` by wrapping a `futures` stream.
|
/// Create a new `ByteStream` by wrapping a `futures` stream.
|
||||||
pub fn new<S>(stream: S) -> ByteStream
|
pub fn new<S>(stream: S) -> Self
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
|
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + 'static,
|
||||||
{
|
{
|
||||||
|
@ -27,6 +27,13 @@ impl ByteStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Recreate the ByteStream, skipping `n` elements
|
||||||
|
pub fn skip_recreate(mut self, n: usize) -> Self {
|
||||||
|
self.inner = Box::pin(self.inner.skip(n));
|
||||||
|
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn size_hint(&self) -> Option<usize> {
|
pub(crate) fn size_hint(&self) -> Option<usize> {
|
||||||
self.size_hint
|
self.size_hint
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,8 @@ pub struct Config {
|
||||||
pub listen_address: String,
|
pub listen_address: String,
|
||||||
pub listen_port: String,
|
pub listen_port: String,
|
||||||
url: Option<String>,
|
url: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub extra_logging: bool,
|
||||||
#[serde(deserialize_with = "serialize_log_level", default = "default_log_level")]
|
#[serde(deserialize_with = "serialize_log_level", default = "default_log_level")]
|
||||||
pub log_level: Level,
|
pub log_level: Level,
|
||||||
pub ldap: Option<LdapConnectionConfig>,
|
pub ldap: Option<LdapConnectionConfig>,
|
||||||
|
|
14
src/main.rs
14
src/main.rs
|
@ -32,6 +32,8 @@ use tracing::{debug, info};
|
||||||
|
|
||||||
use app_state::AppState;
|
use app_state::AppState;
|
||||||
use database::Database;
|
use database::Database;
|
||||||
|
use tracing_subscriber::filter;
|
||||||
|
use tracing_subscriber::{filter::FilterFn, layer::{Layer as TracingLayer, SubscriberExt}, util::SubscriberInitExt,};
|
||||||
|
|
||||||
use crate::storage::StorageDriver;
|
use crate::storage::StorageDriver;
|
||||||
use crate::storage::filesystem::FilesystemDriver;
|
use crate::storage::filesystem::FilesystemDriver;
|
||||||
|
@ -72,9 +74,21 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let mut config = Config::new()
|
let mut config = Config::new()
|
||||||
.expect("Failure to parse config!");
|
.expect("Failure to parse config!");
|
||||||
|
|
||||||
|
// Create a tracing subscriber
|
||||||
|
if !config.extra_logging {
|
||||||
|
// only allow logs from the registry
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(tracing_subscriber::fmt::layer())
|
||||||
|
.with(filter::Targets::new()
|
||||||
|
.with_target("orca_registry", config.log_level)
|
||||||
|
)
|
||||||
|
.init();
|
||||||
|
} else {
|
||||||
|
// allow all logs from any crates
|
||||||
tracing_subscriber::fmt()
|
tracing_subscriber::fmt()
|
||||||
.with_max_level(config.log_level)
|
.with_max_level(config.log_level)
|
||||||
.init();
|
.init();
|
||||||
|
}
|
||||||
|
|
||||||
let sqlite_config = match &config.database {
|
let sqlite_config = match &config.database {
|
||||||
DatabaseConfig::Sqlite(sqlite) => sqlite,
|
DatabaseConfig::Sqlite(sqlite) => sqlite,
|
||||||
|
|
Loading…
Reference in New Issue