Module pachyderm_sdk.api.pfs.extension

Handwritten classes/methods that augment the existing PFS API.

Expand source code
"""Handwritten classes/methods that augment the existing PFS API."""
import io
import os
from contextlib import contextmanager
from dataclasses import fields
from functools import wraps
from pathlib import Path
from typing import Callable, ContextManager, Iterable, List, Union, TYPE_CHECKING

from betterproto.lib.google.protobuf import Empty
import grpc

from ...errors import InvalidTransactionOperation
from . import ApiStub as _GeneratedApiStub
from . import (
    Branch,
    Commit,
    CommitInfo,
    CommitSet,
    CommitState,
    File,
    Project,
    Repo,
    ModifyFileRequest,
    AddFile,
    AddFileUrlSource,
    CopyFile,
    DeleteFile,
)
from .file import PFSFile, PFSTarFile

if TYPE_CHECKING:
    from _typeshed import SupportsRead

# Number of bytes requested per ``file.read`` call when ``put_file_from_file``
#   streams a file object; every chunk read is sent as one ``AddFile`` request.
BUFFER_SIZE = 19 * 1024 * 1024  # 19MB

# Names exported by a star-import of this module.
__all__ = ("ApiStub", "ClosedCommit", "OpenCommit")


def transaction_incompatible(pfs_method: Callable) -> Callable:
    """Mark a PFS API method as forbidden while a transaction is active.

    The returned wrapper inspects ``stub.within_transaction`` on every call.
    When it is truthy the wrapped method is not invoked and
    ``InvalidTransactionOperation`` is raised instead; otherwise the call is
    forwarded unchanged and its result returned.
    """

    @wraps(pfs_method)
    def guarded(stub: "ApiStub", *args, **kwargs):
        # Happy path first: no transaction in progress, so just delegate.
        if not stub.within_transaction:
            return pfs_method(stub, *args, **kwargs)
        raise InvalidTransactionOperation()

    return guarded


class ClosedCommit(Commit):
    """A pfs.Commit message bundled with the API stub it came from.

    Only read-side helpers (``wait`` / ``wait_all``) are offered here; this
    class defines no methods for writing to the commit.
    """

    def __init__(self, commit: "Commit", stub: "ApiStub"):
        """Internal Use: Do not create this object yourself.

        Parameters
        ----------
        commit : pfs.Commit
            The commit being wrapped.
        stub : pfs.ApiStub
            The API class that requests are routed through.
        """
        self._commit = commit
        self._stub = stub

        # Mirror every field of the wrapped message onto this instance so it
        #   still serializes like a pfs.Commit, including any fields that get
        #   added to the message in the future.
        field_values = {}
        for spec in fields(commit):
            field_values[spec.name] = getattr(commit, spec.name)
        super().__init__(**field_values)

    def wait(self) -> "CommitInfo":
        """Block until this commit is finished and return its pfs.CommitInfo.

        Delegates to ``wait_commit`` on the stub. See the OpenCommit docstring
        for an example.
        """
        commit_info = self._stub.wait_commit(self)
        return commit_info

    def wait_all(self) -> List["CommitInfo"]:
        """Like ``wait``, but for the whole commit set sharing this commit's id.

        Delegates to ``wait_commit_set`` on the stub and returns the
        pfs.CommitInfo objects it collects.
        """
        commit_set = CommitSet(id=self._commit.id)
        return self._stub.wait_commit_set(commit_set)


class OpenCommit(ClosedCommit):
    """An OpenCommit is an extension of the pfs.Commit message with some
    helpful methods that provide a more intuitive UX when writing to a commit.

    Every write method forwards to the method of the same name on the
    pfs.ApiStub this object was created with. Once ``_close`` has been called
    (the ``ApiStub.commit`` context manager does this on exit) the object is a
    ClosedCommit, which does not define any of the write methods.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("data@master")) as commit:
    >>>     commit.put_file_from_bytes("/greeting.txt", b"Hello!")
    >>>     commit.delete_file(path="/rude/insult.txt")
    >>> commit.wait()
    """

    def __init__(self, commit: "Commit", stub: "ApiStub"):
        """Internal Use: Do not create this object yourself.

        Parameters
        ----------
        commit : pfs.Commit
            The "open" commit to write to.
        stub : pfs.ApiStub
            The API class to route requests through.
        """
        # ClosedCommit.__init__ stores ``commit`` and ``stub`` and copies the
        #   message fields, so there is nothing additional to set up here.
        super().__init__(commit, stub)

    def _close(self):
        """Transform an OpenCommit into a ClosedCommit.

        The instance is rebound in place, so references held by the caller
        lose the write methods as well.
        """
        self.__class__ = ClosedCommit

    def put_files(self, *, source: Union[Path, str], path: str) -> None:
        """Recursively insert the contents of source into the open commit under path,
        matching the directory structure of source.

        This is roughly equivalent to ``pachctl put file -r``

        Parameters
        ----------
        source : Union[Path, str]
            The directory to recursively insert content from.
        path : str
            The destination path in PFS.

        Raises
        ------
        FileNotFoundError: If source does not exist.
        NotADirectoryError: If source is not a directory.
        InvalidTransactionOperation: If the client is within a transaction.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> branch = pfs.Branch.from_uri("images@master")
        >>> with client.pfs.commit(branch=branch) as commit:
        >>>     commit.put_files(source="path/to/local/files", path="/")
        """
        self._stub.put_files(commit=self._commit, source=source, path=path)

    def put_file_from_bytes(self, path: str, data: bytes, append: bool = False) -> "File":
        """Uploads a PFS file from a bytestring.

        Parameters
        ----------
        path : str
            The path in the repo the data will be written to.
        data : bytes
            The file contents as bytes.
        append : bool, optional
            If true, appends the data to the file specified at `path`, if
            it already exists. Otherwise, overwrites it.

        Raises
        ------
        InvalidTransactionOperation: If the client is within a transaction.

        Returns
        -------
        A pfs.File object that points to the uploaded file.
        NOTE: The commit must be closed before you can read this file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     commit.put_file_from_bytes(path="/file.txt", data=b"SOME BYTES")
        """
        self._stub.put_file_from_bytes(commit=self, path=path, data=data, append=append)
        return File(commit=self._commit, path=path)

    def put_file_from_url(
        self,
        *,
        path: str,
        url: str,
        recursive: bool = False,
    ) -> "File":
        """Uploads a PFS file from a URL.

        Parameters
        ----------
        path : str
            The path in the repo the data will be written to.
        url : str
            The URL of the file to put.
        recursive : bool
            If true, allows for recursive scraping on some types of URLs, for
            example on s3:// URLs

        Raises
        ------
        InvalidTransactionOperation: If the client is within a transaction.

        Returns
        -------
        A pfs.File object that points to the uploaded file.
        NOTE: The commit must be closed before you can read this file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     commit.put_file_from_url(
        >>>         path="/index.html", url="https://www.pachyderm.com/index.html"
        >>>     )
        """
        self._stub.put_file_from_url(commit=self, path=path, url=url, recursive=recursive)
        return File(commit=self._commit, path=path)

    def put_file_from_file(
        self, *, path: str, file: "SupportsRead[bytes]", append: bool = False
    ) -> "File":
        """Uploads a PFS file from an open file object.

        Parameters
        ----------
        path : str
            The path in the repo the data will be written to.
        file : SupportsRead[bytes]
            An open file object to read the data from.
        append : bool, optional
            If true, appends the data to the file specified at `path`, if
            it already exists. Otherwise, overwrites it.

        Raises
        ------
        TypeError: If reading from file yields something other than bytes.
        InvalidTransactionOperation: If the client is within a transaction.

        Returns
        -------
        A pfs.File object that points to the uploaded file.
        NOTE: The commit must be closed before you can read this file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     with open("local_file.dat", "rb") as source:
        >>>         commit.put_file_from_file(path="/index.html", file=source)
        """
        self._stub.put_file_from_file(commit=self, path=path, file=file, append=append)
        return File(commit=self._commit, path=path)

    def copy_file(self, *, src: "File", dst: str, append: bool = False) -> "File":
        """Copies a file within PFS

        Parameters
        ----------
        src : pfs.File
            The file to be copied.
        dst : str
            The destination of the file, as a string path.
        append : bool
            If true, appends the contents of src to dst if it exists.
            Otherwise, overwrites the file.

        Raises
        ------
        InvalidTransactionOperation: If the client is within a transaction.

        Returns
        -------
        A pfs.File object that points to the new file.
        NOTE: The commit must be closed before you can read this file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> source = pfs.File.from_uri("images@master:/file.dat")
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     commit.copy_file(src=source, dst="/copy.dat")
        """
        self._stub.copy_file(commit=self, src=src, dst=dst, append=append)
        return File(commit=self._commit, path=dst)

    def delete_file(self, *, path: str) -> "File":
        """Deletes a file from the open commit.

        Parameters
        ----------
        path : str
            The path of the file to be deleted.

        Raises
        ------
        InvalidTransactionOperation: If the client is within a transaction.

        Returns
        -------
        A pfs.File object that points to the deleted file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     commit.delete_file(path="/file.dat")
        """
        self._stub.delete_file(commit=self, path=path)
        return File(commit=self._commit, path=path)


