Module pachyderm_sdk.config
Functionality for parsing Pachyderm config files.
Expand source code
"""Functionality for parsing Pachyderm config files."""
import json
import os
from base64 import b64decode
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional, Union
from .errors import ConfigError
class ConfigFile:
    """In-memory view of a Pachyderm config file read from disk."""

    def __init__(self, config_file: Union[Path, str]):
        """
        Parameters
        ----------
        config_file : Union[Path, str]
            The path to the config file. A leading ``~`` is expanded and the
            path is resolved before the file is read and parsed as JSON.
        """
        resolved_path = Path(os.path.expanduser(config_file)).resolve()
        raw_contents = resolved_path.read_bytes()
        self._config_file_data = json.loads(raw_contents)

    @property
    def user_id(self) -> str:
        """The ``user_id`` entry of the config file."""
        parsed = self._config_file_data
        return parsed["user_id"]

    @property
    def active_context(self) -> "Context":
        """The context named by ``v2.active_context``.

        Raises
        ------
        ConfigError
            If the named context is not present under ``v2.contexts``.
        """
        v2_section = self._config_file_data["v2"]
        active_context_name = v2_section["active_context"]
        contexts = v2_section["contexts"]
        if active_context_name in contexts:
            return Context(**contexts[active_context_name])
        raise ConfigError(f"active context not found: {active_context_name}")

    @property
    def active_enterprise_context(self) -> "Context":
        """The context named by ``v2.active_enterprise_context``.

        Raises
        ------
        ConfigError
            If no enterprise context name is set, or if the named context is
            not present under ``v2.contexts``.
        """
        v2_section = self._config_file_data["v2"]
        context_name = v2_section.get("active_enterprise_context")
        if context_name is None:
            raise ConfigError("active enterprise context is not specified")

        contexts = v2_section["contexts"]
        if context_name in contexts:
            return Context(**contexts[context_name])
        raise ConfigError(f"active enterprise context not found: {context_name}")
@dataclass
class Context:
    """A context contains all the information needed to connect to a pachyderm
    instance. Not all fields need to be present for the context to be valid."""

    source: Optional[int] = None
    """An integer that specifies where the config came from.
    This parameter is for internal use only and should not be modified."""

    pachd_address: Optional[str] = None
    """A host:port specification for connecting to pachd."""

    server_cas: Optional[str] = None
    """Trusted root certificates for the cluster, formatted as a
    base64-encoded PEM. This is only set when TLS is enabled."""

    session_token: Optional[str] = None
    """A secret token identifying the current user within their pachyderm
    cluster. This is included in all RPCs and used to determine if a user's
    actions are authorized. This is only set when auth is enabled."""

    active_transaction: Optional[str] = None
    """The currently active transaction for batching together commands."""

    cluster_name: Optional[str] = None
    """The name of the underlying Kubernetes cluster."""

    auth_info: Optional[str] = None
    """The name of the underlying Kubernetes cluster's auth credentials."""

    namespace: Optional[str] = None
    """The underlying Kubernetes cluster's namespace."""

    cluster_deployment_id: Optional[str] = None
    """The pachyderm cluster deployment ID that is used to ensure the
    operations run on the expected cluster."""

    project: Optional[str] = None
    # NOTE(review): this field has no description; presumably the name of a
    # Pachyderm project tied to the context -- confirm before documenting.

    enterprise_server: bool = False
    """Whether the context represents an enterprise server."""

    # The default is None, so the annotation must be Optional to match.
    port_forwarders: Optional[Dict[str, int]] = None
    """A mapping of service name -> local port, or None when unset."""

    @property
    def active_pachd_address(self) -> str:
        """This pachd address factors in port-forwarding.

        Returns ``pachd_address`` when it is set. Otherwise returns
        ``grpc://localhost:<port>``, where the port is the ``"pachd"`` entry
        of ``port_forwarders`` if present and 30650 if not.
        """
        if self.pachd_address is not None:
            return self.pachd_address
        port = 30650
        if self.port_forwarders:
            port = self.port_forwarders.get("pachd", 30650)
        return f"grpc://localhost:{port}"

    @property
    def server_cas_decoded(self) -> Optional[bytes]:
        """The base64 decoded root certificates in PEM format, if they exist.

        Returns None when ``server_cas`` is unset or empty.
        """
        if not self.server_cas:
            return None
        return b64decode(bytes(self.server_cas, "utf-8"))
