Module pachyderm_sdk.api.transaction.extension
Handwritten classes/methods that augment the existing Transaction API.
Expand source code
"""Handwritten classes/methods that augment the existing Transaction API."""
from contextlib import contextmanager
from typing import Callable, ContextManager, Iterator, Optional

import grpc

from . import ApiStub as _GeneratedApiStub
from . import (
    Transaction,
    TransactionInfo,
)
class ApiStub(_GeneratedApiStub):
    """Transaction API stub that tracks the client's active transaction ID.

    Extends the generated stub so that starting and finishing a transaction
    also updates the transaction ID through the accessor callbacks supplied
    at construction time.
    """

    def __init__(
        self,
        channel: grpc.Channel,
        *,
        get_transaction_id: Callable[[], str],
        set_transaction_id: Callable[[str], None],
    ):
        """Stores the transaction-ID accessors and initializes the generated stub.

        Parameters
        ----------
        channel : grpc.Channel
            The channel forwarded to the generated stub.
        get_transaction_id : Callable[[], str]
            Callback returning the currently active transaction ID.
        set_transaction_id : Callable[[str], None]
            Callback used to replace the currently active transaction ID.
        """
        self._get_transaction_id = get_transaction_id
        self._set_transaction_id = set_transaction_id
        super().__init__(channel=channel)

    def start_transaction(self) -> "Transaction":
        """Starts a transaction and sets the transaction ID within the client.

        This will make all subsequent resource operations performed by the
        client occur within the returned transaction, until the transaction
        is finished.

        Returns
        -------
        transaction.Transaction
            The newly started transaction.
        """
        response = super().start_transaction()
        self._set_transaction_id(response.id)
        return response

    def finish_transaction(
        self, *, transaction: Optional["Transaction"] = None
    ) -> "TransactionInfo":
        """Finishes a transaction and removes the transaction ID within the client.

        Parameters
        ----------
        transaction : transaction.Transaction, optional
            The transaction to finish; forwarded unchanged to the generated
            stub.

        Returns
        -------
        transaction.TransactionInfo
            The response returned by the generated stub.
        """
        response = super().finish_transaction(transaction=transaction)
        # The ID is reset to the empty string regardless of which transaction
        # was passed in.
        self._set_transaction_id("")
        return response

    @contextmanager
    def transaction(self) -> Iterator[Transaction]:
        """A context manager for running operations within a transaction.

        When the context manager completes, the transaction will be deleted
        if an error occurred, or otherwise finished. In both cases the
        transaction ID that was active before entering the context is
        restored afterwards.

        Yields
        -------
        transaction.Transaction
            A protobuf object that represents a transaction.

        Examples
        --------
        If a pipeline has two input repos, `foo` and `bar`, a transaction is
        useful for adding data to both atomically before the pipeline runs
        even once.

        >>> from pachyderm_sdk import Client
        >>> from pachyderm_sdk.api import pfs
        >>> client: Client
        >>> with client.transaction.transaction() as txn:
        >>>     with client.pfs.commit(branch=pfs.Branch.from_uri("foo@master")) as c1:
        >>>         c1.put_file_from_bytes("/joint_data.txt", b"DATA1")
        >>>     with client.pfs.commit(branch=pfs.Branch.from_uri("bar@master")) as c2:
        >>>         c2.put_file_from_bytes("/joint_data.txt", b"DATA2")
        >>> c1.wait()
        >>> c2.wait()
        """
        # Remember the ID in effect on entry so it can be restored on exit.
        previous_id = self._get_transaction_id()
        # Call the generated stub directly; the ID is set explicitly below.
        transaction = super().start_transaction()
        self._set_transaction_id(transaction.id)
        try:
            yield transaction
        except Exception:
            # Only Exception subclasses trigger the delete; a BaseException
            # such as KeyboardInterrupt skips both delete and finish.
            super().delete_transaction(transaction=transaction)
            raise
        else:
            super().finish_transaction(transaction=transaction)
        finally:
            self._set_transaction_id(previous_id)

    def transaction_exists(self, transaction: "Transaction") -> bool:
        """Checks whether a transaction exists.

        Parameters
        ----------
        transaction: transaction.Transaction
            The transaction to check.

        Returns
        -------
        bool
            Whether the transaction exists.

        Raises
        ------
        grpc.RpcError
            If inspecting the transaction fails with any status other than
            ``NOT_FOUND``.
        """
        try:
            super().inspect_transaction(transaction=transaction)
        except grpc.RpcError as err:
            # Type hint only: lets tooling resolve ``err.code()``.
            err: grpc.Call
            if err.code() == grpc.StatusCode.NOT_FOUND:
                return False
            # Bare raise re-raises the active exception unchanged.
            raise
        return True