class ApiStub(_GeneratedApiStub):
    """An extension to the API stub generated from the PFS protobufs."""

    def __init__(
        self,
        channel: grpc.Channel,
        *,
        get_transaction_id: Callable[[], str],
    ):
        """Create the stub on top of ``channel``.

        Parameters
        ----------
        channel : grpc.Channel
            The channel handed to the generated stub.
        get_transaction_id : Callable[[], str]
            Called each time ``within_transaction`` is read; a truthy return
            value means a transaction is in progress.
        """
        self._get_transaction_id = get_transaction_id
        super().__init__(channel=channel)

    @property
    def within_transaction(self) -> bool:
        """For internal use.

        Whether the client is currently within a transaction.
        """
        return bool(self._get_transaction_id())

    @contextmanager
    def commit(
        self, *, parent: "Commit" = None, description: str = "", branch: "Branch" = None
    ) -> ContextManager["OpenCommit"]:
        """A context manager for running operations within a commit.

        When inside this context, the returned object is an OpenCommit which accepts
          write-operations. Upon exiting the context, the commit is closed and the
          OpenCommit becomes a ClosedCommit and no longer allows write-operations.
          The commit is finished on exit even when the body of the ``with``
          block raises.

        Parameters
        ----------
        parent : pfs.Commit, optional
            The parent commit of the new commit. parent may be empty in which case
            the commit that Branch points to will be used as the parent.
            If the branch does not exist, the commit will have no parent.
        description : str, optional
            A description of the commit.
        branch : pfs.Branch, optional
            The branch where the commit is created.

        Yields
        -------
        OpenCommit
            The newly started commit, wrapped with write helpers.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     c.delete_file(path="/dir/delete_me.txt")
        >>>     c.put_file_from_bytes("/new_file.txt", b"DATA")
        """
        commit = self.start_commit(parent=parent, description=description, branch=branch)
        commit_obj = OpenCommit(commit=commit, stub=self)
        try:
            yield commit_obj
        finally:
            # Strip the write methods first, then finish the commit server-side.
            commit_obj._close()
            self.finish_commit(commit=commit)

    def wait_commit(self, commit: "Commit") -> "CommitInfo":
        """Waits until the commit is finished being created."""
        return self.inspect_commit(commit=commit, wait=CommitState.FINISHED)

    def wait_commit_set(self, commit_set: "CommitSet") -> List["CommitInfo"]:
        """Similar to client.pfs.wait_commit but streams back the pfs.CommitInfo
        from all the downstream jobs that were initiated by this commit.

        The stream is consumed completely and returned as a list.
        """
        return list(self.inspect_commit_set(commit_set=commit_set, wait=True))

    @transaction_incompatible
    def put_files(self, *, commit: "Commit", source: Union[Path, str], path: str) -> None:
        """Recursively insert the contents of source into the open commit under path,
        matching the directory structure of source.

        This is roughly equivalent to ``pachctl put file -r``

        Note: This method opens multiple gRPC streams and this appears to break in
          some REPL environment (such as those based in IDEs). If you encounter this
          problem, please try a different REPL environment or run as a script.

        Parameters
        ----------
        commit : pfs.Commit
            The open commit to add files to.
        source : Union[Path, str]
            The directory to recursively insert content from.
        path : str
            The destination path in PFS.

        Raises
        ------
        FileNotFoundError: If source does not exist.
        NotADirectoryError: If source is not a directory.
        InvalidTransactionOperation: If the client is within a transaction.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.put_files(commit=c, source="path/to/local/files", path="/")
        """
        # Local import: only this method needs it.
        import posixpath

        source = Path(source)
        if not source.exists():
            raise FileNotFoundError(f"source does not exist: {source}")
        if not source.is_dir():
            raise NotADirectoryError(f"source is not a directory: {source}")
        for root, _, filenames in os.walk(source):
            for filename in filenames:
                src = os.path.join(root, filename)
                # Destination paths in PFS are written with "/" (see the
                #   examples), so the relative part is converted to POSIX form
                #   and joined with posixpath rather than the host's os.path,
                #   which would introduce backslashes on Windows.
                relative = Path(os.path.relpath(src, start=source)).as_posix()
                dst = posixpath.join(path, relative)
                # One upload (one put_file_from_file call) per local file.
                with open(src, "rb") as file:
                    self.put_file_from_file(commit=commit, path=dst, file=file)

    @transaction_incompatible
    def put_file_from_bytes(
        self, *, commit: "Commit", path: str, data: bytes, append: bool = False
    ) -> Empty:
        """Uploads a PFS file from a bytestring.

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        path : str
            The path in the repo the data will be written to.
        data : bytes
            The file contents as bytes.
        append : bool, optional
            If true, appends the data to the file specified at `path`, if
            it already exists. Otherwise, overwrites it.

        Raises
        ------
        InvalidTransactionOperation: If the client is within a transaction.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.put_file_from_bytes(
        >>>         commit=c, path="/file.txt", data=b"SOME BYTES"
        >>>     )
        """
        # Wrap the bytes in an in-memory file and reuse the streaming upload.
        return self.put_file_from_file(
            commit=commit, path=path, file=io.BytesIO(data), append=append
        )

    @transaction_incompatible
    def put_file_from_url(
        self,
        *,
        commit: "Commit",
        path: str,
        url: str,
        recursive: bool = False,
    ) -> Empty:
        """Uploads a PFS file from a URL.

        Any existing file at `path` is deleted before the URL is added.

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        path : str
            The path in the repo the data will be written to.
        url : str
            The URL of the file to put.
        recursive : bool
            If true, allows for recursive scraping on some types of URLs, for
            example on s3:// URLs

        Raises
        ------
        InvalidTransactionOperation: If the client is within a transaction.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.put_file_from_url(
        >>>         commit=c, path="/index.html", url="https://www.pachyderm.com/index.html"
        >>>     )
        """
        operations = [
            ModifyFileRequest(set_commit=commit),
            ModifyFileRequest(delete_file=DeleteFile(path=path)),
            ModifyFileRequest(
                add_file=AddFile(
                    path=path, url=AddFileUrlSource(url=url, recursive=recursive)
                )
            ),
        ]
        return self.modify_file(iter(operations))

    @transaction_incompatible
    def put_file_from_file(
        self,
        *,
        commit: "Commit",
        path: str,
        file: "SupportsRead[bytes]",
        append: bool = False,
    ) -> Empty:
        """Uploads a PFS file from an open file object.

        The file is read in chunks of at most BUFFER_SIZE bytes and each chunk
        is sent as its own request on a single ``modify_file`` stream.

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        path : str
            The path in the repo the data will be written to.
        file : SupportsRead[bytes]
            An open file object to read the data from.
        append : bool, optional
            If true, appends the data to the file specified at `path`, if
            it already exists. Otherwise, overwrites it.

        Raises
        ------
        TypeError: If the first non-empty read from file is not bytes.
        InvalidTransactionOperation: If the client is within a transaction.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     with open("local_file.dat", "rb") as source:
        >>>         client.pfs.put_file_from_file(
        >>>             commit=c, path="/index.html", file=source
        >>>         )
        """
        # Read the first chunk eagerly so a file that does not produce bytes is
        #   rejected before any request is sent.
        check = file.read(BUFFER_SIZE)
        if len(check) > 0 and not isinstance(check, bytes):
            raise TypeError("File must output bytes")

        def file_iterator():
            # Replays the chunk that was already read, then drains the file.
            if not check:
                return
            yield check
            while True:
                data = file.read(BUFFER_SIZE)
                if len(data) == 0:
                    return
                yield data

        def operations() -> Iterable[ModifyFileRequest]:
            yield ModifyFileRequest(set_commit=commit)
            if not append:
                # Overwrite semantics: drop whatever is currently at `path`.
                yield ModifyFileRequest(delete_file=DeleteFile(path=path))
            # An empty AddFile is always sent first -- presumably so that an
            #   empty source still results in a file at `path`.
            yield ModifyFileRequest(add_file=AddFile(path=path, raw=b""))
            for data in file_iterator():
                yield ModifyFileRequest(add_file=AddFile(path=path, raw=data))

        return self.modify_file(operations())

    @transaction_incompatible
    def copy_file(
        self, *, commit: "Commit", src: "File", dst: str, append: bool = False
    ) -> Empty:
        """Copies a file within PFS

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        src : pfs.File
            The file to be copied.
        dst : str
            The destination of the file, as a string path.
        append : bool
            If true, appends the contents of src to dst if it exists.
            Otherwise, overwrites the file.

        Raises
        ------
        InvalidTransactionOperation: If the client is within a transaction.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> source = pfs.File.from_uri("images@master:/file.dat")
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.copy_file(commit=c, src=source, dst="/copy.dat")
        """
        operations = [
            ModifyFileRequest(set_commit=commit),
            ModifyFileRequest(copy_file=CopyFile(dst=dst, src=src, append=append)),
        ]
        return self.modify_file(iter(operations))

    @transaction_incompatible
    def delete_file(self, *, commit: "Commit", path: str) -> Empty:
        """Deletes a file from an open commit.

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        path : str
            The path of the file to be deleted.

        Raises
        ------
        InvalidTransactionOperation: If the client is within a transaction.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.delete_file(commit=c, path="/file.dat")
        """
        operations = [
            ModifyFileRequest(set_commit=commit),
            ModifyFileRequest(delete_file=DeleteFile(path=path)),
        ]
        return self.modify_file(iter(operations))

    def project_exists(self, project: "Project") -> bool:
        """Checks whether a project exists.

        Parameters
        ----------
        project: pfs.Project
            The project to check.

        Raises
        ------
        grpc.RpcError: If inspecting the project fails with any status other
            than NOT_FOUND.

        Returns
        -------
        bool
            Whether the project exists.
        """
        try:
            self.inspect_project(project=project)
            return True
        except grpc.RpcError as err:
            err: grpc.Call
            if err.code() == grpc.StatusCode.NOT_FOUND:
                return False
            raise

    def repo_exists(self, repo: "Repo") -> bool:
        """Checks whether a repo exists.

        Parameters
        ----------
        repo: pfs.Repo
            The repo to check.

        Raises
        ------
        grpc.RpcError: If inspecting the repo fails with any status other
            than NOT_FOUND.

        Returns
        -------
        bool
            Whether the repo exists.
        """
        try:
            self.inspect_repo(repo=repo)
            return True
        except grpc.RpcError as err:
            err: grpc.Call
            if err.code() == grpc.StatusCode.NOT_FOUND:
                return False
            raise

    def branch_exists(self, branch: "Branch") -> bool:
        """Checks whether a branch exists.

        Parameters
        ----------
        branch: pfs.Branch
            The branch to check.

        Raises
        ------
        grpc.RpcError: If inspecting the branch fails with any status other
            than NOT_FOUND.

        Returns
        -------
        bool
            Whether the branch exists.
        """
        try:
            self.inspect_branch(branch=branch)
            return True
        except grpc.RpcError as err:
            err: grpc.Call
            if err.code() == grpc.StatusCode.NOT_FOUND:
                return False
            raise

    def commit_exists(self, commit: "Commit") -> bool:
        """Checks whether a commit exists.

        Parameters
        ----------
        commit: pfs.Commit
            The commit to check.

        Raises
        ------
        grpc.RpcError: If inspecting the commit fails with any status other
            than NOT_FOUND.

        Returns
        -------
        bool
            Whether the commit exists.
        """
        try:
            self.inspect_commit(commit=commit)
            return True
        except grpc.RpcError as err:
            err: grpc.Call
            if err.code() == grpc.StatusCode.NOT_FOUND:
                return False
            raise

    def path_exists(self, file: "File") -> bool:
        """Checks whether the path exists in the specified commit, agnostic to
        whether `path` is a file or a directory.

        Parameters
        ----------
        file : pfs.File
            The file (or directory) to check.

        Raises
        ------
        ValueError: If inspecting the file's commit fails with a grpc.RpcError.
        grpc.RpcError: If inspecting the file fails with any status other
            than NOT_FOUND.

        Returns
        -------
        bool
            True if the path exists.
        """
        try:
            self.inspect_commit(commit=file.commit)
        except grpc.RpcError as e:
            # NOTE(review): every RpcError (not only NOT_FOUND) is reported as
            #   a missing commit here; kept as-is because callers may rely on
            #   ValueError. The original error stays available as __cause__.
            raise ValueError("commit does not exist") from e

        try:
            self.inspect_file(file=file)
            return True
        except grpc.RpcError as err:
            err: grpc.Call
            if err.code() == grpc.StatusCode.NOT_FOUND:
                return False
            raise

    def pfs_file(self, file: "File") -> "PFSFile":
        """Wraps the response stream of a client.pfs.get_file() call with a
        PFSFile object. This wrapper class allows you to interact with the
        file stream as a normal file object.

        Parameters
        ----------
        file : pfs.File
            The file to retrieve.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> source = pfs.File.from_uri("images@master:/example.csv")
        >>> with client.pfs.pfs_file(file=source) as pfs_file:
        >>>     for line in pfs_file:
        >>>         print(line)
        """
        stream = self.get_file(file=file)
        return PFSFile(stream)

    def pfs_tar_file(self, file: "File") -> "PFSTarFile":
        """Wraps the response stream of a client.pfs.get_file_tar() call with a
        PFSTarFile object. This wrapper class allows you to interact with the
        file stream as a standard tarfile.TarFile object.

        The archive is opened in streaming mode ("r|*").

        Parameters
        ----------
        file : pfs.File
            The file (or directory) to retrieve.
        """
        stream = self.get_file_tar(file=file)
        return PFSTarFile.open(fileobj=PFSFile(stream), mode="r|*")

