Module pachyderm_sdk.api.pps.extension
Handwritten classes/methods that augment the existing PPS API.
Expand source code
"""Handwritten classes/methods that augment the existing PPS API."""
import base64
import json
from typing import Dict
import grpc
from betterproto.lib.google.protobuf import Empty
from . import ApiStub as _GeneratedApiStub
from . import (
Job,
Pipeline,
PipelineInfo,
)
class ApiStub(_GeneratedApiStub):
def inspect_pipeline(
self,
*,
pipeline: "Pipeline" = None,
details: bool = False,
history: int = 0,
) -> "PipelineInfo":
"""Inspects a pipeline.
Parameters
----------
pipeline : pps.Pipeline
The pipeline to inspect.
details : bool, optional
If true, return pipeline details.
history : int, optional
Indicates to return historical versions of `pipeline_name`.
Semantics are:
- 0: Return current version of `pipeline_name`
- 1: Return the above and `pipeline_name` from the next most recent version.
- 2: etc.
- -1: Return all historical versions of `pipeline_name`.
Returns
-------
pps.PipelineInfo
Examples
--------
>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pps
>>> client: Client
>>> pipeline_info = client.pps.inspect_pipeline(
>>> pipeline=pps.Pipeline(name="foo")
>>> )
"""
if history:
response = self.list_pipeline(
pipeline=pipeline, history=history, details=details
)
try:
return next(response)
except StopIteration:
raise ValueError("invalid pipeline")
return super().inspect_pipeline(pipeline=pipeline, details=details)
def pipeline_exists(self, pipeline: "Pipeline") -> bool:
"""Checks whether a pipeline exists.
Parameters
----------
pipeline: pps.Pipeline
The pipeline to check.
Returns
-------
bool
Whether the pipeline exists.
"""
try:
super().inspect_pipeline(pipeline=pipeline)
return True
except grpc.RpcError as err:
err: grpc.Call
if err.code() == grpc.StatusCode.NOT_FOUND:
return False
raise err
def job_exists(self, job: "Job") -> bool:
"""Checks whether a job exists.
Parameters
----------
job: pps.Job
The job to check.
Returns
-------
bool
Whether the job exists.
"""
try:
super().inspect_job(job=job)
return True
except grpc.RpcError as err:
err: grpc.Call
if err.code() == grpc.StatusCode.NOT_FOUND:
return False
raise err
# noinspection PyMethodOverriding
def create_secret(
self,
*,
name: str,
data: Dict,
labels: Dict[str, str] = None,
annotations: Dict[str, str] = None,
) -> Empty:
"""Creates a new secret.
Parameters
----------
name : str
The name of the secret.
data : Dict[str, Union[str, bytes]]
The data to store in the secret. Each key must consist of
alphanumeric characters ``-``, ``_`` or ``.``.
labels : Dict[str, str], optional
Kubernetes labels to attach to the secret.
annotations : Dict[str, str], optional
Kubernetes annotations to attach to the secret.
"""
encoded_data = {}
for k, v in data.items():
if isinstance(v, str):
v = v.encode("utf8")
encoded_data[k] = base64.b64encode(v).decode("utf8")
file = json.dumps(
{
"kind": "Secret",
"apiVersion": "v1",
"metadata": {
"name": name,
"labels": labels,
"annotations": annotations,
},
"data": encoded_data,
}
).encode()
return super().create_secret(file=file)
Classes
class ApiStub (channel: grpc.Channel)
-
Expand source code
class ApiStub(_GeneratedApiStub): def inspect_pipeline( self, *, pipeline: "Pipeline" = None, details: bool = False, history: int = 0, ) -> "PipelineInfo": """Inspects a pipeline. Parameters ---------- pipeline : pps.Pipeline The pipeline to inspect. details : bool, optional If true, return pipeline details. history : int, optional Indicates to return historical versions of `pipeline_name`. Semantics are: - 0: Return current version of `pipeline_name` - 1: Return the above and `pipeline_name` from the next most recent version. - 2: etc. - -1: Return all historical versions of `pipeline_name`. Returns ------- pps.PipelineInfo Examples -------- >>> from pachyderm_sdk import Client >>> from pachyderm_sdk.api import pps >>> client: Client >>> pipeline_info = client.pps.inspect_pipeline( >>> pipeline=pps.Pipeline(name="foo") >>> ) """ if history: response = self.list_pipeline( pipeline=pipeline, history=history, details=details ) try: return next(response) except StopIteration: raise ValueError("invalid pipeline") return super().inspect_pipeline(pipeline=pipeline, details=details) def pipeline_exists(self, pipeline: "Pipeline") -> bool: """Checks whether a pipeline exists. Parameters ---------- pipeline: pps.Pipeline The pipeline to check. Returns ------- bool Whether the pipeline exists. """ try: super().inspect_pipeline(pipeline=pipeline) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err def job_exists(self, job: "Job") -> bool: """Checks whether a job exists. Parameters ---------- job: pps.Job The job to check. Returns ------- bool Whether the job exists. """ try: super().inspect_job(job=job) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err # noinspection PyMethodOverriding def create_secret( self, *, name: str, data: Dict, labels: Dict[str, str] = None, annotations: Dict[str, str] = None, ) -> Empty: """Creates a new secret. Parameters ---------- name : str The name of the secret. data : Dict[str, Union[str, bytes]] The data to store in the secret. Each key must consist of alphanumeric characters ``-``, ``_`` or ``.``. labels : Dict[str, str], optional Kubernetes labels to attach to the secret. annotations : Dict[str, str], optional Kubernetes annotations to attach to the secret. """ encoded_data = {} for k, v in data.items(): if isinstance(v, str): v = v.encode("utf8") encoded_data[k] = base64.b64encode(v).decode("utf8") file = json.dumps( { "kind": "Secret", "apiVersion": "v1", "metadata": { "name": name, "labels": labels, "annotations": annotations, }, "data": encoded_data, } ).encode() return super().create_secret(file=file)
Ancestors
Methods
def inspect_pipeline(self, *, pipeline: Pipeline = None, details: bool = False, history: int = 0) ‑> PipelineInfo
-
Inspects a pipeline.
Parameters
pipeline
:pps.Pipeline
- The pipeline to inspect.
details
:bool
, optional- If true, return pipeline details.
history
:int
, optional-
Indicates to return historical versions of
pipeline_name
. Semantics are:- 0: Return current version of
pipeline_name
- 1: Return the above and
pipeline_name
from the next most recent version. - 2: etc.
- -1: Return all historical versions of
pipeline_name
.
- 0: Return current version of
Returns
pps.PipelineInfo
Examples
>>> from pachyderm_sdk import Client >>> from pachyderm_sdk.api import pps >>> client: Client >>> pipeline_info = client.pps.inspect_pipeline( >>> pipeline=pps.Pipeline(name="foo") >>> )
Expand source code
def inspect_pipeline( self, *, pipeline: "Pipeline" = None, details: bool = False, history: int = 0, ) -> "PipelineInfo": """Inspects a pipeline. Parameters ---------- pipeline : pps.Pipeline The pipeline to inspect. details : bool, optional If true, return pipeline details. history : int, optional Indicates to return historical versions of `pipeline_name`. Semantics are: - 0: Return current version of `pipeline_name` - 1: Return the above and `pipeline_name` from the next most recent version. - 2: etc. - -1: Return all historical versions of `pipeline_name`. Returns ------- pps.PipelineInfo Examples -------- >>> from pachyderm_sdk import Client >>> from pachyderm_sdk.api import pps >>> client: Client >>> pipeline_info = client.pps.inspect_pipeline( >>> pipeline=pps.Pipeline(name="foo") >>> ) """ if history: response = self.list_pipeline( pipeline=pipeline, history=history, details=details ) try: return next(response) except StopIteration: raise ValueError("invalid pipeline") return super().inspect_pipeline(pipeline=pipeline, details=details)
def pipeline_exists(self, pipeline: Pipeline) ‑> bool
-
Checks whether a pipeline exists.
Parameters
pipeline
:pps.Pipeline
- The pipeline to check.
Returns
bool
- Whether the pipeline exists.
Expand source code
def pipeline_exists(self, pipeline: "Pipeline") -> bool: """Checks whether a pipeline exists. Parameters ---------- pipeline: pps.Pipeline The pipeline to check. Returns ------- bool Whether the pipeline exists. """ try: super().inspect_pipeline(pipeline=pipeline) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err
def job_exists(self, job: Job) ‑> bool
-
Checks whether a job exists.
Parameters
job
:pps.Job
- The job to check.
Returns
bool
- Whether the job exists.
Expand source code
def job_exists(self, job: "Job") -> bool: """Checks whether a job exists. Parameters ---------- job: pps.Job The job to check. Returns ------- bool Whether the job exists. """ try: super().inspect_job(job=job) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err
def create_secret(self, *, name: str, data: Dict, labels: Dict[str, str] = None, annotations: Dict[str, str] = None) ‑> betterproto.lib.google.protobuf.Empty
-
Creates a new secret.
Parameters
name
:str
- The name of the secret.
data
:Dict[str, Union[str, bytes]]
- The data to store in the secret. Each key must consist of
alphanumeric characters
-
,_
or.
. labels
:Dict[str, str]
, optional- Kubernetes labels to attach to the secret.
annotations
:Dict[str, str]
, optional- Kubernetes annotations to attach to the secret.
Expand source code
def create_secret( self, *, name: str, data: Dict, labels: Dict[str, str] = None, annotations: Dict[str, str] = None, ) -> Empty: """Creates a new secret. Parameters ---------- name : str The name of the secret. data : Dict[str, Union[str, bytes]] The data to store in the secret. Each key must consist of alphanumeric characters ``-``, ``_`` or ``.``. labels : Dict[str, str], optional Kubernetes labels to attach to the secret. annotations : Dict[str, str], optional Kubernetes annotations to attach to the secret. """ encoded_data = {} for k, v in data.items(): if isinstance(v, str): v = v.encode("utf8") encoded_data[k] = base64.b64encode(v).decode("utf8") file = json.dumps( { "kind": "Secret", "apiVersion": "v1", "metadata": { "name": name, "labels": labels, "annotations": annotations, }, "data": encoded_data, } ).encode() return super().create_secret(file=file)