Classes
class ApiStub (channel: grpc.Channel, *, get_transaction_id: Callable[[], str], set_transaction_id: Callable[[str], None])
-
Expand source code
class ApiStub(_GeneratedApiStub): def __init__( self, channel: grpc.Channel, *, get_transaction_id: Callable[[], str], set_transaction_id: Callable[[str], None], ): self._get_transaction_id = get_transaction_id self._set_transaction_id = set_transaction_id super().__init__(channel=channel) def start_transaction(self) -> "Transaction": """Starts a transaction and sets the transaction ID within the client. This will make all subsequent resource operations performed by the client occur within the returned transaction, until the transaction is finished. """ response = super().start_transaction() self._set_transaction_id(response.id) return response def finish_transaction(self, *, transaction: "Transaction" = None) -> "TransactionInfo": """Finishes a transaction and removes the transaction ID within the client.""" response = super().finish_transaction(transaction=transaction) self._set_transaction_id("") return response @contextmanager def transaction(self) -> ContextManager[Transaction]: """A context manager for running operations within a transaction. When the context manager completes, the transaction will be deleted if an error occurred, or otherwise finished. Yields ------- transaction.Transaction A protobuf object that represents a transaction. Examples -------- If a pipeline has two input repos, `foo` and `bar`, a transaction is useful for adding data to both atomically before the pipeline runs even once. 
>>> from pachyderm_sdk import Client >>> from pachyderm_sdk.api import pfs >>> client: Client >>> with client.transaction.transaction() as txn: >>> with client.pfs.commit(branch=pfs.Branch.from_uri("foo@master")) as c1: >>> c1.put_file_from_bytes("/joint_data.txt", b"DATA1") >>> with client.pfs.commit(branch=pfs.Branch.from_uri("bar@master")) as c2: >>> c2.put_file_from_bytes("/joint_data.txt", b"DATA2") >>> c1.wait() >>> c2.wait() """ old_transaction_id = self._get_transaction_id() transaction = super().start_transaction() self._set_transaction_id(transaction.id) try: yield transaction except Exception: super().delete_transaction(transaction=transaction) raise else: super().finish_transaction(transaction=transaction) finally: self._set_transaction_id(old_transaction_id) def transaction_exists(self, transaction: "Transaction") -> bool: """Checks whether a transaction exists. Parameters ---------- transaction: transaction.Transaction The transaction to check. Returns ------- bool Whether the transaction exists. """ try: super().inspect_transaction(transaction=transaction) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err
Ancestors
Methods
def start_transaction(self) ‑> Transaction
-
Starts a transaction and sets the transaction ID within the client. This will make all subsequent resource operations performed by the client occur within the returned transaction, until the transaction is finished.
Expand source code
def start_transaction(self) -> "Transaction": """Starts a transaction and sets the transaction ID within the client. This will make all subsequent resource operations performed by the client occur within the returned transaction, until the transaction is finished. """ response = super().start_transaction() self._set_transaction_id(response.id) return response
def finish_transaction(self, *, transaction: Transaction = None) ‑> TransactionInfo
-
Finishes a transaction and removes the transaction ID within the client.
Expand source code
def finish_transaction(self, *, transaction: "Transaction" = None) -> "TransactionInfo": """Finishes a transaction and removes the transaction ID within the client.""" response = super().finish_transaction(transaction=transaction) self._set_transaction_id("") return response
def transaction(self) ‑> ContextManager[Transaction]
-
A context manager for running operations within a transaction. When the context manager completes, the transaction will be deleted if an error occurred, or otherwise finished.
Yields
transaction.Transaction
- A protobuf object that represents a transaction.
Examples
If a pipeline has two input repos, `foo` and `bar`, a transaction is useful for adding data to both atomically before the pipeline runs even once.
>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pfs
>>> client: Client
>>> with client.transaction.transaction() as txn:
>>>     with client.pfs.commit(branch=pfs.Branch.from_uri("foo@master")) as c1:
>>>         c1.put_file_from_bytes("/joint_data.txt", b"DATA1")
>>>     with client.pfs.commit(branch=pfs.Branch.from_uri("bar@master")) as c2:
>>>         c2.put_file_from_bytes("/joint_data.txt", b"DATA2")
>>> c1.wait()
>>> c2.wait()
Expand source code
@contextmanager def transaction(self) -> ContextManager[Transaction]: """A context manager for running operations within a transaction. When the context manager completes, the transaction will be deleted if an error occurred, or otherwise finished. Yields ------- transaction.Transaction A protobuf object that represents a transaction. Examples -------- If a pipeline has two input repos, `foo` and `bar`, a transaction is useful for adding data to both atomically before the pipeline runs even once. >>> from pachyderm_sdk import Client >>> from pachyderm_sdk.api import pfs >>> client: Client >>> with client.transaction.transaction() as txn: >>> with client.pfs.commit(branch=pfs.Branch.from_uri("foo@master")) as c1: >>> c1.put_file_from_bytes("/joint_data.txt", b"DATA1") >>> with client.pfs.commit(branch=pfs.Branch.from_uri("bar@master")) as c2: >>> c2.put_file_from_bytes("/joint_data.txt", b"DATA2") >>> c1.wait() >>> c2.wait() """ old_transaction_id = self._get_transaction_id() transaction = super().start_transaction() self._set_transaction_id(transaction.id) try: yield transaction except Exception: super().delete_transaction(transaction=transaction) raise else: super().finish_transaction(transaction=transaction) finally: self._set_transaction_id(old_transaction_id)
def transaction_exists(self, transaction: Transaction) ‑> bool
-
Checks whether a transaction exists.
Parameters
transaction : transaction.Transaction
- The transaction to check.
Returns
bool
- Whether the transaction exists.
Expand source code
def transaction_exists(self, transaction: "Transaction") -> bool: """Checks whether a transaction exists. Parameters ---------- transaction: transaction.Transaction The transaction to check. Returns ------- bool Whether the transaction exists. """ try: super().inspect_transaction(transaction=transaction) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err