Classes

class ApiStub (channel: grpc.Channel, *, get_transaction_id: Callable[[], str])

An extension to the API stub generated from the PFS protobufs.

Expand source code
class ApiStub(_GeneratedApiStub):
    """An extension to the API stub generated from the PFS protobufs."""

    def __init__(
        self,
        channel: grpc.Channel,
        *,
        get_transaction_id: Callable[[], str],
    ):
        """Create the stub and remember how to look up the transaction ID.

        Parameters
        ----------
        channel : grpc.Channel
            The channel forwarded to the generated stub's initializer.
        get_transaction_id : Callable[[], str]
            Zero-argument callable; ``within_transaction`` checks the
            truthiness of its return value.
        """
        # Stored before the parent initializer runs.
        self._get_transaction_id = get_transaction_id
        super().__init__(channel=channel)

    @property
    def within_transaction(self) -> bool:
        """Whether the client is currently within a transaction.

        For internal use.

        Returns
        -------
        bool
            True when the transaction-ID getter supplied at construction
            returns a truthy value.
        """
        transaction_id = self._get_transaction_id()
        return True if transaction_id else False

    @contextmanager
    def commit(
        self, *, parent: "Commit" = None, description: str = "", branch: "Branch" = None
    ) -> ContextManager["OpenCommit"]:
        """Run write operations inside a newly started commit.

        Starts a commit, yields it wrapped in an ``OpenCommit`` and, when the
        ``with`` block exits (normally or through an exception), closes the
        wrapper and finishes the commit.

        Parameters
        ----------
        parent : pfs.Commit, optional
            The parent commit of the new commit. May be left empty, in which
            case the commit that ``branch`` points to is used as the parent.
            If the branch does not exist, the commit will have no parent.
        description : str, optional
            A description of the commit.
        branch : pfs.Branch, optional
            The branch on which the commit is created.

        Yields
        ------
        OpenCommit
            Wrapper around the started commit; it accepts write operations
            only while the context is active.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.put_file_from_bytes(
        >>>         commit=c, path="/new_file.txt", data=b"DATA"
        >>>     )
        """
        started = self.start_commit(
            parent=parent, description=description, branch=branch
        )
        open_commit = OpenCommit(commit=started, stub=self)
        try:
            yield open_commit
        finally:
            # Runs on normal exit and on error alike: first close the yielded
            # wrapper, then finish the underlying commit.
            open_commit._close()
            self.finish_commit(commit=started)

    def wait_commit(self, commit: "Commit") -> "CommitInfo":
        """Wait until the commit is finished and return its info.

        Parameters
        ----------
        commit : pfs.Commit
            The commit to wait on.

        Returns
        -------
        pfs.CommitInfo
            Result of ``inspect_commit`` called with
            ``wait=CommitState.FINISHED``.
        """
        target_state = CommitState.FINISHED
        return self.inspect_commit(commit=commit, wait=target_state)

    def wait_commit_set(self, commit_set: "CommitSet") -> List["CommitInfo"]:
        """Wait on a commit set and collect every streamed pfs.CommitInfo.

        Similar to ``wait_commit``, but gathers the pfs.CommitInfo from all
        the downstream jobs that were initiated by this commit.

        Parameters
        ----------
        commit_set : pfs.CommitSet
            The commit set to wait on.

        Returns
        -------
        List[pfs.CommitInfo]
            Every item produced by ``inspect_commit_set(..., wait=True)``,
            in the order received.
        """
        responses = self.inspect_commit_set(commit_set=commit_set, wait=True)
        return [info for info in responses]

    @transaction_incompatible
    def put_files(self, *, commit: "Commit", source: Union[Path, str], path: str) -> None:
        """Recursively insert the contents of source into the open commit under path,
        matching the directory structure of source.

        This is roughly equivalent to ``pachctl put file -r``

        Destination paths are always joined with ``/`` separators, regardless
        of the separator used by the local operating system.

        Note: This method opens multiple gRPC streams and this appears to break in
          some REPL environment (such as those based in IDEs). If you encounter this
          problem, please try a different REPL environment or run as a script.

        Parameters
        ----------
        commit : pfs.Commit
            The open commit to add files to.
        source : Union[Path, str]
            The directory to recursively insert content from.
        path : str
            The destination path in PFS.

        Raises
        ------
        FileNotFoundError
            If ``source`` does not exist.
        NotADirectoryError
            If ``source`` exists but is not a directory.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.put_files(commit=c, source="path/to/local/files", path="/")
        """
        import posixpath  # local import: destination paths use "/" on every OS

        source = Path(source)
        if not source.exists():
            raise FileNotFoundError(f"source does not exist: {source}")
        if not source.is_dir():
            raise NotADirectoryError(f"source is not a directory: {source}")
        for root, _, filenames in os.walk(source):
            for filename in filenames:
                src = os.path.join(root, filename)
                # os.path.relpath/os.path.join use the host separator ("\\" on
                # Windows); convert the relative part to POSIX form and join
                # with posixpath so the destination is a "/"-separated path.
                relative = Path(os.path.relpath(src, start=source)).as_posix()
                dst = posixpath.join(path, relative)
                with open(src, "rb") as file:
                    self.put_file_from_file(commit=commit, path=dst, file=file)

    @transaction_incompatible
    def put_file_from_bytes(
        self, *, commit: "Commit", path: str, data: bytes, append: bool = False
    ) -> Empty:
        """Upload a PFS file from an in-memory bytestring.

        The bytes are wrapped in an ``io.BytesIO`` and handed to
        ``put_file_from_file``.

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        path : str
            The path in the repo the data will be written to.
        data : bytes
            The file contents as bytes.
        append : bool, optional
            If true, appends the data to the file at `path` if it already
            exists. Otherwise, overwrites it.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.put_file_from_bytes(
        >>>         commit=c, path="/file.txt", data=b"SOME BYTES"
        >>>     )
        """
        buffer = io.BytesIO(data)
        return self.put_file_from_file(
            commit=commit, path=path, file=buffer, append=append
        )

    @transaction_incompatible
    def put_file_from_url(
        self,
        *,
        commit: "Commit",
        path: str,
        url: str,
        recursive: bool = False,
    ) -> Empty:
        """Upload a PFS file from a URL.

        Sends three requests through ``modify_file``: select the commit,
        delete whatever is at `path`, then add the file from the URL.

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        path : str
            The path in the repo the data will be written to.
        url : str
            The URL of the file to put.
        recursive : bool
            If true, allows for recursive scraping on some types of URLs,
            for example on s3:// URLs.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.put_file_from_url(
        >>>         commit=c, path="/index.html", url="www.pachyderm.com/index.html"
        >>>     )
        """
        select_commit = ModifyFileRequest(set_commit=commit)
        remove_existing = ModifyFileRequest(delete_file=DeleteFile(path=path))
        url_source = AddFileUrlSource(url=url, recursive=recursive)
        add_from_url = ModifyFileRequest(add_file=AddFile(path=path, url=url_source))
        return self.modify_file(iter([select_commit, remove_existing, add_from_url]))

    @transaction_incompatible
    def put_file_from_file(
        self,
        *,
        commit: "Commit",
        path: str,
        file: "SupportsRead[bytes]",
        append: bool = False,
    ) -> Empty:
        """Upload a PFS file by streaming an open binary file object.

        The first chunk is read eagerly so that a file yielding non-bytes
        data is rejected before ``modify_file`` is called; later chunks are
        read lazily while the request stream is being consumed.

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        path : str
            The path in the repo the data will be written to.
        file : SupportsRead[bytes]
            An open file object to read the data from.
        append : bool, optional
            If true, appends the data to the file at `path` if it already
            exists. Otherwise, overwrites it.

        Raises
        ------
        TypeError
            If the first non-empty read does not return ``bytes``.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     with open("local_file.dat", "rb") as source:
        >>>         client.pfs.put_file_from_file(
        >>>             commit=c, path="/index.html", file=source
        >>>         )
        """
        first_chunk = file.read(BUFFER_SIZE)
        if len(first_chunk) > 0 and not isinstance(first_chunk, bytes):
            raise TypeError("File must output bytes")

        def requests() -> Iterable[ModifyFileRequest]:
            yield ModifyFileRequest(set_commit=commit)
            if not append:
                # Overwrite mode: delete whatever is at `path` first.
                yield ModifyFileRequest(delete_file=DeleteFile(path=path))
            # An empty write is always sent, even when the source has no data.
            yield ModifyFileRequest(add_file=AddFile(path=path, raw=b""))

            chunk = first_chunk
            has_data = bool(first_chunk)
            while has_data:
                yield ModifyFileRequest(add_file=AddFile(path=path, raw=chunk))
                # The next read only happens once the consumer asks for more.
                chunk = file.read(BUFFER_SIZE)
                has_data = len(chunk) != 0

        return self.modify_file(requests())

    @transaction_incompatible
    def copy_file(
        self, *, commit: "Commit", src: "File", dst: str, append: bool = False
    ) -> Empty:
        """Copy a file within PFS.

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        src : pfs.File
            The file to be copied.
        dst : str
            The destination of the file, as a string path.
        append : bool
            If true, appends the contents of src to dst if it exists.
            Otherwise, overwrites the file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> source = pfs.File.from_uri("images@master:/file.dat")
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.copy_file(commit=c, src=source, dst="/copy.dat")
        """
        select_commit = ModifyFileRequest(set_commit=commit)
        copy_request = ModifyFileRequest(
            copy_file=CopyFile(dst=dst, src=src, append=append)
        )
        return self.modify_file(iter([select_commit, copy_request]))

    @transaction_incompatible
    def delete_file(self, *, commit: "Commit", path: str) -> Empty:
        """Delete a file within PFS.

        Parameters
        ----------
        commit : pfs.Commit
            An open commit to modify.
        path : str
            The path of the file to be deleted.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
        >>>     client.pfs.delete_file(commit=c, path="/file.dat")
        """
        select_commit = ModifyFileRequest(set_commit=commit)
        delete_request = ModifyFileRequest(delete_file=DeleteFile(path=path))
        return self.modify_file(iter([select_commit, delete_request]))

    def project_exists(self, project: "Project") -> bool:
        """Check whether a project exists.

        Parameters
        ----------
        project : pfs.Project
            The project to check.

        Returns
        -------
        bool
            True if ``inspect_project`` succeeds, False if it fails with a
            NOT_FOUND status.

        Raises
        ------
        grpc.RpcError
            Re-raised when the status code is anything other than NOT_FOUND.
        """
        try:
            self.inspect_project(project=project)
        except grpc.RpcError as err:
            # The caught error is expected to expose ``code()`` (grpc.Call).
            if err.code() != grpc.StatusCode.NOT_FOUND:
                raise err
            return False
        else:
            return True

    def repo_exists(self, repo: "Repo") -> bool:
        """Check whether a repo exists.

        Parameters
        ----------
        repo : pfs.Repo
            The repo to check.

        Returns
        -------
        bool
            True if ``inspect_repo`` succeeds, False if it fails with a
            NOT_FOUND status.

        Raises
        ------
        grpc.RpcError
            Re-raised when the status code is anything other than NOT_FOUND.
        """
        try:
            self.inspect_repo(repo=repo)
        except grpc.RpcError as err:
            # The caught error is expected to expose ``code()`` (grpc.Call).
            if err.code() != grpc.StatusCode.NOT_FOUND:
                raise err
            return False
        else:
            return True

    def branch_exists(self, branch: "Branch") -> bool:
        """Check whether a branch exists.

        Parameters
        ----------
        branch : pfs.Branch
            The branch to check.

        Returns
        -------
        bool
            True if ``inspect_branch`` succeeds, False if it fails with a
            NOT_FOUND status.

        Raises
        ------
        grpc.RpcError
            Re-raised when the status code is anything other than NOT_FOUND.
        """
        try:
            self.inspect_branch(branch=branch)
        except grpc.RpcError as err:
            # The caught error is expected to expose ``code()`` (grpc.Call).
            if err.code() != grpc.StatusCode.NOT_FOUND:
                raise err
            return False
        else:
            return True

    def commit_exists(self, commit: "Commit") -> bool:
        """Check whether a commit exists.

        Parameters
        ----------
        commit : pfs.Commit
            The commit to check.

        Returns
        -------
        bool
            True if ``inspect_commit`` succeeds, False if it fails with a
            NOT_FOUND status.

        Raises
        ------
        grpc.RpcError
            Re-raised when the status code is anything other than NOT_FOUND.
        """
        try:
            self.inspect_commit(commit=commit)
        except grpc.RpcError as err:
            # The caught error is expected to expose ``code()`` (grpc.Call).
            if err.code() != grpc.StatusCode.NOT_FOUND:
                raise err
            return False
        else:
            return True

    def path_exists(self, file: "File") -> bool:
        """Check whether a path exists in the specified commit, regardless
        of whether the path is a file or a directory.

        Parameters
        ----------
        file : pfs.File
            The file (or directory) to check.

        Returns
        -------
        bool
            True if ``inspect_file`` succeeds, False if it fails with a
            NOT_FOUND status.

        Raises
        ------
        ValueError
            If inspecting ``file.commit`` fails with a ``grpc.RpcError``.
        grpc.RpcError
            Re-raised when ``inspect_file`` fails with a status code other
            than NOT_FOUND.
        """
        # NOTE(review): every RpcError from the commit lookup, not only
        # NOT_FOUND, is reported as a missing commit.
        try:
            self.inspect_commit(commit=file.commit)
        except grpc.RpcError as commit_err:
            raise ValueError("commit does not exist") from commit_err

        try:
            self.inspect_file(file=file)
        except grpc.RpcError as err:
            # The caught error is expected to expose ``code()`` (grpc.Call).
            if err.code() != grpc.StatusCode.NOT_FOUND:
                raise err
            return False
        else:
            return True

    def pfs_file(self, file: "File") -> "PFSFile":
        """Open a PFS file as a file-like object.

        Calls ``get_file`` and wraps the resulting response stream in a
        PFSFile so it can be used like a normal file object.

        Parameters
        ----------
        file : pfs.File
            The file to retrieve.

        Returns
        -------
        PFSFile
            Wrapper around the ``get_file`` response stream.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> source = pfs.File.from_uri("images@master:/example.csv")
        >>> with client.pfs.pfs_file(file=source) as pfs_file:
        >>>     for line in pfs_file:
        >>>         print(line)
        """
        return PFSFile(self.get_file(file=file))

    def pfs_tar_file(self, file: "File") -> "PFSTarFile":
        """Open a PFS file or directory as a tar archive.

        Calls ``get_file_tar``, wraps the response stream in a PFSFile and
        opens that as a PFSTarFile, which can be used like a standard
        tarfile.TarFile object.

        Parameters
        ----------
        file : pfs.File
            The file (or directory) to retrieve.

        Returns
        -------
        PFSTarFile
            Archive opened in stream mode over the response.
        """
        tar_stream = PFSFile(self.get_file_tar(file=file))
        # "r|*" is a stream-read mode (assuming PFSTarFile.open follows the
        # tarfile.open mode semantics — confirm against PFSTarFile).
        return PFSTarFile.open(fileobj=tar_stream, mode="r|*")

