Module pachyderm_sdk.datum_batching
A high-level decorator for pipeline code that uses the datum-batching feature.
Expand source code
"""A high-level decorator for pipeline code that uses the datum-batching feature."""
from functools import wraps
from typing import Callable
from . import Client
PIPELINE_FUNC = Callable[..., None]
def batch_all_datums(user_code: PIPELINE_FUNC) -> PIPELINE_FUNC:
"""A decorator that will repeatedly call the wrapped function until
all datums have been processed. Before calling the wrapped function,
this decorator will call the NextDatum endpoint within the worker
and set any environment variables specified by the worker.
Any exceptions raised during the execution of the wrapped function
will be reported back to the worker. See the pachyderm documentation
for more information on how the datum batching feature works.
Note: This can only be used within a Pachyderm worker.
Examples
--------
>>> from pachyderm_sdk import batch_all_datums
>>>
>>> @batch_all_datums
>>> def pipeline():
>>> # process datums
>>> pass
>>>
>>> if __name__ == '__main__':
>>> # Perform an expensive computation here before
>>> # entering your datum processing function
>>> # i.e. initializing a model.
>>> pipeline()
Check the following link for a more substatial example:
github.com/pachyderm/examples/tree/master/object-detection
"""
@wraps(user_code)
def wrapper(*args, **kwargs) -> None:
worker = Client().worker
while True:
with worker.batch_datum():
user_code(*args, **kwargs)
return wrapper
Functions
def batch_all_datums(user_code: Callable[..., None]) ‑> Callable[..., None]
-
A decorator that will repeatedly call the wrapped function until all datums have been processed. Before calling the wrapped function, this decorator will call the NextDatum endpoint within the worker and set any environment variables specified by the worker.
Any exceptions raised during the execution of the wrapped function will be reported back to the worker. See the pachyderm documentation for more information on how the datum batching feature works.
Note: This can only be used within a Pachyderm worker.
Examples
>>> from pachyderm_sdk import batch_all_datums >>> >>> @batch_all_datums >>> def pipeline(): >>> # process datums >>> pass >>> >>> if __name__ == '__main__': >>> # Perform an expensive computation here before >>> # entering your datum processing function >>> # i.e. initializing a model. >>> pipeline()
Check the following link for a more substatial example: github.com/pachyderm/examples/tree/master/object-detection
Expand source code
def batch_all_datums(user_code: PIPELINE_FUNC) -> PIPELINE_FUNC: """A decorator that will repeatedly call the wrapped function until all datums have been processed. Before calling the wrapped function, this decorator will call the NextDatum endpoint within the worker and set any environment variables specified by the worker. Any exceptions raised during the execution of the wrapped function will be reported back to the worker. See the pachyderm documentation for more information on how the datum batching feature works. Note: This can only be used within a Pachyderm worker. Examples -------- >>> from pachyderm_sdk import batch_all_datums >>> >>> @batch_all_datums >>> def pipeline(): >>> # process datums >>> pass >>> >>> if __name__ == '__main__': >>> # Perform an expensive computation here before >>> # entering your datum processing function >>> # i.e. initializing a model. >>> pipeline() Check the following link for a more substatial example: github.com/pachyderm/examples/tree/master/object-detection """ @wraps(user_code) def wrapper(*args, **kwargs) -> None: worker = Client().worker while True: with worker.batch_datum(): user_code(*args, **kwargs) return wrapper