Package pachyderm_sdk
Expand source code
import importlib.metadata as metadata
try:
    # Ask the installed distribution metadata for this package's version.
    __version__ = metadata.version(__name__)  # type: ignore
except (FileNotFoundError, ModuleNotFoundError):
    # metadata.PackageNotFoundError is a subclass of ModuleNotFoundError, so a
    # package without installed metadata ends up here with an empty version.
    __version__ = ""
from .api.pfs import _additions as __pfs_additions
from .client import Client
from .datum_batching import batch_all_datums
# Public names exported by `from pachyderm_sdk import *`.
__all__ = ["Client", "batch_all_datums"]
Sub-modules
pachyderm_sdk.api
pachyderm_sdk.client
-
The Client used to interact with a Pachyderm instance.
pachyderm_sdk.config
-
Functionality for parsing Pachyderm config files.
pachyderm_sdk.constants
pachyderm_sdk.datum_batching
-
A high-level decorator for pipeline code that uses the datum-batching feature.
pachyderm_sdk.errors
-
Errors that can be raised by this library.
pachyderm_sdk.interceptor
-
Implementation of a gRPC interceptor used to set request metadata and catch connection errors.
Functions
def batch_all_datums(user_code: Callable[..., None]) -> Callable[..., None]
-
A decorator that will repeatedly call the wrapped function until all datums have been processed. Before calling the wrapped function, this decorator will call the NextDatum endpoint within the worker and set any environment variables specified by the worker.
Any exceptions raised during the execution of the wrapped function will be reported back to the worker. See the pachyderm documentation for more information on how the datum batching feature works.
Note: This can only be used within a Pachyderm worker.
Examples
>>> from pachyderm_sdk import batch_all_datums
>>>
>>> @batch_all_datums
... def pipeline():
...     # process datums
...     pass
>>>
>>> if __name__ == '__main__':
...     # Perform an expensive computation here before
...     # entering your datum processing function,
...     # e.g. initializing a model.
...     pipeline()
Check the following link for a more substantial example: https://github.com/pachyderm/examples/tree/master/object-detection
Expand source code
def batch_all_datums(user_code: PIPELINE_FUNC) -> PIPELINE_FUNC:
    """A decorator that will repeatedly call the wrapped function until all
    datums have been processed. Before calling the wrapped function, this
    decorator will call the NextDatum endpoint within the worker and set any
    environment variables specified by the worker.

    Any exceptions raised during the execution of the wrapped function will be
    reported back to the worker. See the pachyderm documentation for more
    information on how the datum batching feature works.

    Note: This can only be used within a Pachyderm worker.

    Examples
    --------
    >>> from pachyderm_sdk import batch_all_datums
    >>>
    >>> @batch_all_datums
    ... def pipeline():
    ...     # process datums
    ...     pass
    >>>
    >>> if __name__ == '__main__':
    ...     # Perform an expensive computation here before
    ...     # entering your datum processing function,
    ...     # e.g. initializing a model.
    ...     pipeline()

    Check the following link for a more substantial example:
    github.com/pachyderm/examples/tree/master/object-detection
    """

    @wraps(user_code)
    def wrapper(*args, **kwargs) -> None:
        # A default Client is built only to reach its worker stub.
        worker = Client().worker
        while True:
            # NOTE(review): no exit condition is visible here; presumably
            # batch_datum() terminates the loop (e.g. by raising) once no
            # datums remain -- confirm against the worker API.
            with worker.batch_datum():
                user_code(*args, **kwargs)

    return wrapper
Classes
class Client (host: str = 'localhost', port: int = 30650, auth_token: Optional[str] = None, root_certs: Optional[bytes] = None, transaction_id: str = None, tls: bool = False)
-
The Client used to interact with a Pachyderm instance.
Examples
Connect to a pachyderm instance using your local config file:
>>> from pachyderm_sdk import Client
>>> client = Client.from_config()
Connect to a pachyderm instance using a URL/address:
>>> from pachyderm_sdk import Client
>>> client = Client.from_pachd_address("test.work.com:30080")
Creates a Pachyderm client.
Parameters
host : str, optional
    The pachd host. Default is 'localhost', which is used with `pachctl port-forward`.
port : int, optional
    The port to connect to. Default is 30650.
auth_token : str, optional
    The authentication token. Used if authentication is enabled on the cluster.
root_certs : bytes, optional
    The PEM-encoded root certificates as a byte string.
transaction_id : str, optional
    The ID of the transaction to run operations on.
tls : bool
    Whether TLS should be used. TLS is always enabled when `root_certs` are specified, and those certs are used. Otherwise, the certs provided by certifi are used.