Ancestors

Instance variables

var within_transaction : bool

For internal use.

Whether the client is currently within a transaction.

Expand source code
@property
def within_transaction(self) -> bool:
    """Whether the client is currently within a transaction.

    For internal use.

    Returns
    -------
    bool
        True when the transaction-ID getter supplied at construction
        returns a truthy value.
    """
    transaction_id = self._get_transaction_id()
    return True if transaction_id else False

Methods

def commit(self, *, parent: Commit = None, description: str = '', branch: Branch = None) ‑> ContextManager[OpenCommit]

A context manager for running operations within a commit.

When inside this context, the returned object is an OpenCommit which accepts write-operations. Upon exiting the context, the commit is closed and the OpenCommit becomes a ClosedCommit and no longer allowing write-operations.

Parameters

parent : pfs.Commit, optional
The parent commit of the new commit. parent may be empty in which case the commit that Branch points to will be used as the parent. If the branch does not exist, the commit will have no parent.
description : str, optional
A description of the commit.
branch : pfs.Branch, optional
The branch where the commit is created.

Yields

pfs.Commit
A protobuf object that represents a commit.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
>>>     c.delete_file(c, "/dir/delete_me.txt")
>>>     c.put_file_from_bytes(c, "/new_file.txt", b"DATA")
Expand source code
@contextmanager
def commit(
    self, *, parent: "Commit" = None, description: str = "", branch: "Branch" = None
) -> ContextManager["OpenCommit"]:
    """Run write operations inside a newly started commit.

    Starts a commit, yields it wrapped in an ``OpenCommit`` and, when the
    ``with`` block exits (normally or through an exception), closes the
    wrapper and finishes the commit.

    Parameters
    ----------
    parent : pfs.Commit, optional
        The parent commit of the new commit. May be left empty, in which
        case the commit that ``branch`` points to is used as the parent.
        If the branch does not exist, the commit will have no parent.
    description : str, optional
        A description of the commit.
    branch : pfs.Branch, optional
        The branch on which the commit is created.

    Yields
    ------
    OpenCommit
        Wrapper around the started commit; it accepts write operations
        only while the context is active.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
    >>>     client.pfs.put_file_from_bytes(
    >>>         commit=c, path="/new_file.txt", data=b"DATA"
    >>>     )
    """
    started = self.start_commit(
        parent=parent, description=description, branch=branch
    )
    open_commit = OpenCommit(commit=started, stub=self)
    try:
        yield open_commit
    finally:
        # Runs on normal exit and on error alike: first close the yielded
        # wrapper, then finish the underlying commit.
        open_commit._close()
        self.finish_commit(commit=started)