Classes
class ConfigFile (config_file: Union[pathlib.Path, str])
-
A parsed Pachyderm config file.
Parameters
config_file
:str
- The path to the config file.
Expand source code
class ConfigFile:
    """In-memory view of a Pachyderm config file read from disk."""

    def __init__(self, config_file: Union[Path, str]):
        """
        Parameters
        ----------
        config_file : Union[Path, str]
            The path to the config file. A leading ``~`` is expanded and the
            path is resolved before the file is read and parsed as JSON.
        """
        resolved_path = Path(os.path.expanduser(config_file)).resolve()
        raw_contents = resolved_path.read_bytes()
        self._config_file_data = json.loads(raw_contents)

    @property
    def user_id(self) -> str:
        """The ``user_id`` entry of the config file."""
        parsed = self._config_file_data
        return parsed["user_id"]

    @property
    def active_context(self) -> "Context":
        """The context named by ``v2.active_context``.

        Raises
        ------
        ConfigError
            If the named context is not present under ``v2.contexts``.
        """
        v2_section = self._config_file_data["v2"]
        active_context_name = v2_section["active_context"]
        contexts = v2_section["contexts"]
        if active_context_name in contexts:
            return Context(**contexts[active_context_name])
        raise ConfigError(f"active context not found: {active_context_name}")

    @property
    def active_enterprise_context(self) -> "Context":
        """The context named by ``v2.active_enterprise_context``.

        Raises
        ------
        ConfigError
            If no enterprise context name is set, or if the named context is
            not present under ``v2.contexts``.
        """
        v2_section = self._config_file_data["v2"]
        context_name = v2_section.get("active_enterprise_context")
        if context_name is None:
            raise ConfigError("active enterprise context is not specified")

        contexts = v2_section["contexts"]
        if context_name in contexts:
            return Context(**contexts[context_name])
        raise ConfigError(f"active enterprise context not found: {context_name}")
Instance variables
var user_id : str
-
The user ID of the config file.
Expand source code
@property
def user_id(self) -> str:
    """The ``user_id`` entry of the parsed config file data."""
    parsed = self._config_file_data
    return parsed["user_id"]
var active_context : Context
-
The currently-active context.
Expand source code
@property
def active_context(self) -> "Context":
    """The context named by ``v2.active_context``.

    Raises
    ------
    ConfigError
        If the named context is not present under ``v2.contexts``.
    """
    v2_section = self._config_file_data["v2"]
    active_context_name = v2_section["active_context"]
    contexts = v2_section["contexts"]
    if active_context_name in contexts:
        return Context(**contexts[active_context_name])
    raise ConfigError(f"active context not found: {active_context_name}")
var active_enterprise_context : Context
-
The currently-active context that is enterprise-enabled.
Expand source code
@property
def active_enterprise_context(self) -> "Context":
    """The context named by ``v2.active_enterprise_context``.

    Raises
    ------
    ConfigError
        If no enterprise context name is set, or if the named context is
        not present under ``v2.contexts``.
    """
    v2_section = self._config_file_data["v2"]
    context_name = v2_section.get("active_enterprise_context")
    if context_name is None:
        raise ConfigError("active enterprise context is not specified")

    contexts = v2_section["contexts"]
    if context_name in contexts:
        return Context(**contexts[context_name])
    raise ConfigError(f"active enterprise context not found: {context_name}")