Expand source code
class Client:
    """The Client used to interact with a Pachyderm instance.

    Examples
    --------
    Connect to a pachyderm instance using your local config file:

    >>> from pachyderm_sdk import Client
    >>> client = Client.from_config()

    Connect to a pachyderm instance using a URL/address:

    >>> from pachyderm_sdk import Client
    >>> client = Client.from_pachd_address("test.work.com:30080")
    """

    def __init__(
        self,
        host: str = DEFAULT_HOST,
        port: int = DEFAULT_PORT,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
        tls: bool = False,
    ):
        """
        Creates a Pachyderm client.

        Parameters
        ----------
        host : str, optional
            The pachd host. Default is 'localhost', which is used with
            ``pachctl port-forward``. IPv6 literals may be given with or
            without surrounding brackets.
        port : int, optional
            The port to connect to. Default is 30650.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster. When omitted, the environment variable named by
            ``AUTH_TOKEN_ENV`` is consulted.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string.
        transaction_id : str, optional
            The ID of the transaction to run operations on.
        tls : bool
            Whether TLS should be used. TLS is always enabled when
            `root_certs` are specified, and those certs are used. Otherwise,
            we use the certs provided by certifi.
        """
        host = host or DEFAULT_HOST
        port = port or DEFAULT_PORT
        if auth_token is None:
            auth_token = os.environ.get(AUTH_TOKEN_ENV)

        tls = tls or (root_certs is not None)
        if tls and root_certs is None:
            # load default certs if none are specified
            import certifi

            with open(certifi.where(), "rb") as f:
                root_certs = f.read()

        # gRPC targets need IPv6 literals bracketed ("[::1]:30650"): an
        # unbracketed literal followed by ":port" is ambiguous. Hosts coming
        # from from_pachd_address arrive unbracketed because urlparse's
        # `hostname` strips the brackets.
        if ":" in host and not host.startswith("["):
            host = f"[{host}]"
        self.address = f"{host}:{port}"
        self.root_certs = root_certs

        self._auth_token = auth_token
        self._transaction_id = transaction_id
        # Build the request metadata, the intercepted channel and the stubs.
        self._connect()

        # Worker stub is loaded when accessed through the worker property.
        self._worker = None

        if not auth_token and (oidc_token := os.environ.get(OIDC_TOKEN_ENV)):
            # NOTE(review): the return value is stored directly as the auth
            # token and later sent as request metadata; confirm that
            # auth.authenticate returns the token string rather than a
            # response message wrapping it.
            self.auth_token = self.auth.authenticate(id_token=oidc_token)

    def _connect(self):
        """(Re)build request metadata, the gRPC channel and all API stubs.

        Used on construction and whenever a value carried in the request
        metadata (auth token, transaction ID) changes, because the metadata is
        attached through the channel interceptor.
        """
        self._metadata = self._build_metadata()
        self._channel = _apply_metadata_interceptor(
            channel=_create_channel(
                self.address, self.root_certs, options=GRPC_CHANNEL_OPTIONS
            ),
            metadata=self._metadata,
        )
        self._init_api()

    def _init_api(self):
        self.admin = _AdminStub(self._channel)
        self.auth = _AuthStub(self._channel)
        self.debug = _DebugStub(self._channel)
        self.enterprise = _EnterpriseStub(self._channel)
        self.identity = _IdentityStub(self._channel)
        self.license = _LicenseStub(self._channel)
        self.pfs = _PfsStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
        )
        self.pps = _PpsStub(self._channel)
        self.transaction = _TransactionStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
            set_transaction_id=lambda value: setattr(self, "transaction_id", value),
        )
        self._version_api = _VersionStub(self._channel)
        self._worker: Optional[_WorkerStub]

    @classmethod
    def new_in_cluster(
        cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
    ) -> "Client":
        """Creates a Pachyderm client that operates within a Pachyderm cluster.

        If the spout config file exists it is used; otherwise the pachd
        host and port are read from the cluster environment variables.

        Parameters
        ----------
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        RuntimeError
            If the pachd service host or port environment variable is unset.
        """
        if CONFIG_PATH_SPOUT.exists():
            # TODO: Should we notify the user that we are using spout config?
            return cls.from_config(CONFIG_PATH_SPOUT)

        host = os.environ.get(PACHD_SERVICE_HOST_ENV)
        if host is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_HOST_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        port = os.environ.get(PACHD_SERVICE_PORT_ENV)
        if port is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_PORT_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        return cls(
            host=host,
            port=int(port),
            auth_token=auth_token,
            transaction_id=transaction_id,
        )

    @classmethod
    def from_pachd_address(
        cls,
        pachd_address: str,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
    ) -> "Client":
        """Creates a Pachyderm client from a given pachd address.

        Parameters
        ----------
        pachd_address : str
            The address of the pachd server. A missing scheme defaults to
            ``grpc://``; ``grpcs://`` and ``https://`` enable TLS.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. If unspecified
            and the address uses a TLS scheme, default certs are loaded from
            certifi.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        ValueError
            If the scheme is not recognized or the address carries a path,
            params, query, fragment or credentials.
        """
        if "://" not in pachd_address:
            pachd_address = "grpc://{}".format(pachd_address)

        u = urlparse(pachd_address)

        if u.scheme not in ("grpc", "http", "grpcs", "https"):
            raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
        if u.path or u.params or u.query or u.fragment or u.username or u.password:
            raise ValueError("invalid pachd address")

        return cls(
            host=u.hostname,
            port=u.port,
            auth_token=auth_token,
            root_certs=root_certs,
            transaction_id=transaction_id,
            tls=u.scheme == "grpcs" or u.scheme == "https",
        )

    @classmethod
    def from_config(cls, config_file: Union[Path, str] = CONFIG_PATH_LOCAL) -> "Client":
        """Creates a Pachyderm client from a config file.

        Parameters
        ----------
        config_file : Union[Path, str]
            The path to a config json file.
            config_file defaults to the local config.

        Returns
        -------
        Client
            A properly configured Client.
        """
        config = ConfigFile(config_file)
        active_context = config.active_context
        client = cls.from_pachd_address(
            active_context.active_pachd_address,
            auth_token=active_context.session_token,
            root_certs=active_context.server_cas_decoded,
            transaction_id=active_context.active_transaction,
        )
        return client

    @property
    def auth_token(self):
        """The authentication token. Used if authentication is enabled on the cluster."""
        return self._auth_token

    @auth_token.setter
    def auth_token(self, value):
        self._auth_token = value
        # The token travels in the interceptor's metadata, so rebuild it all.
        self._connect()

    @property
    def transaction_id(self):
        """The ID of the transaction to run operations on."""
        return self._transaction_id

    @transaction_id.setter
    def transaction_id(self, value):
        self._transaction_id = value
        # The ID travels in the interceptor's metadata, so rebuild it all.
        self._connect()

    @property
    def worker(self) -> _WorkerStub:
        """Access the worker API stub.

        This is dynamically loaded in order to provide a helpful error message
        to the user if they try to interact with the worker API from outside a
        worker.
        """
        if self._worker is None:
            port = os.environ.get(WORKER_PORT_ENV)
            if port is None:
                raise ConnectionError(
                    f"Cannot connect to the worker since {WORKER_PORT_ENV} is not set. "
                    "Are you running inside a pipeline?"
                )
            # Note: This channel does not go through the metadata interceptor.
            channel = _create_channel(
                address=f"localhost:{port}", root_certs=None, options=GRPC_CHANNEL_OPTIONS
            )
            self._worker = _WorkerStub(channel)
        return self._worker

    def _build_metadata(self):
        """Return the gRPC metadata pairs for the current token/transaction."""
        metadata = []
        if self._auth_token is not None:
            metadata.append(("authn-token", self._auth_token))
        if self._transaction_id is not None:
            metadata.append(("pach-transaction", self._transaction_id))
        return metadata

    def get_version(self) -> Version:
        """Requests version information from the pachd cluster."""
        return self._version_api.get_version()