def wait_commit(self, commit: Commit) ‑> CommitInfo

Waits until the commit is finished being created.

Expand source code
def wait_commit(self, commit: "Commit") -> "CommitInfo":
    """Wait until the commit is finished and return its info.

    Parameters
    ----------
    commit : pfs.Commit
        The commit to wait on.

    Returns
    -------
    pfs.CommitInfo
        Result of ``inspect_commit`` called with
        ``wait=CommitState.FINISHED``.
    """
    target_state = CommitState.FINISHED
    return self.inspect_commit(commit=commit, wait=target_state)
def wait_commit_set(self, commit_set: CommitSet) ‑> List[CommitInfo]

Similar to client.pfs.wait_commit but streams back the pfs.CommitInfo from all the downstream jobs that were initiated by this commit.

Expand source code
def wait_commit_set(self, commit_set: "CommitSet") -> List["CommitInfo"]:
    """Wait on a commit set and collect every streamed pfs.CommitInfo.

    Similar to ``wait_commit``, but gathers the pfs.CommitInfo from all
    the downstream jobs that were initiated by this commit.

    Parameters
    ----------
    commit_set : pfs.CommitSet
        The commit set to wait on.

    Returns
    -------
    List[pfs.CommitInfo]
        Every item produced by ``inspect_commit_set(..., wait=True)``,
        in the order received.
    """
    responses = self.inspect_commit_set(commit_set=commit_set, wait=True)
    return [info for info in responses]
def put_files(self, *, commit: Commit, source: Union[pathlib.Path, str], path: str) ‑> None

Recursively insert the contents of source into the open commit under path, matching the directory structure of source.

This is roughly equivalent to pachctl put file -r

Note: This method opens multiple gRPC streams and this appears to break in some REPL environment (such as those based in IDEs). If you encounter this problem, please try a different REPL environment or run as a script.

Parameters

commit : pfs.Commit
The open commit to add files to.
source : Union[Path, str]
The directory to recursively insert content from.
path : str
The destination path in PFS.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
>>>     client.pfs.put_files(commit=c, source="path/to/local/files", path="/")
Expand source code
@transaction_incompatible
def put_files(self, *, commit: "Commit", source: Union[Path, str], path: str) -> None:
    """Recursively insert the contents of source into the open commit under path,
    matching the directory structure of source.

    This is roughly equivalent to ``pachctl put file -r``

    Destination paths are always joined with ``/`` separators, regardless
    of the separator used by the local operating system.

    Note: This method opens multiple gRPC streams and this appears to break in
      some REPL environment (such as those based in IDEs). If you encounter this
      problem, please try a different REPL environment or run as a script.

    Parameters
    ----------
    commit : pfs.Commit
        The open commit to add files to.
    source : Union[Path, str]
        The directory to recursively insert content from.
    path : str
        The destination path in PFS.

    Raises
    ------
    FileNotFoundError
        If ``source`` does not exist.
    NotADirectoryError
        If ``source`` exists but is not a directory.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
    >>>     client.pfs.put_files(commit=c, source="path/to/local/files", path="/")
    """
    import posixpath  # local import: destination paths use "/" on every OS

    source = Path(source)
    if not source.exists():
        raise FileNotFoundError(f"source does not exist: {source}")
    if not source.is_dir():
        raise NotADirectoryError(f"source is not a directory: {source}")
    for root, _, filenames in os.walk(source):
        for filename in filenames:
            src = os.path.join(root, filename)
            # os.path.relpath/os.path.join use the host separator ("\\" on
            # Windows); convert the relative part to POSIX form and join
            # with posixpath so the destination is a "/"-separated path.
            relative = Path(os.path.relpath(src, start=source)).as_posix()
            dst = posixpath.join(path, relative)
            with open(src, "rb") as file:
                self.put_file_from_file(commit=commit, path=dst, file=file)
def put_file_from_bytes(self, *, commit: Commit, path: str, data: bytes, append: bool = False) ‑> betterproto.lib.google.protobuf.Empty

Uploads a PFS file from a bytestring.

Parameters

commit : pfs.Commit
An open commit to modify.
path : str
The path in the repo the data will be written to.
data : bytes
The file contents as bytes.
append : bool, optional
If true, appends the data to the file specified at path, if they already exist. Otherwise, overwrites them.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
>>>     client.pfs.put_file_from_bytes(
>>>         commit=c, path="/file.txt", data=b"SOME BYTES"
>>>     )
Expand source code
@transaction_incompatible
def put_file_from_bytes(
    self, *, commit: "Commit", path: str, data: bytes, append: bool = False
) -> Empty:
    """Upload a PFS file from an in-memory bytestring.

    The bytes are wrapped in an ``io.BytesIO`` and handed to
    ``put_file_from_file``.

    Parameters
    ----------
    commit : pfs.Commit
        An open commit to modify.
    path : str
        The path in the repo the data will be written to.
    data : bytes
        The file contents as bytes.
    append : bool, optional
        If true, appends the data to the file at `path` if it already
        exists. Otherwise, overwrites it.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
    >>>     client.pfs.put_file_from_bytes(
    >>>         commit=c, path="/file.txt", data=b"SOME BYTES"
    >>>     )
    """
    buffer = io.BytesIO(data)
    return self.put_file_from_file(
        commit=commit, path=path, file=buffer, append=append
    )
def put_file_from_url(self, *, commit: Commit, path: str, url: str, recursive: bool = False) ‑> betterproto.lib.google.protobuf.Empty

Uploads a PFS file from an url.

Parameters

commit : pfs.Commit
An open commit to modify.
path : str
The path in the repo the data will be written to.
url : str
The URL of the file to put.
recursive : bool
If true, allows for recursive scraping on some types URLs, for example on s3:// URLs

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
>>>     client.pfs.put_file_from_url(
>>>         commit=c, path="/index.html", url="www.pachyderm.com/index.html"
>>>     )
Expand source code
@transaction_incompatible
def put_file_from_url(
    self,
    *,
    commit: "Commit",
    path: str,
    url: str,
    recursive: bool = False,
) -> Empty:
    """Upload a PFS file from a URL.

    Sends three requests through ``modify_file``: select the commit,
    delete whatever is at `path`, then add the file from the URL.

    Parameters
    ----------
    commit : pfs.Commit
        An open commit to modify.
    path : str
        The path in the repo the data will be written to.
    url : str
        The URL of the file to put.
    recursive : bool
        If true, allows for recursive scraping on some types of URLs,
        for example on s3:// URLs.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
    >>>     client.pfs.put_file_from_url(
    >>>         commit=c, path="/index.html", url="www.pachyderm.com/index.html"
    >>>     )
    """
    select_commit = ModifyFileRequest(set_commit=commit)
    remove_existing = ModifyFileRequest(delete_file=DeleteFile(path=path))
    url_source = AddFileUrlSource(url=url, recursive=recursive)
    add_from_url = ModifyFileRequest(add_file=AddFile(path=path, url=url_source))
    return self.modify_file(iter([select_commit, remove_existing, add_from_url]))
def put_file_from_file(self, *, commit: Commit, path: str, file: SupportsRead[bytes], append: bool = False) ‑> betterproto.lib.google.protobuf.Empty

Uploads a PFS file from an open file object.

Parameters

