Module pachyderm_sdk.interceptor

Implementation of a gRPC interceptor used to set request metadata and catch connection errors.

Expand source code
"""
Implementation of a gRPC interceptor used to set request metadata
and catch connection errors.
"""
from os import environ
from typing import Callable, Sequence, Optional, Tuple, Union, cast

import grpc
from betterproto import Message
from grpc_interceptor import ClientCallDetails, ClientInterceptor

from .errors import AuthServiceNotActivated

# Type alias for call metadata: a sequence of (key, value) pairs whose
# values are either ``str`` or ``bytes``.
MetadataType = Sequence[Tuple[str, Union[str, bytes]]]


class MetadataClientInterceptor(ClientInterceptor):
    """Client interceptor that appends a fixed set of metadata entries to
    every outgoing call and passes gRPC errors through ``_check_errors``.
    """

    def __init__(self, metadata: MetadataType):
        """Store the metadata entries to append to each intercepted call.

        Args:
            metadata: Sequence of (key, value) pairs that is appended to the
                metadata of every call passing through this interceptor.
        """
        self.metadata = metadata

    def intercept(
        self, method: Callable, request: Message, call_details: ClientCallDetails
    ):
        """Invoke ``method`` with the stored metadata appended to the call.

        Args:
            method: Callable that continues the RPC; it is invoked with the
                request and the updated call details.
            request: The request message, forwarded unchanged.
            call_details: Details of the call; its metadata (if any) is kept
                and the interceptor's metadata is appended after it.

        Returns:
            The object returned by ``method``, with ``_check_errors``
            registered as a done-callback. If ``method`` raises
            ``grpc.RpcError`` and ``_check_errors`` does not raise in turn,
            ``None`` is returned.
        """
        call_details_metadata = list(call_details.metadata or [])
        call_details_metadata.extend(self.metadata)
        new_details = call_details._replace(metadata=call_details_metadata)

        # Error handling differs depending on whether the response is Unary
        #   or Stream. If the response is a stream, then the error will be
        #   raised before the "done_callback" is called and will therefore
        #   be caught by the try-except block. If the response is Unary then
        #   the "done_callback" will be called before the error is raised.
        try:
            future = method(request, new_details)
        except grpc.RpcError as error:
            # gRPC error types are confusing - instantiated errors are Futures.
            # ref: github.com/grpc/grpc/issues/25334#issuecomment-772730080
            # NOTE: typing.cast takes (type, value) and returns the value;
            #   with the arguments swapped the grpc.Future class itself would
            #   be handed to _check_errors instead of the caught error.
            _check_errors(cast(grpc.Future, error), request)
        else:
            future.add_done_callback(lambda f: _check_errors(f, request))
            return future


def _check_errors(grpc_future: grpc.Future, request: Message):
    """Callback that checks whether a ``grpc.Future`` finished with an error
    and, for a few recognised cases, raises a more descriptive exception.

    Args:
        grpc_future: The completed future (or raised RPC error) to inspect.
        request: The request message that was sent; only used to build the
            serialization error message.

    Raises:
        ConnectionError: Status is UNAVAILABLE and the details mention
            "failed to connect to all addresses".
        TypeError: Status is INTERNAL and the details mention
            "Exception serializing request".
        AuthServiceNotActivated: Status is UNIMPLEMENTED or UNAUTHENTICATED
            and the details mention "the auth service is not activated".
        Exception: Any other error held by the future is re-raised unchanged.
    """
    error: Optional[grpc.Call] = grpc_future.exception()
    if error is None:
        return

    code = error.code()
    # Fall back to an empty string so the substring checks below cannot
    # fail with a TypeError if no details string is available.
    details = error.details() or ""

    unable_to_connect = "failed to connect to all addresses" in details
    if code == grpc.StatusCode.UNAVAILABLE and unable_to_connect:
        error_message = "Could not connect to pachyderm instance\n"
        if "PACHD_PEER_SERVICE_HOST" in environ:
            error_message += (
                "\tPACHD_PEER_SERVICE_HOST is detected. "
                "Please use Client.new_in_cluster() when using"
                " python_pachyderm within the pipeline. "
            )
        raise ConnectionError(error_message) from error

    unable_to_serialize = "Exception serializing request" in details
    if code == grpc.StatusCode.INTERNAL and unable_to_serialize:
        error_message = (
            "An error occurred while trying to serialize the following"
            f" {request.__class__.__qualname__} message.\n "
            " This is most likely due to one of the fields of"
            " this message having a value with an incorrect type.\n"
            f"\tMessage: {request}"
        )
        raise TypeError(error_message) from error

    auth_codes = (grpc.StatusCode.UNIMPLEMENTED, grpc.StatusCode.UNAUTHENTICATED)
    auth_not_activated = "the auth service is not activated" in details
    if code in auth_codes and auth_not_activated:
        # Raise (not return) the exception: callers ignore the return value,
        # so a returned exception instance would be silently discarded.
        raise AuthServiceNotActivated(details) from error

    raise error

Classes

class MetadataClientInterceptor (metadata: Sequence[Tuple[str, Union[str, bytes]]])

