"""Nextcloud API for working with the file system."""
import builtins
import os
from pathlib import Path
from urllib.parse import quote
from httpx import Headers
from .._exceptions import NextcloudException, NextcloudExceptionNotFound, check_error
from .._misc import random_string, require_capabilities
from .._session import NcSessionBasic
from . import FsNode, LockType, SystemTag
from ._files import (
PROPFIND_PROPERTIES,
PropFindType,
build_find_request,
build_list_by_criteria_req,
build_list_tag_req,
build_list_tags_response,
build_listdir_req,
build_listdir_response,
build_setfav_req,
build_tags_ids_for_object,
build_update_tag_req,
dav_get_obj_path,
element_tree_as_str,
etag_fileid_from_response,
get_propfind_properties,
lf_parse_webdav_response,
)
from .sharing import _FilesSharingAPI
[docs]
class FilesAPI:
"""Class that encapsulates file system and file sharing API, avalaible as **nc.files.<method>**."""
sharing: _FilesSharingAPI
"""API for managing Files Shares"""
def __init__(self, session: NcSessionBasic):
self._session = session
self.sharing = _FilesSharingAPI(session)
[docs]
def listdir(self, path: str | FsNode = "", depth: int = 1, exclude_self=True) -> list[FsNode]:
"""Returns a list of all entries in the specified directory.
:param path: path to the directory to get the list.
:param depth: how many directory levels should be included in output. Default = **1** (only specified directory)
:param exclude_self: boolean value indicating whether the `path` itself should be excluded from the list or not.
Default = **True**.
"""
if exclude_self and not depth:
raise ValueError("Wrong input parameters, query will return nothing.")
properties = get_propfind_properties(self._session.capabilities)
path = path.user_path if isinstance(path, FsNode) else path
return self._listdir(self._session.user, path, properties=properties, depth=depth, exclude_self=exclude_self)
[docs]
def by_id(self, file_id: int | str | FsNode) -> FsNode | None:
"""Returns :py:class:`~nc_py_api.files.FsNode` by file_id if any.
:param file_id: can be full file ID with Nextcloud instance ID or only clear file ID.
"""
file_id = file_id.file_id if isinstance(file_id, FsNode) else file_id
result = self.find(req=["eq", "fileid", file_id])
return result[0] if result else None
[docs]
def by_path(self, path: str | FsNode) -> FsNode | None:
"""Returns :py:class:`~nc_py_api.files.FsNode` by exact path if any."""
path = path.user_path if isinstance(path, FsNode) else path
result = self.listdir(path, depth=0, exclude_self=False)
return result[0] if result else None
[docs]
def find(self, req: list, path: str | FsNode = "") -> list[FsNode]:
"""Searches a directory for a file or subdirectory with a name.
:param req: list of conditions to search for. Detailed description here...
:param path: path where to search from. Default = **""**.
"""
# `req` possible keys: "name", "mime", "last_modified", "size", "favorite", "fileid"
root = build_find_request(req, path, self._session.user, self._session.capabilities)
webdav_response = self._session.adapter_dav.request(
"SEARCH", "", content=element_tree_as_str(root), headers={"Content-Type": "text/xml"}
)
request_info = f"find: {self._session.user}, {req}, {path}"
return lf_parse_webdav_response(self._session.cfg.dav_url_suffix, webdav_response, request_info)
[docs]
def download(self, path: str | FsNode) -> bytes:
"""Downloads and returns the content of a file."""
path = path.user_path if isinstance(path, FsNode) else path
response = self._session.adapter_dav.get(quote(dav_get_obj_path(self._session.user, path)))
check_error(response, f"download: user={self._session.user}, path={path}")
return response.content
[docs]
def download2stream(self, path: str | FsNode, fp, **kwargs) -> None:
"""Downloads file to the given `fp` object.
:param path: path to download file.
:param fp: filename (string), pathlib.Path object or a file object.
The object must implement the ``file.write`` method and be able to write binary data.
:param kwargs: **chunk_size** an int value specifying chunk size to write. Default = **5Mb**
"""
path = quote(dav_get_obj_path(self._session.user, path.user_path if isinstance(path, FsNode) else path))
self._session.download2stream(path, fp, dav=True, **kwargs)
[docs]
def download_directory_as_zip(self, path: str | FsNode, local_path: str | Path | None = None, **kwargs) -> Path:
"""Downloads a remote directory as zip archive.
:param path: path to directory to download.
:param local_path: relative or absolute file path to save zip file.
:returns: Path to the saved zip archive.
.. note:: This works only for directories, you should not use this to download a file.
"""
path = path.user_path if isinstance(path, FsNode) else path
result_path = local_path if local_path else os.path.basename(path)
with open(result_path, "wb") as fp:
if self._session.nc_version["major"] >= 31:
full_path = dav_get_obj_path(self._session.user, path)
accept_header = f"application/{kwargs.get('format', 'zip')}"
self._session.download2fp(quote(full_path), fp, dav=True, headers={"Accept": accept_header})
else:
self._session.download2fp(
"/index.php/apps/files/ajax/download.php", fp, dav=False, params={"dir": path}, **kwargs
)
return Path(result_path)
[docs]
def upload(self, path: str | FsNode, content: bytes | str) -> FsNode:
"""Creates a file with the specified content at the specified path.
:param path: file's upload path.
:param content: content to create the file. If it is a string, it will be encoded into bytes using UTF-8.
"""
path = path.user_path if isinstance(path, FsNode) else path
full_path = dav_get_obj_path(self._session.user, path)
response = self._session.adapter_dav.put(quote(full_path), content=content)
check_error(response, f"upload: user={self._session.user}, path={path}, size={len(content)}")
return FsNode(full_path.strip("/"), **etag_fileid_from_response(response))
[docs]
def upload_stream(self, path: str | FsNode, fp, **kwargs) -> FsNode:
"""Creates a file with content provided by `fp` object at the specified path.
:param path: file's upload path.
:param fp: filename (string), pathlib.Path object or a file object.
The object must implement the ``file.read`` method providing data with str or bytes type.
:param kwargs: **chunk_size** an int value specifying chunk size to read. Default = **5Mb**
"""
path = path.user_path if isinstance(path, FsNode) else path
chunk_size = kwargs.get("chunk_size", 5 * 1024 * 1024)
if isinstance(fp, str | Path):
with builtins.open(fp, "rb") as f:
return self.__upload_stream(path, f, chunk_size)
elif hasattr(fp, "read"):
return self.__upload_stream(path, fp, chunk_size)
else:
raise TypeError("`fp` must be a path to file or an object with `read` method.")
[docs]
def mkdir(self, path: str | FsNode) -> FsNode:
"""Creates a new directory.
:param path: path of the directory to be created.
"""
path = path.user_path if isinstance(path, FsNode) else path
full_path = dav_get_obj_path(self._session.user, path)
response = self._session.adapter_dav.request("MKCOL", quote(full_path))
check_error(response)
full_path += "/" if not full_path.endswith("/") else ""
return FsNode(full_path.lstrip("/"), **etag_fileid_from_response(response))
[docs]
def makedirs(self, path: str | FsNode, exist_ok=False) -> FsNode | None:
"""Creates a new directory and subdirectories.
:param path: path of the directories to be created.
:param exist_ok: ignore error if any of pathname components already exists.
:returns: `FsNode` if directory was created or ``None`` if it was already created.
"""
_path = ""
path = path.user_path if isinstance(path, FsNode) else path
path = path.lstrip("/")
result = None
for i in Path(path).parts:
_path = f"{_path}/{i}"
if not exist_ok:
result = self.mkdir(_path)
else:
try:
result = self.mkdir(_path)
except NextcloudException as e:
if e.status_code != 405:
raise e from None
return result
[docs]
def delete(self, path: str | FsNode, not_fail=False) -> None:
"""Deletes a file/directory (moves to trash if trash is enabled).
:param path: path to delete.
:param not_fail: if set to ``True`` and the object is not found, it does not raise an exception.
"""
path = path.user_path if isinstance(path, FsNode) else path
response = self._session.adapter_dav.delete(quote(dav_get_obj_path(self._session.user, path)))
if response.status_code == 404 and not_fail:
return
check_error(response)
[docs]
def move(self, path_src: str | FsNode, path_dest: str | FsNode, overwrite=False) -> FsNode:
"""Moves an existing file or a directory.
:param path_src: path of an existing file/directory.
:param path_dest: name of the new one.
:param overwrite: if ``True`` and the destination object already exists, it gets overwritten.
Default = **False**.
"""
path_src = path_src.user_path if isinstance(path_src, FsNode) else path_src
full_dest_path = dav_get_obj_path(
self._session.user, path_dest.user_path if isinstance(path_dest, FsNode) else path_dest
)
dest = self._session.cfg.dav_endpoint + quote(full_dest_path)
headers = Headers({"Destination": dest, "Overwrite": "T" if overwrite else "F"}, encoding="utf-8")
response = self._session.adapter_dav.request(
"MOVE",
quote(dav_get_obj_path(self._session.user, path_src)),
headers=headers,
)
check_error(response, f"move: user={self._session.user}, src={path_src}, dest={dest}, {overwrite}")
return self.find(req=["eq", "fileid", response.headers["OC-FileId"]])[0]
[docs]
def copy(self, path_src: str | FsNode, path_dest: str | FsNode, overwrite=False) -> FsNode:
"""Copies an existing file/directory.
:param path_src: path of an existing file/directory.
:param path_dest: name of the new one.
:param overwrite: if ``True`` and the destination object already exists, it gets overwritten.
Default = **False**.
"""
path_src = path_src.user_path if isinstance(path_src, FsNode) else path_src
full_dest_path = dav_get_obj_path(
self._session.user, path_dest.user_path if isinstance(path_dest, FsNode) else path_dest
)
dest = self._session.cfg.dav_endpoint + quote(full_dest_path)
headers = Headers({"Destination": dest, "Overwrite": "T" if overwrite else "F"}, encoding="utf-8")
response = self._session.adapter_dav.request(
"COPY",
quote(dav_get_obj_path(self._session.user, path_src)),
headers=headers,
)
check_error(response, f"copy: user={self._session.user}, src={path_src}, dest={dest}, {overwrite}")
return self.find(req=["eq", "fileid", response.headers["OC-FileId"]])[0]
[docs]
def list_by_criteria(
self, properties: list[str] | None = None, tags: list[int | SystemTag] | None = None
) -> list[FsNode]:
"""Returns a list of all files/directories for the current user filtered by the specified values.
:param properties: List of ``properties`` that should have been set for the file.
Supported values: **favorite**
:param tags: List of ``tags ids`` or ``SystemTag`` that should have been set for the file.
"""
root = build_list_by_criteria_req(properties, tags, self._session.capabilities)
webdav_response = self._session.adapter_dav.request(
"REPORT", dav_get_obj_path(self._session.user), content=element_tree_as_str(root)
)
request_info = f"list_files_by_criteria: {self._session.user}"
check_error(webdav_response, request_info)
return lf_parse_webdav_response(self._session.cfg.dav_url_suffix, webdav_response, request_info)
[docs]
def setfav(self, path: str | FsNode, value: int | bool) -> None:
"""Sets or unsets favourite flag for specific file.
:param path: path to the object to set the state.
:param value: value to set for the ``favourite`` state.
"""
path = path.user_path if isinstance(path, FsNode) else path
root = build_setfav_req(value)
webdav_response = self._session.adapter_dav.request(
"PROPPATCH", quote(dav_get_obj_path(self._session.user, path)), content=element_tree_as_str(root)
)
check_error(webdav_response, f"setfav: path={path}, value={value}")
[docs]
def trashbin_list(self) -> list[FsNode]:
"""Returns a list of all entries in the TrashBin."""
properties = PROPFIND_PROPERTIES
properties += ["nc:trashbin-filename", "nc:trashbin-original-location", "nc:trashbin-deletion-time"]
return self._listdir(
self._session.user, "", properties=properties, depth=1, exclude_self=False, prop_type=PropFindType.TRASHBIN
)
[docs]
def trashbin_restore(self, path: str | FsNode) -> None:
"""Restore a file/directory from the TrashBin.
:param path: path to delete, e.g., the ``user_path`` field from ``FsNode`` or the **FsNode** class itself.
"""
restore_name = path.name if isinstance(path, FsNode) else path.split("/", maxsplit=1)[-1]
path = path.user_path if isinstance(path, FsNode) else path
dest = self._session.cfg.dav_endpoint + f"/trashbin/{self._session.user}/restore/{restore_name}"
headers = Headers({"Destination": dest}, encoding="utf-8")
response = self._session.adapter_dav.request(
"MOVE",
quote(f"/trashbin/{self._session.user}/{path}"),
headers=headers,
)
check_error(response, f"trashbin_restore: user={self._session.user}, src={path}, dest={dest}")
[docs]
def trashbin_delete(self, path: str | FsNode, not_fail=False) -> None:
"""Deletes a file/directory permanently from the TrashBin.
:param path: path to delete, e.g., the ``user_path`` field from ``FsNode`` or the **FsNode** class itself.
:param not_fail: if set to ``True`` and the object is not found, it does not raise an exception.
"""
path = path.user_path if isinstance(path, FsNode) else path
response = self._session.adapter_dav.delete(quote(f"/trashbin/{self._session.user}/{path}"))
if response.status_code == 404 and not_fail:
return
check_error(response)
[docs]
def trashbin_cleanup(self) -> None:
"""Empties the TrashBin."""
check_error(self._session.adapter_dav.delete(f"/trashbin/{self._session.user}/trash"))
[docs]
def get_versions(self, file_object: FsNode) -> list[FsNode]:
"""Returns a list of all file versions if any."""
require_capabilities("files.versioning", self._session.capabilities)
return self._listdir(
self._session.user,
str(file_object.info.fileid) if file_object.info.fileid else file_object.file_id,
properties=PROPFIND_PROPERTIES,
depth=1,
exclude_self=False,
prop_type=PropFindType.VERSIONS_FILEID if file_object.info.fileid else PropFindType.VERSIONS_FILE_ID,
)
[docs]
def restore_version(self, file_object: FsNode) -> None:
"""Restore a file with specified version.
:param file_object: The **FsNode** class from :py:meth:`~nc_py_api.files.files.FilesAPI.get_versions`.
"""
require_capabilities("files.versioning", self._session.capabilities)
dest = self._session.cfg.dav_endpoint + f"/versions/{self._session.user}/restore/{file_object.name}"
headers = Headers({"Destination": dest}, encoding="utf-8")
response = self._session.adapter_dav.request(
"MOVE",
quote(f"/versions/{self._session.user}/{file_object.user_path}"),
headers=headers,
)
check_error(response, f"restore_version: user={self._session.user}, src={file_object.user_path}")
[docs]
def create_tag(self, name: str, user_visible: bool = True, user_assignable: bool = True) -> None:
"""Creates a new Tag.
:param name: Name of the tag.
:param user_visible: Should be Tag visible in the UI.
:param user_assignable: Can Tag be assigned from the UI.
"""
response = self._session.adapter_dav.post(
"/systemtags",
json={
"name": name,
"userVisible": user_visible,
"userAssignable": user_assignable,
},
)
check_error(response, info=f"create_tag({name})")
[docs]
def update_tag(
self,
tag_id: int | SystemTag,
name: str | None = None,
user_visible: bool | None = None,
user_assignable: bool | None = None,
) -> None:
"""Updates the Tag information."""
tag_id = tag_id.tag_id if isinstance(tag_id, SystemTag) else tag_id
root = build_update_tag_req(name, user_visible, user_assignable)
response = self._session.adapter_dav.request(
"PROPPATCH", f"/systemtags/{tag_id}", content=element_tree_as_str(root)
)
check_error(response)
[docs]
def delete_tag(self, tag_id: int | SystemTag) -> None:
"""Deletes the tag."""
tag_id = tag_id.tag_id if isinstance(tag_id, SystemTag) else tag_id
response = self._session.adapter_dav.delete(f"/systemtags/{tag_id}")
check_error(response)
[docs]
def tag_by_name(self, tag_name: str) -> SystemTag:
"""Returns Tag info by its name if found or ``None`` otherwise."""
r = [i for i in self.list_tags() if i.display_name == tag_name]
if not r:
raise NextcloudExceptionNotFound(f"Tag with name='{tag_name}' not found.")
return r[0]
[docs]
def assign_tag(self, file_id: FsNode | int, tag_id: SystemTag | int) -> None:
"""Assigns Tag to a file/directory."""
self._file_change_tag_state(file_id, tag_id, True)
[docs]
def unassign_tag(self, file_id: FsNode | int, tag_id: SystemTag | int) -> None:
"""Removes Tag from a file/directory."""
self._file_change_tag_state(file_id, tag_id, False)
[docs]
def lock(self, path: FsNode | str, lock_type: LockType = LockType.MANUAL_LOCK) -> None:
"""Locks the file.
.. note:: Exception codes: 423 - existing lock present.
"""
require_capabilities("files.locking", self._session.capabilities)
full_path = dav_get_obj_path(self._session.user, path.user_path if isinstance(path, FsNode) else path)
response = self._session.adapter_dav.request(
"LOCK",
quote(full_path),
headers={"X-User-Lock": "1", "X-User-Lock-Type": str(lock_type.value)},
)
check_error(response, f"lock: user={self._session.user}, path={full_path}")
[docs]
def unlock(self, path: FsNode | str) -> None:
"""Unlocks the file.
.. note:: Exception codes: 412 - the file is not locked, 423 - the lock is owned by another user.
"""
require_capabilities("files.locking", self._session.capabilities)
full_path = dav_get_obj_path(self._session.user, path.user_path if isinstance(path, FsNode) else path)
response = self._session.adapter_dav.request(
"UNLOCK",
quote(full_path),
headers={"X-User-Lock": "1"},
)
check_error(response, f"unlock: user={self._session.user}, path={full_path}")
def _file_change_tag_state(self, file_id: FsNode | int, tag_id: SystemTag | int, tag_state: bool) -> None:
fs_object = file_id.info.fileid if isinstance(file_id, FsNode) else file_id
tag = tag_id.tag_id if isinstance(tag_id, SystemTag) else tag_id
response = self._session.adapter_dav.request(
"PUT" if tag_state else "DELETE", f"/systemtags-relations/files/{fs_object}/{tag}"
)
check_error(
response,
info=f"({'Adding' if tag_state else 'Removing'} `{tag}` {'to' if tag_state else 'from'} {fs_object})",
)
def _listdir(
self,
user: str,
path: str,
properties: list[str],
depth: int,
exclude_self: bool,
prop_type: PropFindType = PropFindType.DEFAULT,
) -> list[FsNode]:
root, dav_path = build_listdir_req(user, path, properties, prop_type)
webdav_response = self._session.adapter_dav.request(
"PROPFIND",
quote(dav_path),
content=element_tree_as_str(root),
headers={"Depth": "infinity" if depth == -1 else str(depth)},
)
return build_listdir_response(
self._session.cfg.dav_url_suffix, webdav_response, user, path, properties, exclude_self, prop_type
)
def __upload_stream(self, path: str, fp, chunk_size: int) -> FsNode:
_tmp_path = "nc-py-api-" + random_string(56)
_dav_path = quote(dav_get_obj_path(self._session.user, _tmp_path, root_path="/uploads"))
_v2 = bool(self._session.cfg.options.upload_chunk_v2 and chunk_size >= 5 * 1024 * 1024)
full_path = dav_get_obj_path(self._session.user, path)
headers = Headers({"Destination": self._session.cfg.dav_endpoint + quote(full_path)}, encoding="utf-8")
if _v2:
response = self._session.adapter_dav.request("MKCOL", _dav_path, headers=headers)
else:
response = self._session.adapter_dav.request("MKCOL", _dav_path)
check_error(response)
try:
start_bytes = end_bytes = 0
chunk_number = 1
while True:
piece = fp.read(chunk_size)
if not piece:
break
end_bytes = start_bytes + len(piece)
if _v2:
response = self._session.adapter_dav.put(
_dav_path + "/" + str(chunk_number), content=piece, headers=headers
)
else:
_filename = str(start_bytes).rjust(15, "0") + "-" + str(end_bytes).rjust(15, "0")
response = self._session.adapter_dav.put(_dav_path + "/" + _filename, content=piece)
check_error(
response,
f"upload_stream(v={_v2}): user={self._session.user}, path={path}, cur_size={end_bytes}",
)
start_bytes = end_bytes
chunk_number += 1
response = self._session.adapter_dav.request(
"MOVE",
_dav_path + "/.file",
headers=headers,
)
check_error(
response,
f"upload_stream(v={_v2}): user={self._session.user}, path={path}, total_size={end_bytes}",
)
return FsNode(full_path.strip("/"), **etag_fileid_from_response(response))
finally:
self._session.adapter_dav.delete(_dav_path)