commit : pfs.Commit
An open commit to modify.
path : str
The path in the repo the data will be written to.
file : SupportsRead[bytes]
An open file object to read the data from.
append : bool, optional
If true, appends the data to the file specified at path, if they already exist. Otherwise, overwrites them.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
>>>     with open("local_file.dat", "rb") as source:
>>>         client.pfs.put_file_from_file(
>>>             commit=c, path="/index.html", file=source
>>>         )
Expand source code
@transaction_incompatible
def put_file_from_file(
    self,
    *,
    commit: "Commit",
    path: str,
    file: "SupportsRead[bytes]",
    append: bool = False,
) -> Empty:
    """Upload a PFS file by streaming an open binary file object.

    The first chunk is read eagerly so that a file yielding non-bytes
    data is rejected before ``modify_file`` is called; later chunks are
    read lazily while the request stream is being consumed.

    Parameters
    ----------
    commit : pfs.Commit
        An open commit to modify.
    path : str
        The path in the repo the data will be written to.
    file : SupportsRead[bytes]
        An open file object to read the data from.
    append : bool, optional
        If true, appends the data to the file at `path` if it already
        exists. Otherwise, overwrites it.

    Raises
    ------
    TypeError
        If the first non-empty read does not return ``bytes``.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
    >>>     with open("local_file.dat", "rb") as source:
    >>>         client.pfs.put_file_from_file(
    >>>             commit=c, path="/index.html", file=source
    >>>         )
    """
    first_chunk = file.read(BUFFER_SIZE)
    if len(first_chunk) > 0 and not isinstance(first_chunk, bytes):
        raise TypeError("File must output bytes")

    def requests() -> Iterable[ModifyFileRequest]:
        yield ModifyFileRequest(set_commit=commit)
        if not append:
            # Overwrite mode: delete whatever is at `path` first.
            yield ModifyFileRequest(delete_file=DeleteFile(path=path))
        # An empty write is always sent, even when the source has no data.
        yield ModifyFileRequest(add_file=AddFile(path=path, raw=b""))

        chunk = first_chunk
        has_data = bool(first_chunk)
        while has_data:
            yield ModifyFileRequest(add_file=AddFile(path=path, raw=chunk))
            # The next read only happens once the consumer asks for more.
            chunk = file.read(BUFFER_SIZE)
            has_data = len(chunk) != 0

    return self.modify_file(requests())
def copy_file(self, *, commit: Commit, src: File, dst: str, append: bool = False) ‑> betterproto.lib.google.protobuf.Empty

Copies a file within PFS

Parameters

commit : pfs.Commit
An open commit to modify.
src : pfs.File
This file to be copied.
dst : str
The destination of the file, as a string path.
append : bool
If true, appends the contents of src to dst if it exists. Otherwise, overwrites the file.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> source = pfs.File.from_uri("images@master:/file.dat")
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
>>>     commit.pfs.copy_file(commit=c, src=source, dst="/copy.dat")
Expand source code
@transaction_incompatible
def copy_file(
    self, *, commit: "Commit", src: "File", dst: str, append: bool = False
) -> Empty:
    """Copy a file within PFS.

    Parameters
    ----------
    commit : pfs.Commit
        An open commit to modify.
    src : pfs.File
        The file to be copied.
    dst : str
        The destination of the file, as a string path.
    append : bool
        If true, appends the contents of src to dst if it exists.
        Otherwise, overwrites the file.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> source = pfs.File.from_uri("images@master:/file.dat")
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
    >>>     client.pfs.copy_file(commit=c, src=source, dst="/copy.dat")
    """
    select_commit = ModifyFileRequest(set_commit=commit)
    copy_request = ModifyFileRequest(
        copy_file=CopyFile(dst=dst, src=src, append=append)
    )
    return self.modify_file(iter([select_commit, copy_request]))
def delete_file(self, *, commit: Commit, path: str) ‑> betterproto.lib.google.protobuf.Empty

Copies a file within PFS

Parameters

commit : pfs.Commit
An open commit to modify.
path : str
The path of the file to be deleted.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
>>>     commit.pfs.delete_file(commit=c, path="/file.dat")
Expand source code
@transaction_incompatible
def delete_file(self, *, commit: "Commit", path: str) -> Empty:
    """Delete a file within PFS.

    Parameters
    ----------
    commit : pfs.Commit
        An open commit to modify.
    path : str
        The path of the file to be deleted.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as c:
    >>>     client.pfs.delete_file(commit=c, path="/file.dat")
    """
    select_commit = ModifyFileRequest(set_commit=commit)
    delete_request = ModifyFileRequest(delete_file=DeleteFile(path=path))
    return self.modify_file(iter([select_commit, delete_request]))
def project_exists(self, project: Project) ‑> bool

Checks whether a project exists.

Parameters

project : pfs.Project
The project to check.

Returns

bool
Whether the project exists.
Expand source code
def project_exists(self, project: "Project") -> bool:
    """Check whether a project exists.

    Parameters
    ----------
    project : pfs.Project
        The project to check.

    Returns
    -------
    bool
        True if ``inspect_project`` succeeds, False if it fails with a
        NOT_FOUND status.

    Raises
    ------
    grpc.RpcError
        Re-raised when the status code is anything other than NOT_FOUND.
    """
    try:
        self.inspect_project(project=project)
    except grpc.RpcError as err:
        # The caught error is expected to expose ``code()`` (grpc.Call).
        if err.code() != grpc.StatusCode.NOT_FOUND:
            raise err
        return False
    else:
        return True
def repo_exists(self, repo: Repo) ‑> bool

Checks whether a repo exists.

Parameters

repo : pfs.Repo
The repo to check.

Returns

bool
Whether the repo exists.
Expand source code
def repo_exists(self, repo: "Repo") -> bool:
    """Reports whether the given repo is present.

    Parameters
    ----------
    repo : pfs.Repo
        The repo to look up.

    Raises
    ------
    grpc.RpcError: Re-raised when the lookup fails with any status
        other than NOT_FOUND.

    Returns
    -------
    bool
        True when ``inspect_repo`` succeeds, False when it fails
        with a NOT_FOUND status.
    """
    try:
        self.inspect_repo(repo=repo)
    except grpc.RpcError as rpc_error:
        rpc_error: grpc.Call
        # Only a NOT_FOUND status means "absent"; other failures propagate.
        if rpc_error.code() != grpc.StatusCode.NOT_FOUND:
            raise rpc_error
        return False
    return True
def branch_exists(self, branch: Branch) ‑> bool

Checks whether a branch exists.

Parameters

branch : pfs.Branch
The branch to check.

Returns

bool
Whether the branch exists.
Expand source code
def branch_exists(self, branch: "Branch") -> bool:
    """Reports whether the given branch is present.

    Parameters
    ----------
    branch : pfs.Branch
        The branch to look up.

    Raises
    ------
    grpc.RpcError: Re-raised when the lookup fails with any status
        other than NOT_FOUND.

    Returns
    -------
    bool
        True when ``inspect_branch`` succeeds, False when it fails
        with a NOT_FOUND status.
    """
    try:
        self.inspect_branch(branch=branch)
    except grpc.RpcError as rpc_error:
        rpc_error: grpc.Call
        # Only a NOT_FOUND status means "absent"; other failures propagate.
        if rpc_error.code() != grpc.StatusCode.NOT_FOUND:
            raise rpc_error
        return False
    return True
def commit_exists(self, commit: Commit) ‑> bool

Checks whether a commit exists.

Parameters

commit : pfs.Commit
The commit to check.

Returns

bool
Whether the commit exists.
Expand source code
def commit_exists(self, commit: "Commit") -> bool:
    """Reports whether the given commit is present.

    Parameters
    ----------
    commit : pfs.Commit
        The commit to look up.

    Raises
    ------
    grpc.RpcError: Re-raised when the lookup fails with any status
        other than NOT_FOUND.

    Returns
    -------
    bool
        True when ``inspect_commit`` succeeds, False when it fails
        with a NOT_FOUND status.
    """
    try:
        self.inspect_commit(commit=commit)
    except grpc.RpcError as rpc_error:
        rpc_error: grpc.Call
        # Only a NOT_FOUND status means "absent"; other failures propagate.
        if rpc_error.code() != grpc.StatusCode.NOT_FOUND:
            raise rpc_error
        return False
    return True
def path_exists(self, file: File) ‑> bool

Checks whether the path exists in the specified commit, agnostic to whether path is a file or a directory.

Parameters

file : pfs.File
The file (or directory) to check.

Raises

ValueError: If commit does not exist.

Returns

bool
True if the path exists.
Expand source code
def path_exists(self, file: "File") -> bool:
    """Reports whether a path is present in the commit referenced by `file`,
    regardless of whether the path is a file or a directory.

    Parameters
    ----------
    file : pfs.File
        The file (or directory) to look up.

    Raises
    ------
    ValueError: If inspecting ``file.commit`` fails with any grpc.RpcError
        (reported as "commit does not exist").
    grpc.RpcError: Re-raised when inspecting the file fails with any
        status other than NOT_FOUND.

    Returns
    -------
    bool
        True if the path exists, False if the file lookup reports NOT_FOUND.
    """
    # Step 1: the commit itself has to be inspectable.
    try:
        self.inspect_commit(commit=file.commit)
    except grpc.RpcError as commit_error:
        raise ValueError("commit does not exist") from commit_error

    # Step 2: look up the path inside that commit.
    try:
        self.inspect_file(file=file)
    except grpc.RpcError as rpc_error:
        rpc_error: grpc.Call
        if rpc_error.code() != grpc.StatusCode.NOT_FOUND:
            raise rpc_error
        return False
    return True
def pfs_file(self, file: File) ‑> PFSFile

Wraps the response stream of a client.pfs.get_file() call with a PFSFile object. This wrapper class allows you to interact with the file stream as a normal file object.

Parameters