Static methods
def new_in_cluster(auth_token: Optional[str] = None, transaction_id: Optional[str] = None) -> Client
-
Creates a Pachyderm client that operates within a Pachyderm cluster.
Parameters
auth_token : str, optional
    The authentication token. Used if authentication is enabled on the cluster.
transaction_id : str, optional
    The ID of the transaction to run operations on.
Returns
Client
    A pachyderm_sdk client instance.
Expand source code
@classmethod
def new_in_cluster(
    cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
) -> "Client":
    """Creates a Pachyderm client that operates within a Pachyderm cluster.

    If the spout config file exists it is used; otherwise the pachd host and
    port are read from the cluster environment variables.

    Parameters
    ----------
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on the
        cluster.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A pachyderm_sdk client instance.

    Raises
    ------
    RuntimeError
        If the pachd service host or port environment variable is unset.
    """
    if CONFIG_PATH_SPOUT.exists():
        # TODO: Should we notify the user that we are using spout config?
        return cls.from_config(CONFIG_PATH_SPOUT)

    host = os.environ.get(PACHD_SERVICE_HOST_ENV)
    if host is None:
        raise RuntimeError(
            f"Environment variable {PACHD_SERVICE_HOST_ENV} not set "
            f"-- cannot connect. Are you running in a cluster?"
        )
    port = os.environ.get(PACHD_SERVICE_PORT_ENV)
    if port is None:
        raise RuntimeError(
            f"Environment variable {PACHD_SERVICE_PORT_ENV} not set "
            f"-- cannot connect. Are you running in a cluster?"
        )
    return cls(
        host=host,
        port=int(port),
        auth_token=auth_token,
        transaction_id=transaction_id,
    )