Base class for client-side interceptors.

To implement an interceptor, subclass this class and override the intercept method.

Expand source code
class MetadataClientInterceptor(ClientInterceptor):
    """Client interceptor that appends a fixed set of metadata entries to
    every outgoing call and passes gRPC errors through ``_check_errors``.
    """

    def __init__(self, metadata: MetadataType):
        """Store the metadata entries to append to each intercepted call.

        Args:
            metadata: Sequence of (key, value) pairs that is appended to the
                metadata of every call passing through this interceptor.
        """
        self.metadata = metadata

    def intercept(
        self, method: Callable, request: Message, call_details: ClientCallDetails
    ):
        """Invoke ``method`` with the stored metadata appended to the call.

        Args:
            method: Callable that continues the RPC; it is invoked with the
                request and the updated call details.
            request: The request message, forwarded unchanged.
            call_details: Details of the call; its metadata (if any) is kept
                and the interceptor's metadata is appended after it.

        Returns:
            The object returned by ``method``, with ``_check_errors``
            registered as a done-callback. If ``method`` raises
            ``grpc.RpcError`` and ``_check_errors`` does not raise in turn,
            ``None`` is returned.
        """
        call_details_metadata = list(call_details.metadata or [])
        call_details_metadata.extend(self.metadata)
        new_details = call_details._replace(metadata=call_details_metadata)

        # Error handling differs depending on whether the response is Unary
        #   or Stream. If the response is a stream, then the error will be
        #   raised before the "done_callback" is called and will therefore
        #   be caught by the try-except block. If the response is Unary then
        #   the "done_callback" will be called before the error is raised.
        try:
            future = method(request, new_details)
        except grpc.RpcError as error:
            # gRPC error types are confusing - instantiated errors are Futures.
            # ref: github.com/grpc/grpc/issues/25334#issuecomment-772730080
            # NOTE: typing.cast takes (type, value) and returns the value;
            #   with the arguments swapped the grpc.Future class itself would
            #   be handed to _check_errors instead of the caught error.
            _check_errors(cast(grpc.Future, error), request)
        else:
            future.add_done_callback(lambda f: _check_errors(f, request))
            return future

Ancestors

  • grpc_interceptor.client.ClientInterceptor
  • grpc.UnaryUnaryClientInterceptor
  • grpc.UnaryStreamClientInterceptor
  • grpc.StreamUnaryClientInterceptor
  • grpc.StreamStreamClientInterceptor
  • abc.ABC

Methods

def intercept(self, method: Callable, request: betterproto.Message, call_details: grpc_interceptor.client.ClientCallDetails)

Override this method to implement a custom interceptor.

This method is called for all unary and streaming RPCs. The interceptor implementation should call method using a grpc.ClientCallDetails and the request_or_iterator object as parameters. The request_or_iterator parameter may be type checked to determine if this is a singular request for unary RPCs or an iterator for client-streaming or client-server streaming RPCs.

Args

method
A function that proceeds with the invocation by executing the next interceptor in the chain or invoking the actual RPC on the underlying channel.
request_or_iterator
RPC request message or iterator of request messages for streaming requests.
call_details
Describes an RPC to be invoked.

Returns

The type of the return should match the type of the return value received by calling method. This is an object that is both a Call <https://grpc.github.io/grpc/python/grpc.html#grpc.Call> for the RPC and a Future <https://grpc.github.io/grpc/python/grpc.html#grpc.Future>.

The actual result from the RPC can be got by calling .result() on the value returned from method.

Expand source code
def intercept(
    self, method: Callable, request: Message, call_details: ClientCallDetails
):
    """Invoke ``method`` with the stored metadata appended to the call.

    Args:
        method: Callable that continues the RPC; it is invoked with the
            request and the updated call details.
        request: The request message, forwarded unchanged.
        call_details: Details of the call; its metadata (if any) is kept
            and the interceptor's metadata is appended after it.

    Returns:
        The object returned by ``method``, with ``_check_errors``
        registered as a done-callback. If ``method`` raises
        ``grpc.RpcError`` and ``_check_errors`` does not raise in turn,
        ``None`` is returned.
    """
    call_details_metadata = list(call_details.metadata or [])
    call_details_metadata.extend(self.metadata)
    new_details = call_details._replace(metadata=call_details_metadata)

    # Error handling differs depending on whether the response is Unary
    #   or Stream. If the response is a stream, then the error will be
    #   raised before the "done_callback" is called and will therefore
    #   be caught by the try-except block. If the response is Unary then
    #   the "done_callback" will be called before the error is raised.
    try:
        future = method(request, new_details)
    except grpc.RpcError as error:
        # gRPC error types are confusing - instantiated errors are Futures.
        # ref: github.com/grpc/grpc/issues/25334#issuecomment-772730080
        # NOTE: typing.cast takes (type, value) and returns the value;
        #   with the arguments swapped the grpc.Future class itself would
        #   be handed to _check_errors instead of the caught error.
        _check_errors(cast(grpc.Future, error), request)
    else:
        future.add_done_callback(lambda f: _check_errors(f, request))
        return future