file : pfs.File
The file to retrieve.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> source = pfs.File.from_uri("images@master:/example.csv")
>>> with client.pfs.pfs_file(file=source) as pfs_file:
>>>     for line in pfs_file:
>>>         print(line)
Expand source code
def pfs_file(self, file: "File") -> "PFSFile":
    """Opens a PFS file as a PFSFile object.

    The response stream of a client.pfs.get_file() call is handed to a
    PFSFile wrapper, which is meant to be used like a normal file object
    (see the example below).

    Parameters
    ----------
    file : pfs.File
        The file to retrieve.

    Returns
    -------
    PFSFile
        A wrapper around the ``get_file`` response stream.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> source = pfs.File.from_uri("images@master:/example.csv")
    >>> with client.pfs.pfs_file(file=source) as pfs_file:
    >>>     for line in pfs_file:
    >>>         print(line)
    """
    return PFSFile(self.get_file(file=file))
def pfs_tar_file(self, file: File) ‑> PFSTarFile

Wraps the response stream of a client.pfs.get_file_tar() call with a PFSTarFile object. This wrapper class allows you to interact with the file stream as a standard tarfile.TarFile object.

Parameters

file : pfs.File
The file (or directory) to retrieve.
Expand source code
def pfs_tar_file(self, file: "File") -> "PFSTarFile":
    """Wraps the response stream of a client.pfs.get_file_tar() call with a
    PFSTarFile object. This wrapper class allows you to interact with the
    file stream as a standard tarfile.TarFile object.

    Parameters
    ----------
    file : pfs.File
        The file (or directory) to retrieve.

    Returns
    -------
    PFSTarFile
        The archive opened over the ``get_file_tar`` response stream.
    """
    stream = self.get_file_tar(file=file)
    # In the stdlib tarfile module, "r|*" means a sequential (non-seekable)
    # read with transparent compression; presumably PFSTarFile.open follows
    # tarfile.open here -- TODO confirm.
    return PFSTarFile.open(fileobj=PFSFile(stream), mode="r|*")
class ClosedCommit (commit: Commit, stub: ApiStub)

A ClosedCommit is an extension of the pfs.Commit message with some helpful methods. Cannot write to a closed commit.

Internal Use: Do not create this object yourself.

Parameters

commit : pfs.Commit
The commit.
stub : pfs.ApiStub
The API class to route requests through.
Expand source code
class ClosedCommit(Commit):
    """A pfs.Commit message extended with convenience methods for waiting
    on the commit. A closed commit cannot be written to.
    """

    def __init__(self, commit: "Commit", stub: "ApiStub"):
        """Internal Use: Do not create this object yourself.

        Parameters
        ----------
        commit : pfs.Commit
            The commit.
        stub : pfs.ApiStub
            The API class to route requests through.
        """
        self._commit = commit
        self._stub = stub

        # Copy every field of the wrapped message into this instance rather
        #   than naming fields explicitly: this keeps serialization working
        #   and stays compatible with fields added to pfs.Commit later.
        field_values = {
            spec.name: getattr(commit, spec.name) for spec in fields(commit)
        }
        super().__init__(**field_values)

    def wait(self) -> "CommitInfo":
        """Waits for this commit to finish by delegating to the stub's
        ``wait_commit``.

        See OpenCommit docstring for an example.
        """
        stub = self._stub
        return stub.wait_commit(self)

    def wait_all(self) -> List["CommitInfo"]:
        """Like Commit.wait, but returns the pfs.CommitInfo objects for the
        whole commit set sharing this commit's id (the downstream jobs
        initiated by this commit).
        """
        stub = self._stub
        return stub.wait_commit_set(CommitSet(id=self._commit.id))

Ancestors

  • Commit
  • betterproto.Message
  • abc.ABC

Subclasses

Methods

def wait(self) ‑> CommitInfo

Waits until the commit is finished being created.

See OpenCommit docstring for an example.

Expand source code
def wait(self) -> "CommitInfo":
    """Waits for this commit to finish by delegating to the stub's
    ``wait_commit``.

    See OpenCommit docstring for an example.
    """
    stub = self._stub
    return stub.wait_commit(self)
def wait_all(self) ‑> List[CommitInfo]

Similar to Commit.wait but streams back the pfs.CommitInfo from all the downstream jobs that were initiated by this commit.

Expand source code
def wait_all(self) -> List["CommitInfo"]:
    """Like Commit.wait, but returns the pfs.CommitInfo objects for the
    whole commit set sharing this commit's id (the downstream jobs
    initiated by this commit).
    """
    stub = self._stub
    return stub.wait_commit_set(CommitSet(id=self._commit.id))

Inherited members

class OpenCommit (commit: Commit, stub: ApiStub)

An OpenCommit is an extension of the pfs.Commit message with some helpful methods that provide a more intuitive UX when writing to a commit.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("data@master")) as commit:
>>>     commit.put_file_from_bytes("/greeting.txt", b"Hello!")
>>>     commit.delete_file(path="/rude/insult.txt")
>>> commit.wait()

Internal Use: Do not create this object yourself.

Parameters

commit : pfs.Commit
The "open" commit to write to.
stub : pfs.ApiStub
The API class to route requests through.
Expand source code
class OpenCommit(ClosedCommit):
    """An OpenCommit is an extension of the pfs.Commit message with some
    helpful methods that provide a more intuitive UX when writing to a commit.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("data@master")) as commit:
    >>>     commit.put_file_from_bytes("/greeting.txt", b"Hello!")
    >>>     commit.delete_file(path="/rude/insult.txt")
    >>> commit.wait()
    """

    def __init__(self, commit: "Commit", stub: "ApiStub"):
        """Internal Use: Do not create this object yourself.

        Parameters
        ----------
        commit : pfs.Commit
            The "open" commit to write to.
        stub : pfs.ApiStub
            The API class to route requests through.
        """
        # ClosedCommit.__init__ already stores `_commit` and `_stub`, so they
        #   are not assigned a second time here.
        super().__init__(commit, stub)

    def _close(self):
        """Transform an OpenCommit into a ClosedCommit."""
        # Rebinding the class drops the write methods defined below while
        #   keeping the same object (and its wait methods) usable.
        self.__class__ = ClosedCommit

    def put_files(self, *, source: Union[Path, str], path: str) -> None:
        """Recursively insert the contents of source into the open commit under path,
        matching the directory structure of source.

        This is roughly equivalent to ``pachctl put file -r``

        Parameters
        ----------
        source : Union[Path, str]
            The directory to recursively insert content from.
        path : str
            The destination path in PFS.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> branch = pfs.Branch.from_uri("images@master")
        >>> with client.pfs.commit(branch=branch) as commit:
        >>>     commit.put_files(source="path/to/local/files", path="/")
        """
        self._stub.put_files(commit=self._commit, source=source, path=path)

    def put_file_from_bytes(self, path: str, data: bytes, append: bool = False) -> "File":
        """Uploads a PFS file from a bytestring.

        Parameters
        ----------
        path : str
            The path in the repo the data will be written to.
        data : bytes
            The file contents as bytes.
        append : bool, optional
            If true, appends the data to the file specified at `path`, if
            it already exists. Otherwise, overwrites it.

        Raises
        ------
        ValueError: If the commit is closed.
            NOTE(review): not raised in this method -- confirm its origin.

        Returns
        -------
        A pfs.File object that points to the uploaded file.
        NOTE: The commit must be closed before you can read this file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     commit.put_file_from_bytes(path="/file.txt", data=b"SOME BYTES")
        """
        self._stub.put_file_from_bytes(commit=self, path=path, data=data, append=append)
        return File(commit=self._commit, path=path)

    def put_file_from_url(
        self,
        *,
        path: str,
        url: str,
        recursive: bool = False,
    ) -> "File":
        """Uploads a PFS file from a URL.

        Parameters
        ----------
        path : str
            The path in the repo the data will be written to.
        url : str
            The URL of the file to put.
        recursive : bool
            If true, allows for recursive scraping on some types of URLs,
            for example on s3:// URLs

        Raises
        ------
        ValueError: If the commit is closed.
            NOTE(review): not raised in this method -- confirm its origin.

        Returns
        -------
        A pfs.File object that points to the uploaded file.
        NOTE: The commit must be closed before you can read this file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     commit.put_file_from_url(
        >>>         path="/index.html", url="https://www.pachyderm.com/index.html"
        >>>     )
        """
        self._stub.put_file_from_url(commit=self, path=path, url=url, recursive=recursive)
        return File(commit=self._commit, path=path)

    def put_file_from_file(
        self, *, path: str, file: "SupportsRead[bytes]", append: bool = False
    ) -> "File":
        """Uploads a PFS file from an open file object.

        Parameters
        ----------
        path : str
            The path in the repo the data will be written to.
        file : SupportsRead[bytes]
            An open file object to read the data from.
        append : bool, optional
            If true, appends the data to the file specified at `path`, if
            it already exists. Otherwise, overwrites it.

        Raises
        ------
        ValueError: If the commit is closed.
            NOTE(review): not raised in this method -- confirm its origin.

        Returns
        -------
        A pfs.File object that points to the uploaded file.
        NOTE: The commit must be closed before you can read this file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     with open("local_file.dat", "rb") as source:
        >>>         commit.put_file_from_file(path="/index.html", file=source)
        """
        self._stub.put_file_from_file(commit=self, path=path, file=file, append=append)
        return File(commit=self._commit, path=path)

    def copy_file(self, *, src: "File", dst: str, append: bool = False) -> "File":
        """Copies a file within PFS

        Parameters
        ----------
        src : pfs.File
            The file to be copied.
        dst : str
            The destination of the file, as a string path.
        append : bool
            If true, appends the contents of src to dst if it exists.
            Otherwise, overwrites the file.

        Raises
        ------
        ValueError: If the commit is closed.
            NOTE(review): not raised in this method -- confirm its origin.

        Returns
        -------
        A pfs.File object that points to the new file.
        NOTE: The commit must be closed before you can read this file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> source = pfs.File.from_uri("images@master:/file.dat")
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     commit.copy_file(src=source, dst="/copy.dat")
        """
        self._stub.copy_file(commit=self, src=src, dst=dst, append=append)
        return File(commit=self._commit, path=dst)

    def delete_file(self, *, path: str) -> "File":
        """Deletes a file within PFS

        Parameters
        ----------
        path : str
            The path of the file to be deleted.

        Raises
        ------
        ValueError: If the commit is closed.
            NOTE(review): not raised in this method -- confirm its origin.

        Returns
        -------
        A pfs.File object that points to the deleted file.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
        >>>     commit.delete_file(path="/file.dat")
        """
        self._stub.delete_file(commit=self, path=path)
        return File(commit=self._commit, path=path)