class Context (source: Optional[int] = None, pachd_address: Optional[str] = None, server_cas: Optional[str] = None, session_token: Optional[str] = None, active_transaction: Optional[str] = None, cluster_name: Optional[str] = None, auth_info: Optional[str] = None, namespace: Optional[str] = None, cluster_deployment_id: Optional[str] = None, project: Optional[str] = None, enterprise_server: bool = False, port_forwarders: Dict[str, int] = None)
-
A context contains all the information needed to connect to a pachyderm instance. Not all fields need to be present for the context to be valid.
Expand source code
@dataclass
class Context:
    """A context contains all the information needed to connect to a pachyderm
    instance. Not all fields need to be present for the context to be valid."""

    source: Optional[int] = None
    """An integer that specifies where the config came from.
    This parameter is for internal use only and should not be modified."""

    pachd_address: Optional[str] = None
    """A host:port specification for connecting to pachd."""

    server_cas: Optional[str] = None
    """Trusted root certificates for the cluster, formatted as a
    base64-encoded PEM. This is only set when TLS is enabled."""

    session_token: Optional[str] = None
    """A secret token identifying the current user within their pachyderm
    cluster. This is included in all RPCs and used to determine if a user's
    actions are authorized. This is only set when auth is enabled."""

    active_transaction: Optional[str] = None
    """The currently active transaction for batching together commands."""

    cluster_name: Optional[str] = None
    """The name of the underlying Kubernetes cluster."""

    auth_info: Optional[str] = None
    """The name of the underlying Kubernetes cluster's auth credentials."""

    namespace: Optional[str] = None
    """The underlying Kubernetes cluster's namespace."""

    cluster_deployment_id: Optional[str] = None
    """The pachyderm cluster deployment ID that is used to ensure the
    operations run on the expected cluster."""

    project: Optional[str] = None
    # NOTE(review): this field has no description; presumably the name of a
    # Pachyderm project tied to the context -- confirm before documenting.

    enterprise_server: bool = False
    """Whether the context represents an enterprise server."""

    # The default is None, so the annotation must be Optional to match.
    port_forwarders: Optional[Dict[str, int]] = None
    """A mapping of service name -> local port, or None when unset."""

    @property
    def active_pachd_address(self) -> str:
        """This pachd address factors in port-forwarding.

        Returns ``pachd_address`` when it is set. Otherwise returns
        ``grpc://localhost:<port>``, where the port is the ``"pachd"`` entry
        of ``port_forwarders`` if present and 30650 if not.
        """
        if self.pachd_address is not None:
            return self.pachd_address
        port = 30650
        if self.port_forwarders:
            port = self.port_forwarders.get("pachd", 30650)
        return f"grpc://localhost:{port}"

    @property
    def server_cas_decoded(self) -> Optional[bytes]:
        """The base64 decoded root certificates in PEM format, if they exist.

        Returns None when ``server_cas`` is unset or empty.
        """
        if not self.server_cas:
            return None
        return b64decode(bytes(self.server_cas, "utf-8"))
Class variables
var source : Optional[int]
-
An integer that specifies where the config came from. This parameter is for internal use only and should not be modified.
var pachd_address : Optional[str]
-
A host:port specification for connecting to pachd.
var server_cas : Optional[str]
-
Trusted root certificates for the cluster, formatted as a base64-encoded PEM. This is only set when TLS is enabled.
var session_token : Optional[str]
-
A secret token identifying the current user within their pachyderm cluster. This is included in all RPCs and used to determine if a user's actions are authorized. This is only set when auth is enabled.
var active_transaction : Optional[str]
-
The currently active transaction for batching together commands.
var cluster_name : Optional[str]
-
The name of the underlying Kubernetes cluster.
var auth_info : Optional[str]
-
The name of the underlying Kubernetes cluster’s auth credentials
var namespace : Optional[str]
-
The underlying Kubernetes cluster’s namespace
var cluster_deployment_id : Optional[str]
-
The pachyderm cluster deployment ID that is used to ensure the operations run on the expected cluster.
var project : Optional[str]
var enterprise_server : bool
-
Whether the context represents an enterprise server.
var port_forwarders : Dict[str, int]
-
A mapping of service name -> local port.
Instance variables
var active_pachd_address : str
-
This pachd address factors in port-forwarding.
Expand source code
@property
def active_pachd_address(self) -> str:
    """This pachd address factors in port-forwarding.

    Returns ``pachd_address`` when it is set. Otherwise returns
    ``grpc://localhost:<port>``, where the port is the ``"pachd"`` entry of
    ``port_forwarders`` if present and 30650 if not.
    """
    if self.pachd_address is not None:
        return self.pachd_address
    forwarders = self.port_forwarders
    port = forwarders.get("pachd", 30650) if forwarders else 30650
    return f"grpc://localhost:{port}"
var server_cas_decoded : Optional[bytes]
-
The base64 decoded root certificates in PEM format, if they exist.
Expand source code
@property
def server_cas_decoded(self) -> Optional[bytes]:
    """The base64 decoded root certificates in PEM format, if they exist.

    Returns None when ``server_cas`` is unset or empty.
    """
    if not self.server_cas:
        return None
    encoded = bytes(self.server_cas, "utf-8")
    return b64decode(encoded)