]> OzVa Git service - ozva-cloud/commitdiff
feat: supports resumable uploads (#343)
authorsigoden <sigoden@gmail.com>
Thu, 11 Jan 2024 06:56:30 +0000 (14:56 +0800)
committerGitHub <noreply@github.com>
Thu, 11 Jan 2024 06:56:30 +0000 (14:56 +0800)
README.md
assets/index.css
assets/index.js
src/server.rs
tests/http.rs

index ea86f591eafb54483bf55396e3cdcb791dbc2cf1..bdbf07e25ec3be5b54a3ef8b3ee4c75d0a40089a 100644 (file)
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ Dufs is a distinctive utility file server that supports static serving, uploadin
 - Download folder as zip file
 - Upload files and folders (Drag & Drop)
 - Create/Edit/Search files
-- Partial responses (Parallel/Resume download)
+- Resumable/partial uploads/downloads
 - Access control
 - Support https
 - Support webdav
@@ -195,8 +195,22 @@ curl http://127.0.0.1:5000?json                   # output paths in json format
 With authorization
 
 ```
-curl http://192.168.8.10:5000/file --user user:pass                 # basic auth
-curl http://192.168.8.10:5000/file --user user:pass --digest        # digest auth
+curl http://127.0.0.1:5000/file --user user:pass                 # basic auth
+curl http://127.0.0.1:5000/file --user user:pass --digest        # digest auth
+```
+
+Resumable downloads
+
+```
+curl -C- -o file http://127.0.0.1:5000/file
+```
+
+Resumable uploads
+
+```
+upload_offset=$(curl -I -s http://127.0.0.1:5000/file | tr -d '\r' | sed -n 's/content-length: //p')
+dd skip=$upload_offset if=file status=none ibs=1 | \
+  curl -X PATCH -H "X-Update-Range: append" --data-binary @- http://127.0.0.1:5000/file
 ```
 
 <details>
index d0ab5a8320d2d5d754a1ac3cbc4ad03ffc671d82..5718c15d554278c2a231bdadc1413fd0b6848855 100644 (file)
@@ -134,7 +134,6 @@ body {
 }
 
 .cell-status span {
-  width: 80px;
   display: inline-block;
 }
 
@@ -239,6 +238,10 @@ body {
   font-style: italic;
 }
 
+.retry-btn {
+  cursor: pointer;
+}
+
 @media (min-width: 768px) {
   .path a {
     min-width: 400px;
index 0f049bebe4be87cc83e29b91650b4f911a929b83..7e6de9e31de619780420f07458d3559c1a8e975f 100644 (file)
@@ -59,6 +59,11 @@ const ICONS = {
   view: `<svg width="16" height="16" viewBox="0 0 16 16"><path d="M4 0a2 2 0 0 0-2 2v12a2 2 0 0 0 2 2h8a2 2 0 0 0 2-2V2a2 2 0 0 0-2-2zm0 1h8a1 1 0 0 1 1 1v12a1 1 0 0 1-1 1H4a1 1 0 0 1-1-1V2a1 1 0 0 1 1-1"/></svg>`,
 }
 
+/**
+ * @type Map<string, Uploader>
+ */
+const failUploaders = new Map();
+
 /**
  * @type Element
  */
@@ -128,23 +133,24 @@ class Uploader {
   /**
    *
    * @param {File} file
-   * @param {string[]} dirs
+   * @param {string[]} pathParts
    */
-  constructor(file, dirs) {
+  constructor(file, pathParts) {
     /**
      * @type Element
      */
     this.$uploadStatus = null
     this.uploaded = 0;
+    this.uploadOffset = 0;
     this.lastUptime = 0;
-    this.name = [...dirs, file.name].join("/");
+    this.name = [...pathParts, file.name].join("/");
     this.idx = Uploader.globalIdx++;
     this.file = file;
+    this.url = newUrl(this.name);
   }
 
   upload() {
-    const { idx, name } = this;
-    const url = newUrl(name);
+    const { idx, name, url } = this;
     const encodedName = encodedStr(name);
     $uploadersTable.insertAdjacentHTML("beforeend", `
   <tr id="upload${idx}" class="uploader">
@@ -160,13 +166,25 @@ class Uploader {
     $emptyFolder.classList.add("hidden");
     this.$uploadStatus = document.getElementById(`uploadStatus${idx}`);
     this.$uploadStatus.innerHTML = '-';
+    this.$uploadStatus.addEventListener("click", e => {
+      const nodeId = e.target.id;
+      const matches = /^retry(\d+)$/.exec(nodeId);
+      if (matches) {
+        const id = parseInt(matches[1]);
+        let uploader = failUploaders.get(id);
+        if (uploader) uploader.retry();
+      }
+    });
     Uploader.queues.push(this);
     Uploader.runQueue();
   }
 
   ajax() {
-    const url = newUrl(this.name);
+    const { url } = this;
+
+    this.uploaded = 0;
     this.lastUptime = Date.now();
+
     const ajax = new XMLHttpRequest();
     ajax.upload.addEventListener("progress", e => this.progress(e), false);
     ajax.addEventListener("readystatechange", () => {
@@ -174,37 +192,64 @@ class Uploader {
         if (ajax.status >= 200 && ajax.status < 300) {
           this.complete();
         } else {
-          this.fail();
+          if (ajax.status != 0) {
+            this.fail(`${ajax.status} ${ajax.statusText}`);
+          }
         }
       }
     })
     ajax.addEventListener("error", () => this.fail(), false);
     ajax.addEventListener("abort", () => this.fail(), false);
-    ajax.open("PUT", url);
-    ajax.send(this.file);
+    if (this.uploadOffset > 0) {
+      ajax.open("PATCH", url);
+      ajax.setRequestHeader("X-Update-Range", "append");
+      ajax.send(this.file.slice(this.uploadOffset));
+    } else {
+      ajax.open("PUT", url);
+      ajax.send(this.file);
+      // setTimeout(() => ajax.abort(), 3000);
+    }
   }
 
+  async retry() {
+    const { url } = this;
+    let res = await fetch(url, {
+      method: "HEAD",
+    });
+    let uploadOffset = 0;
+    if (res.status == 200) {
+      let value = res.headers.get("content-length");
+      uploadOffset = parseInt(value) || 0;
+    }
+    this.uploadOffset = uploadOffset;
+    this.ajax()
+  }
 
   progress(event) {
     const now = Date.now();
     const speed = (event.loaded - this.uploaded) / (now - this.lastUptime) * 1000;
     const [speedValue, speedUnit] = formatSize(speed);
     const speedText = `${speedValue} ${speedUnit}/s`;
-    const progress = formatPercent((event.loaded / event.total) * 100);
+    const progress = formatPercent(((event.loaded + this.uploadOffset) / this.file.size) * 100);
     const duration = formatDuration((event.total - event.loaded) / speed)
-    this.$uploadStatus.innerHTML = `<span>${speedText}</span><span>${progress} ${duration}</span>`;
+    this.$uploadStatus.innerHTML = `<span style="width: 80px;">${speedText}</span><span>${progress} ${duration}</span>`;
     this.uploaded = event.loaded;
     this.lastUptime = now;
   }
 
   complete() {
-    this.$uploadStatus.innerHTML = `✓`;
+    const $uploadStatusNew = this.$uploadStatus.cloneNode(true);
+    $uploadStatusNew.innerHTML = `✓`;
+    this.$uploadStatus.parentNode.replaceChild($uploadStatusNew, this.$uploadStatus);
+    this.$uploadStatus = null;
+    failUploaders.delete(this.idx);
     Uploader.runnings--;
     Uploader.runQueue();
   }
 
-  fail() {
-    this.$uploadStatus.innerHTML = `✗`;
+  fail(reason = "") {
+    this.$uploadStatus.innerHTML = `<span style="width: 20px;" title="${reason}">✗</span><span class="retry-btn" id="retry${this.idx}" title="Retry">↻</span>`;
+    failUploaders.set(this.idx, this);
     Uploader.runnings--;
     Uploader.runQueue();
   }
@@ -801,7 +846,7 @@ function padZero(value, size) {
 }
 
 function formatSize(size) {
-  if (size == null) return []
+  if (size == null) return [0, "B"]
   const sizes = ['B', 'KB', 'MB', 'GB', 'TB'];
   if (size == 0) return [0, "B"];
   const i = parseInt(Math.floor(Math.log(size) / Math.log(1024)));
index a905e054b065ad5c6febd411b5be01bdf68314ce..e101e6fd27b5939a55eba43441581e20c5236d31 100644 (file)
@@ -42,6 +42,7 @@ use std::time::SystemTime;
 use tokio::fs::File;
 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite};
 use tokio::{fs, io};
+
 use tokio_util::compat::FuturesAsyncWriteCompatExt;
 use tokio_util::io::{ReaderStream, StreamReader};
 use uuid::Uuid;
@@ -57,7 +58,8 @@ const INDEX_JS: &str = include_str!("../assets/index.js");
 const FAVICON_ICO: &[u8] = include_bytes!("../assets/favicon.ico");
 const INDEX_NAME: &str = "index.html";
 const BUF_SIZE: usize = 65536;
-const TEXT_MAX_SIZE: u64 = 4194304; // 4M
+const EDITABLE_TEXT_MAX_SIZE: u64 = 4194304; // 4M
+const RESUMABLE_UPLOAD_MIN_SIZE: u64 = 20971520; // 20M
 
 pub struct Server {
     args: Args,
@@ -327,10 +329,37 @@ impl Server {
                 set_webdav_headers(&mut res);
             }
             Method::PUT => {
-                if !allow_upload || (!allow_delete && is_file && size > 0) {
+                if is_dir || !allow_upload || (!allow_delete && size > 0) {
                     status_forbid(&mut res);
                 } else {
-                    self.handle_upload(path, req, &mut res).await?;
+                    self.handle_upload(path, None, size, req, &mut res).await?;
+                }
+            }
+            Method::PATCH => {
+                if is_miss {
+                    status_not_found(&mut res);
+                } else if !allow_upload {
+                    status_forbid(&mut res);
+                } else {
+                    let offset = match parse_upload_offset(headers, size) {
+                        Ok(v) => v,
+                        Err(err) => {
+                            status_bad_request(&mut res, &err.to_string());
+                            return Ok(res);
+                        }
+                    };
+                    match offset {
+                        Some(offset) => {
+                            if offset < size && !allow_delete {
+                                status_forbid(&mut res);
+                            }
+                            self.handle_upload(path, Some(offset), size, req, &mut res)
+                                .await?;
+                        }
+                        None => {
+                            *res.status_mut() = StatusCode::METHOD_NOT_ALLOWED;
+                        }
+                    }
                 }
             }
             Method::DELETE => {
@@ -417,17 +446,27 @@ impl Server {
         Ok(res)
     }
 
-    async fn handle_upload(&self, path: &Path, req: Request, res: &mut Response) -> Result<()> {
+    async fn handle_upload(
+        &self,
+        path: &Path,
+        upload_offset: Option<u64>,
+        size: u64,
+        req: Request,
+        res: &mut Response,
+    ) -> Result<()> {
         ensure_path_parent(path).await?;
-
-        let mut file = match fs::File::create(&path).await {
-            Ok(v) => v,
-            Err(_) => {
-                status_forbid(res);
-                return Ok(());
+        let (mut file, status) = match upload_offset {
+            None => (fs::File::create(path).await?, StatusCode::CREATED),
+            Some(offset) if offset == size => (
+                fs::OpenOptions::new().append(true).open(path).await?,
+                StatusCode::NO_CONTENT,
+            ),
+            Some(offset) => {
+                let mut file = fs::OpenOptions::new().write(true).open(path).await?;
+                file.seek(SeekFrom::Start(offset)).await?;
+                (file, StatusCode::NO_CONTENT)
             }
         };
-
         let stream = IncomingStream::new(req.into_body());
 
         let body_with_io_error = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
@@ -436,13 +475,19 @@ impl Server {
         pin_mut!(body_reader);
 
         let ret = io::copy(&mut body_reader, &mut file).await;
+        let size = fs::metadata(path)
+            .await
+            .map(|v| v.len())
+            .unwrap_or_default();
         if ret.is_err() {
-            tokio::fs::remove_file(&path).await?;
-
+            if upload_offset.is_none() && size < RESUMABLE_UPLOAD_MIN_SIZE {
+                let _ = tokio::fs::remove_file(&path).await;
+            }
             ret?;
         }
 
-        *res.status_mut() = StatusCode::CREATED;
+        *res.status_mut() = status;
+
         Ok(())
     }
 
@@ -830,7 +875,8 @@ impl Server {
         );
         let mut buffer: Vec<u8> = vec![];
         file.take(1024).read_to_end(&mut buffer).await?;
-        let editable = meta.len() <= TEXT_MAX_SIZE && content_inspector::inspect(&buffer).is_text();
+        let editable =
+            meta.len() <= EDITABLE_TEXT_MAX_SIZE && content_inspector::inspect(&buffer).is_text();
         let data = EditData {
             href,
             kind,
@@ -867,7 +913,7 @@ impl Server {
             Some(v) => match v.to_str().ok().and_then(|v| v.parse().ok()) {
                 Some(v) => v,
                 None => {
-                    *res.status_mut() = StatusCode::BAD_REQUEST;
+                    status_bad_request(res, "");
                     return Ok(());
                 }
             },
@@ -1545,6 +1591,13 @@ fn status_no_content(res: &mut Response) {
     *res.status_mut() = StatusCode::NO_CONTENT;
 }
 
+fn status_bad_request(res: &mut Response, body: &str) {
+    *res.status_mut() = StatusCode::BAD_REQUEST;
+    if !body.is_empty() {
+        *res.body_mut() = body_full(body.to_string());
+    }
+}
+
 fn set_content_diposition(res: &mut Response, inline: bool, filename: &str) -> Result<()> {
     let kind = if inline { "inline" } else { "attachment" };
     let filename: String = filename
@@ -1584,10 +1637,12 @@ fn is_hidden(hidden: &[String], file_name: &str, is_dir_type: bool) -> bool {
 fn set_webdav_headers(res: &mut Response) {
     res.headers_mut().insert(
         "Allow",
-        HeaderValue::from_static("GET,HEAD,PUT,OPTIONS,DELETE,PROPFIND,COPY,MOVE"),
+        HeaderValue::from_static("GET,HEAD,PUT,OPTIONS,DELETE,PATCH,PROPFIND,COPY,MOVE"),
+    );
+    res.headers_mut().insert(
+        "DAV",
+        HeaderValue::from_static("1, 2, 3, sabredav-partialupdate"),
     );
-    res.headers_mut()
-        .insert("DAV", HeaderValue::from_static("1,2"));
 }
 
 async fn get_content_type(path: &Path) -> Result<String> {
@@ -1620,3 +1675,17 @@ async fn get_content_type(path: &Path) -> Result<String> {
     };
     Ok(content_type)
 }
+
+fn parse_upload_offset(headers: &HeaderMap<HeaderValue>, size: u64) -> Result<Option<u64>> {
+    let value = match headers.get("x-update-range") {
+        Some(v) => v,
+        None => return Ok(None),
+    };
+    let err = || anyhow!("Invalid X-Updage-Range header");
+    let value = value.to_str().map_err(|_| err())?;
+    if value == "append" {
+        return Ok(Some(size));
+    }
+    let (start, _) = parse_range(value, size).ok_or_else(err)?;
+    Ok(Some(start))
+}
index 88fb51352561c2df8c763d1b79da2582ba88d009..f5ad7c41d84d2147c5ad46dd070dcd625ac2c4f8 100644 (file)
@@ -250,9 +250,12 @@ fn options_dir(server: TestServer) -> Result<(), Error> {
     assert_eq!(resp.status(), 200);
     assert_eq!(
         resp.headers().get("allow").unwrap(),
-        "GET,HEAD,PUT,OPTIONS,DELETE,PROPFIND,COPY,MOVE"
+        "GET,HEAD,PUT,OPTIONS,DELETE,PATCH,PROPFIND,COPY,MOVE"
+    );
+    assert_eq!(
+        resp.headers().get("dav").unwrap(),
+        "1, 2, 3, sabredav-partialupdate"
     );
-    assert_eq!(resp.headers().get("dav").unwrap(), "1,2");
     Ok(())
 }
 
@@ -330,3 +333,19 @@ fn get_file_content_type(server: TestServer) -> Result<(), Error> {
     );
     Ok(())
 }
+
+#[rstest]
+fn resumable_upload(#[with(&["--allow-upload"])] server: TestServer) -> Result<(), Error> {
+    let url = format!("{}file1", server.url());
+    let resp = fetch!(b"PUT", &url).body(b"abc".to_vec()).send()?;
+    assert_eq!(resp.status(), 201);
+    let resp = fetch!(b"PATCH", &url)
+        .header("X-Update-Range", "append")
+        .body(b"123".to_vec())
+        .send()?;
+    assert_eq!(resp.status(), 204);
+    let resp = reqwest::blocking::get(url)?;
+    assert_eq!(resp.status(), 200);
+    assert_eq!(resp.text().unwrap(), "abc123");
+    Ok(())
+}