Ancestors

Methods

def put_files(self, *, source: Union[pathlib.Path, str], path: str) ‑> None

Recursively insert the contents of source into the open commit under path, matching the directory structure of source.

This is roughly equivalent to pachctl put file -r

Parameters

source : Union[Path, str]
The directory to recursively insert content from.
path : str
The destination path in PFS.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> branch = pfs.Branch.from_uri("images@master")
>>> with client.pfs.commit(branch=branch) as commit:
>>>     commit.put_files(source="path/to/local/files", path="/")
Expand source code
def put_files(self, *, source: Union[Path, str], path: str) -> None:
    """Uploads a local directory tree into the open commit.

    The contents of `source` are inserted recursively under `path`,
    mirroring the directory structure of `source`. The work is delegated
    to the stub's ``put_files`` using the wrapped pfs.Commit.

    This is roughly equivalent to ``pachctl put file -r``

    Parameters
    ----------
    source : Union[Path, str]
        The directory to recursively insert content from.
    path : str
        The destination path in PFS.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> branch = pfs.Branch.from_uri("images@master")
    >>> with client.pfs.commit(branch=branch) as commit:
    >>>     commit.put_files(source="path/to/local/files", path="/")
    """
    stub = self._stub
    stub.put_files(
        commit=self._commit,
        source=source,
        path=path,
    )
def put_file_from_bytes(self, path: str, data: bytes, append: bool = False) ‑> File

Uploads a PFS file from a bytestring.

Parameters

path : str
The path in the repo the data will be written to.
data : bytes
The file contents as bytes.
append : bool, optional
If true, appends the data to the file specified at path, if it already exists. Otherwise, overwrites it.

Raises

ValueError: If the commit is closed.

Returns

A pfs.File object that points to the uploaded file. NOTE: The commit must be closed before you can read this file.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
>>>     commit.put_file_from_bytes(path="/file.txt", data=b"SOME BYTES")
Expand source code
def put_file_from_bytes(self, path: str, data: bytes, append: bool = False) -> "File":
    """Writes a bytestring to a PFS file in this commit.

    Parameters
    ----------
    path : str
        The path in the repo the data will be written to.
    data : bytes
        The file contents as bytes.
    append : bool, optional
        If true, appends the data to the file specified at `path`, if
        it already exists. Otherwise, overwrites it.

    Raises
    ------
    ValueError: If the commit is closed.
        NOTE(review): not raised in this method -- confirm its origin.

    Returns
    -------
    A pfs.File object, built from the wrapped commit and `path`, that
    points to the uploaded file.
    NOTE: The commit must be closed before you can read this file.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
    >>>     commit.put_file_from_bytes(path="/file.txt", data=b"SOME BYTES")
    """
    self._stub.put_file_from_bytes(
        commit=self,
        path=path,
        data=data,
        append=append,
    )
    uploaded = File(commit=self._commit, path=path)
    return uploaded
def put_file_from_url(self, *, path: str, url: str, recursive: bool = False) ‑> File

Uploads a PFS file from a URL.

Parameters

path : str
The path in the repo the data will be written to.
url : str
The URL of the file to put.
recursive : bool
If true, allows for recursive scraping on some types of URLs, for example on s3:// URLs

Raises

ValueError: If the commit is closed.

Returns

A pfs.File object that points to the uploaded file. NOTE: The commit must be closed before you can read this file.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
>>>     commit.put_file_from_url(
>>>         path="/index.html", url="https://www.pachyderm.com/index.html"
>>>     )
Expand source code
def put_file_from_url(
    self,
    *,
    path: str,
    url: str,
    recursive: bool = False,
) -> "File":
    """Writes the content found at a URL to a PFS file in this commit.

    Parameters
    ----------
    path : str
        The path in the repo the data will be written to.
    url : str
        The URL of the file to put.
    recursive : bool
        If true, allows for recursive scraping on some types of URLs,
        for example on s3:// URLs

    Raises
    ------
    ValueError: If the commit is closed.
        NOTE(review): not raised in this method -- confirm its origin.

    Returns
    -------
    A pfs.File object, built from the wrapped commit and `path`, that
    points to the uploaded file.
    NOTE: The commit must be closed before you can read this file.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
    >>>     commit.put_file_from_url(
    >>>         path="/index.html", url="https://www.pachyderm.com/index.html"
    >>>     )
    """
    self._stub.put_file_from_url(
        commit=self,
        path=path,
        url=url,
        recursive=recursive,
    )
    uploaded = File(commit=self._commit, path=path)
    return uploaded
def put_file_from_file(self, *, path: str, file: SupportsRead[bytes], append: bool = False) ‑> File

Uploads a PFS file from an open file object.

Parameters

path : str
The path in the repo the data will be written to.
file : SupportsRead[bytes]
An open file object to read the data from.
append : bool, optional
If true, appends the data to the file specified at path, if it already exists. Otherwise, overwrites it.

Raises

ValueError: If the commit is closed.

Returns

A pfs.File object that points to the uploaded file. NOTE: The commit must be closed before you can read this file.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
>>>     with open("local_file.dat", "rb") as source:
>>>         commit.put_file_from_file(path="/index.html", file=source)
Expand source code
def put_file_from_file(
    self, *, path: str, file: "SupportsRead[bytes]", append: bool = False
) -> "File":
    """Writes the content of an open file object to a PFS file in this commit.

    Parameters
    ----------
    path : str
        The path in the repo the data will be written to.
    file : SupportsRead[bytes]
        An open file object to read the data from.
    append : bool, optional
        If true, appends the data to the file specified at `path`, if
        it already exists. Otherwise, overwrites it.

    Raises
    ------
    ValueError: If the commit is closed.
        NOTE(review): not raised in this method -- confirm its origin.

    Returns
    -------
    A pfs.File object, built from the wrapped commit and `path`, that
    points to the uploaded file.
    NOTE: The commit must be closed before you can read this file.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
    >>>     with open("local_file.dat", "rb") as source:
    >>>         commit.put_file_from_file(path="/index.html", file=source)
    """
    self._stub.put_file_from_file(
        commit=self,
        path=path,
        file=file,
        append=append,
    )
    uploaded = File(commit=self._commit, path=path)
    return uploaded
def copy_file(self, *, src: File, dst: str, append: bool = False) ‑> File

Copies a file within PFS

Parameters

src : pfs.File
The file to be copied.
dst : str
The destination of the file, as a string path.
append : bool
If true, appends the contents of src to dst if it exists. Otherwise, overwrites the file.

Raises

ValueError: If the commit is closed.

Returns

A pfs.File object that points to the new file. NOTE: The commit must be closed before you can read this file.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> source = pfs.File.from_uri("images@master:/file.dat")
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
>>>     commit.copy_file(src=source, dst="/copy.dat")
Expand source code
def copy_file(self, *, src: "File", dst: str, append: bool = False) -> "File":
    """Copies an existing PFS file to a path in this commit.

    Parameters
    ----------
    src : pfs.File
        The file to be copied.
    dst : str
        The destination of the file, as a string path.
    append : bool
        If true, appends the contents of src to dst if it exists.
        Otherwise, overwrites the file.

    Raises
    ------
    ValueError: If the commit is closed.
        NOTE(review): not raised in this method -- confirm its origin.

    Returns
    -------
    A pfs.File object, built from the wrapped commit and `dst`, that
    points to the new file.
    NOTE: The commit must be closed before you can read this file.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> source = pfs.File.from_uri("images@master:/file.dat")
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
    >>>     commit.copy_file(src=source, dst="/copy.dat")
    """
    self._stub.copy_file(
        commit=self,
        src=src,
        dst=dst,
        append=append,
    )
    copied = File(commit=self._commit, path=dst)
    return copied
def delete_file(self, *, path: str) ‑> File

Deletes a file within PFS

Parameters

path : str
The path of the file to be deleted.

Raises

ValueError: If the commit is closed.

Returns

A pfs.File object that points to the deleted file.

Examples

>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
>>>     commit.delete_file(path="/file.dat")
Expand source code
def delete_file(self, *, path: str) -> "File":
    """Deletes a file within PFS

    Parameters
    ----------
    path : str
        The path of the file to be deleted.

    Raises
    ------
    ValueError: If the commit is closed.
        NOTE(review): not raised in this method -- confirm its origin.

    Returns
    -------
    A pfs.File object, built from the wrapped commit and `path`, that
    points to the deleted file.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>> from pachyderm_sdk.api import pfs
    >>> client: Client
    >>> with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
    >>>     commit.delete_file(path="/file.dat")
    """
    self._stub.delete_file(commit=self, path=path)
    return File(commit=self._commit, path=path)

Inherited members