"tempfile",
]
-[[package]]
-name = "async-channel"
-version = "1.6.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319"
-dependencies = [
- "concurrent-queue",
- "event-listener",
- "futures-core",
-]
-
[[package]]
name = "async-compression"
version = "0.3.14"
"tokio",
]
-[[package]]
-name = "async-fs"
-version = "1.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8b3ca4f8ff117c37c278a2f7415ce9be55560b846b5bc4412aaa5d29c1c3dae2"
-dependencies = [
- "async-lock",
- "blocking",
- "futures-lite",
-]
-
-[[package]]
-name = "async-lock"
-version = "2.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6"
-dependencies = [
- "event-listener",
-]
-
[[package]]
name = "async-stream"
version = "0.3.3"
"syn",
]
-[[package]]
-name = "async-task"
-version = "4.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9"
-
[[package]]
name = "async-trait"
version = "0.1.56"
"syn",
]
-[[package]]
-name = "async-walkdir"
-version = "0.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "826d88d73e87e7504b635b6e427561faa6a65f4a2f59e75efcbfa51a0876bb90"
-dependencies = [
- "async-fs",
- "futures-lite",
-]
-
[[package]]
name = "async_io_utilities"
version = "0.1.3"
"tokio",
]
-[[package]]
-name = "atomic-waker"
-version = "1.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
-
[[package]]
name = "autocfg"
version = "1.1.0"
"generic-array",
]
-[[package]]
-name = "blocking"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c6ccb65d468978a086b69884437ded69a90faab3bbe6e67f242173ea728acccc"
-dependencies = [
- "async-channel",
- "async-task",
- "atomic-waker",
- "fastrand",
- "futures-lite",
- "once_cell",
-]
-
[[package]]
name = "bstr"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
-[[package]]
-name = "cache-padded"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
-
[[package]]
name = "cc"
version = "1.0.73"
"os_str_bytes",
]
-[[package]]
-name = "concurrent-queue"
-version = "1.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
-dependencies = [
- "cache-padded",
-]
-
[[package]]
name = "core-foundation"
version = "0.9.3"
"assert_cmd",
"assert_fs",
"async-stream",
- "async-walkdir",
"async_zip",
"base64",
"chrono",
"url",
"urlencoding",
"uuid",
+ "walkdir",
"xml-rs",
]
"cfg-if",
]
-[[package]]
-name = "event-listener"
-version = "2.5.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
-
[[package]]
name = "fastrand"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
-[[package]]
-name = "futures-lite"
-version = "1.12.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
-dependencies = [
- "fastrand",
- "futures-core",
- "futures-io",
- "memchr",
- "parking",
- "pin-project-lite",
- "waker-fn",
-]
-
[[package]]
name = "futures-macro"
version = "0.3.21"
"winapi",
]
-[[package]]
-name = "parking"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
-
[[package]]
name = "parking_lot"
version = "0.12.1"
"libc",
]
-[[package]]
-name = "waker-fn"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
-
[[package]]
name = "walkdir"
version = "2.3.2"
use crate::tls::{TlsAcceptor, TlsStream};
use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use futures::future::join_all;
logger::init().map_err(|e| format!("Failed to init logger, {}", e))?;
let args = Args::parse(matches())?;
let args = Arc::new(args);
- let handles = serve(args.clone())?;
+ let running = Arc::new(AtomicBool::new(true));
+ let handles = serve(args.clone(), running.clone())?;
print_listening(args)?;
tokio::select! {
Ok(())
},
_ = shutdown_signal() => {
+ running.store(false, Ordering::SeqCst);
Ok(())
},
}
}
-fn serve(args: Arc<Args>) -> BoxResult<Vec<JoinHandle<Result<(), hyper::Error>>>> {
- let inner = Arc::new(Server::new(args.clone()));
+fn serve(
+ args: Arc<Args>,
+ running: Arc<AtomicBool>,
+) -> BoxResult<Vec<JoinHandle<Result<(), hyper::Error>>>> {
+ let inner = Arc::new(Server::new(args.clone(), running));
let mut handles = vec![];
let port = args.port;
for ip in args.addrs.iter() {
use crate::streamer::Streamer;
use crate::utils::{decode_uri, encode_uri, get_file_name, try_get_file_name};
use crate::{Args, BoxResult};
-use async_walkdir::{Filtering, WalkDir};
+use walkdir::WalkDir;
use xml::escape::escape_str_pcdata;
use async_zip::write::{EntryOptions, ZipFileWriter};
use async_zip::Compression;
use chrono::{TimeZone, Utc};
-use futures::stream::StreamExt;
use futures::TryStreamExt;
use headers::{
AcceptRanges, AccessControlAllowCredentials, AccessControlAllowHeaders,
use std::io::SeekFrom;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::fs::File;
pub struct Server {
args: Arc<Args>,
assets_prefix: String,
+ running: Arc<AtomicBool>,
}
impl Server {
- pub fn new(args: Arc<Args>) -> Self {
+ pub fn new(args: Arc<Args>, running: Arc<AtomicBool>) -> Self {
let assets_prefix = format!("{}__dufs_v{}_", args.uri_prefix, env!("CARGO_PKG_VERSION"));
Self {
args,
+ running,
assets_prefix,
}
}
res: &mut Response,
) -> BoxResult<()> {
let mut paths: Vec<PathItem> = vec![];
+ let path_buf = path.to_path_buf();
let hidden = self.args.hidden.to_string();
- let search = search.to_string();
- let mut walkdir = WalkDir::new(path).filter(move |entry| {
- let hidden_cloned = hidden.clone();
- let search_cloned = search.clone();
- async move {
+ let running = self.running.clone();
+ let search = search.to_lowercase();
+ let search_paths = tokio::task::spawn_blocking(move || {
+ let mut it = WalkDir::new(&path_buf).into_iter();
+ let mut paths: Vec<PathBuf> = vec![];
+ while let Some(Ok(entry)) = it.next() {
+ if !running.load(Ordering::SeqCst) {
+ break;
+ }
let entry_path = entry.path();
- let base_name = get_file_name(&entry_path);
- if is_hidden(&hidden_cloned, base_name) {
- return Filtering::IgnoreDir;
+ let base_name = get_file_name(entry_path);
+ let file_type = entry.file_type();
+ if is_hidden(&hidden, base_name) {
+ if file_type.is_dir() {
+ it.skip_current_dir();
+ }
+ continue;
}
- if !base_name
- .to_lowercase()
- .contains(&search_cloned.to_lowercase())
- {
- return Filtering::Ignore;
+ if !base_name.to_lowercase().contains(&search) {
+ continue;
}
- if fs::symlink_metadata(entry.path()).await.is_err() {
- return Filtering::Ignore;
+ if entry.path().symlink_metadata().is_err() {
+ continue;
}
- Filtering::Continue
+ paths.push(entry_path.to_path_buf());
}
- });
- while let Some(entry) = walkdir.next().await {
- if let Ok(entry) = entry {
- if let Ok(Some(item)) = self.to_pathitem(entry.path(), path.to_path_buf()).await {
- paths.push(item);
- }
+ paths
+ })
+ .await?;
+ for search_path in search_paths.into_iter() {
+ if let Ok(Some(item)) = self.to_pathitem(search_path, path.to_path_buf()).await {
+ paths.push(item);
}
}
self.send_index(path, paths, true, head_only, res)
}
let path = path.to_owned();
let hidden = self.args.hidden.clone();
+ let running = self.running.clone();
tokio::spawn(async move {
- if let Err(e) = zip_dir(&mut writer, &path, &hidden).await {
+ if let Err(e) = zip_dir(&mut writer, &path, &hidden, running).await {
error!("Failed to zip {}, {}", path.display(), e);
}
});
));
}
-async fn zip_dir<W: AsyncWrite + Unpin>(writer: &mut W, dir: &Path, hidden: &str) -> BoxResult<()> {
+async fn zip_dir<W: AsyncWrite + Unpin>(
+ writer: &mut W,
+ dir: &Path,
+ hidden: &str,
+ running: Arc<AtomicBool>,
+) -> BoxResult<()> {
let mut writer = ZipFileWriter::new(writer);
+ let hidden = Arc::new(hidden.to_string());
let hidden = hidden.to_string();
- let mut walkdir = WalkDir::new(dir).filter(move |entry| {
- let hidden = hidden.clone();
- async move {
+ let dir_path_buf = dir.to_path_buf();
+ let zip_paths = tokio::task::spawn_blocking(move || {
+ let mut it = WalkDir::new(&dir_path_buf).into_iter();
+ let mut paths: Vec<PathBuf> = vec![];
+ while let Some(Ok(entry)) = it.next() {
+ if !running.load(Ordering::SeqCst) {
+ break;
+ }
let entry_path = entry.path();
- let base_name = get_file_name(&entry_path);
+ let base_name = get_file_name(entry_path);
+ let file_type = entry.file_type();
if is_hidden(&hidden, base_name) {
- return Filtering::IgnoreDir;
+ if file_type.is_dir() {
+ it.skip_current_dir();
+ }
+ continue;
}
- let meta = match fs::symlink_metadata(entry.path()).await {
- Ok(meta) => meta,
- Err(_) => return Filtering::Ignore,
- };
- if !meta.is_file() {
- return Filtering::Ignore;
+ if entry.path().symlink_metadata().is_err() {
+ continue;
}
- Filtering::Continue
- }
- });
- while let Some(entry) = walkdir.next().await {
- if let Ok(entry) = entry {
- let entry_path = entry.path();
- let filename = match entry_path.strip_prefix(dir).ok().and_then(|v| v.to_str()) {
- Some(v) => v,
- None => continue,
- };
- let entry_options = EntryOptions::new(filename.to_owned(), Compression::Deflate)
- .unix_permissions(0o644);
- let mut file = File::open(&entry_path).await?;
- let mut file_writer = writer.write_entry_stream(entry_options).await?;
- io::copy(&mut file, &mut file_writer).await?;
- file_writer.close().await?;
+ if !file_type.is_file() {
+ continue;
+ }
+ paths.push(entry_path.to_path_buf());
}
+ paths
+ })
+ .await?;
+ for zip_path in zip_paths.into_iter() {
+ let filename = match zip_path.strip_prefix(dir).ok().and_then(|v| v.to_str()) {
+ Some(v) => v,
+ None => continue,
+ };
+ let entry_options =
+ EntryOptions::new(filename.to_owned(), Compression::Deflate).unix_permissions(0o644);
+ let mut file = File::open(&zip_path).await?;
+ let mut file_writer = writer.write_entry_stream(entry_options).await?;
+ io::copy(&mut file, &mut file_writer).await?;
+ file_writer.close().await?;
}
writer.close().await?;
Ok(())