def from_pachd_address(pachd_address: str, auth_token: str = None, root_certs: bytes = None, transaction_id: str = None) -> Client
-
Creates a Pachyderm client from a given pachd address.
Parameters
pachd_address : str
    The address of the pachd server. A missing scheme defaults to grpc://; grpcs:// and https:// enable TLS.
auth_token : str, optional
    The authentication token. Used if authentication is enabled on the cluster.
root_certs : bytes, optional
    The PEM-encoded root certificates as a byte string. If unspecified and the address uses a TLS scheme, default certs are loaded from certifi.
transaction_id : str, optional
    The ID of the transaction to run operations on.
Returns
Client
    A pachyderm_sdk client instance.
Expand source code
@classmethod
def from_pachd_address(
    cls,
    pachd_address: str,
    auth_token: Optional[str] = None,
    root_certs: Optional[bytes] = None,
    transaction_id: Optional[str] = None,
) -> "Client":
    """Creates a Pachyderm client from a given pachd address.

    Parameters
    ----------
    pachd_address : str
        The address of the pachd server. A missing scheme defaults to
        ``grpc://``; ``grpcs://`` and ``https://`` enable TLS.
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on the
        cluster.
    root_certs : bytes, optional
        The PEM-encoded root certificates as byte string. If unspecified and
        the address uses a TLS scheme, default certs are loaded from certifi.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A pachyderm_sdk client instance.

    Raises
    ------
    ValueError
        If the scheme is not recognized or the address carries a path,
        params, query, fragment or credentials.
    """
    if "://" not in pachd_address:
        pachd_address = "grpc://{}".format(pachd_address)

    u = urlparse(pachd_address)

    if u.scheme not in ("grpc", "http", "grpcs", "https"):
        raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
    if u.path or u.params or u.query or u.fragment or u.username or u.password:
        raise ValueError("invalid pachd address")

    return cls(
        host=u.hostname,
        port=u.port,
        auth_token=auth_token,
        root_certs=root_certs,
        transaction_id=transaction_id,
        tls=u.scheme == "grpcs" or u.scheme == "https",
    )
def from_config(config_file: Union[pathlib.Path, str] = PosixPath('~/.pachyderm/config.json')) -> Client
-
Creates a Pachyderm client from a config file.
Parameters
config_file : Union[Path, str]
    The path to a config JSON file. Defaults to the local config.
Returns
Client
    A properly configured Client.
Expand source code
@classmethod
def from_config(cls, config_file: Union[Path, str] = CONFIG_PATH_LOCAL) -> "Client":
    """Build a Client from the active context of a Pachyderm config file.

    Parameters
    ----------
    config_file : Union[Path, str]
        Location of the config JSON file; defaults to the local config.

    Returns
    -------
    Client
        A Client configured from the config's active context.
    """
    context = ConfigFile(config_file).active_context
    return cls.from_pachd_address(
        context.active_pachd_address,
        auth_token=context.session_token,
        root_certs=context.server_cas_decoded,
        transaction_id=context.active_transaction,
    )
Instance variables
var auth_token
-
The authentication token. Used if authentication is enabled on the cluster.
Expand source code
@property
def auth_token(self):
    """The authentication token, used when the cluster has authentication enabled."""
    token = self._auth_token
    return token
var transaction_id
-
The ID of the transaction to run operations on.
Expand source code
@property
def transaction_id(self):
    """Identifier of the transaction that operations are run on."""
    current = self._transaction_id
    return current
var worker : WorkerStub
-
Access the worker API stub.
This is dynamically loaded in order to provide a helpful error message to the user if they try to interact with the worker API from outside a worker.
Expand source code
@property
def worker(self) -> _WorkerStub:
    """Access the worker API stub.

    This is dynamically loaded in order to provide a helpful error message to
    the user if they try to interact with the worker API from outside a worker.

    Raises
    ------
    ConnectionError
        If the worker port environment variable is not set.
    """
    if self._worker is None:
        port = os.environ.get(WORKER_PORT_ENV)
        if port is None:
            raise ConnectionError(
                f"Cannot connect to the worker since {WORKER_PORT_ENV} is not set. "
                "Are you running inside a pipeline?"
            )
        # Note: This channel does not go through the metadata interceptor.
        channel = _create_channel(
            address=f"localhost:{port}", root_certs=None, options=GRPC_CHANNEL_OPTIONS
        )
        # Cache the stub so later accesses reuse the same channel.
        self._worker = _WorkerStub(channel)
    return self._worker
Methods
def get_version(self) -> Version
-
Requests version information from the pachd cluster.
Expand source code
def get_version(self) -> Version:
    """Ask the pachd cluster for its version information."""
    version_stub = self._version_api
    return version_stub.get_version()