Module pachyderm_sdk.interceptor
Implementation of a gRPC interceptor used to set request metadata and catch connection errors.
Expand source code
"""
Implementation of a gRPC interceptor used to set request metadata
and catch connection errors.
"""
from os import environ
from typing import Callable, Sequence, Optional, Tuple, Union, cast
import grpc
from betterproto import Message
from grpc_interceptor import ClientCallDetails, ClientInterceptor
from .errors import AuthServiceNotActivated
# Type of the metadata handed to the interceptor: a sequence of
# (key, value) pairs where each value is either ``str`` or ``bytes``.
MetadataType = Sequence[Tuple[str, Union[str, bytes]]]
class MetadataClientInterceptor(ClientInterceptor):
    """Client interceptor that appends a fixed set of metadata to every
    outgoing call and routes failures through ``_check_errors``.
    """

    def __init__(self, metadata: MetadataType):
        """Store the metadata to attach to each intercepted call.

        Args:
            metadata: Sequence of ``(key, value)`` pairs that is appended to
                whatever metadata is already present on the call details.
        """
        self.metadata = metadata

    def intercept(
        self, method: Callable, request: Message, call_details: ClientCallDetails
    ):
        """Invoke ``method`` with the stored metadata added to the call.

        Args:
            method: Callable that continues the RPC; invoked with the request
                and the updated call details.
            request: The request message being sent.
            call_details: Details of the call; its metadata (if any) is
                preserved and extended with ``self.metadata``.

        Returns:
            The object returned by ``method``, with a done-callback attached
            that runs ``_check_errors`` on it.

        Raises:
            Whatever ``_check_errors`` raises for a failed call, or the
            original ``grpc.RpcError`` if ``_check_errors`` does not raise.
        """
        call_details_metadata = list(call_details.metadata or [])
        call_details_metadata.extend(self.metadata)
        new_details = call_details._replace(metadata=call_details_metadata)

        # Error handling happens differently depending on whether the
        # response is Unary or Stream. If the response is a stream, then the
        # error will be raised before the "done_callback" is called and will
        # therefore be caught by the try-except block. If the response is
        # Unary then the "done_callback" will be called before the error is
        # raised.
        try:
            future = method(request, new_details)
        except grpc.RpcError as error:
            # gRPC error types are confusing - instantiated errors are Futures.
            # ref: github.com/grpc/grpc/issues/25334#issuecomment-772730080
            # NOTE: typing.cast takes (type, value) and returns the value.
            _check_errors(cast(grpc.Future, error), request)
            # _check_errors did not raise a replacement, so let the original
            # error propagate instead of silently swallowing it.
            raise

        future.add_done_callback(lambda f: _check_errors(f, request))
        return future
def _check_errors(grpc_future: grpc.Future, request: Message):
    """Inspect a gRPC future and raise a sanitized error if the call failed.

    Intended for use as a done-callback. Does nothing when the future
    carries no exception.

    Args:
        grpc_future: The future whose ``exception()`` is inspected.
        request: The request message of the call; only used to build the
            serialization error message.

    Raises:
        ConnectionError: The status code is UNAVAILABLE and the details
            report that no address could be connected to.
        TypeError: The status code is INTERNAL and the details report that
            the request could not be serialized.
        AuthServiceNotActivated: The status code is UNIMPLEMENTED or
            UNAUTHENTICATED and the details report that the auth service is
            not activated.
        Exception: Any other failure is re-raised unchanged.
    """
    error: Optional[grpc.Call] = grpc_future.exception()
    if error is None:
        return

    code = error.code()
    # Guard against a missing details string so the substring checks below
    # cannot themselves fail with a TypeError.
    details = error.details() or ""

    unable_to_connect = "failed to connect to all addresses" in details
    if code == grpc.StatusCode.UNAVAILABLE and unable_to_connect:
        error_message = "Could not connect to pachyderm instance\n"
        if "PACHD_PEER_SERVICE_HOST" in environ:
            error_message += (
                "\tPACHD_PEER_SERVICE_HOST is detected. "
                "Please use Client.new_in_cluster() when using"
                " python_pachyderm within the pipeline. "
            )
        raise ConnectionError(error_message) from error

    unable_to_serialize = "Exception serializing request" in details
    if code == grpc.StatusCode.INTERNAL and unable_to_serialize:
        error_message = (
            "An error occurred while trying to serialize the following"
            f" {request.__class__.__qualname__} message.\n "
            " This is most likely due to one of the fields of"
            " this message having a value with an incorrect type.\n"
            f"\tMessage: {request}"
        )
        raise TypeError(error_message) from error

    auth_codes = (grpc.StatusCode.UNIMPLEMENTED, grpc.StatusCode.UNAUTHENTICATED)
    auth_not_activated = "the auth service is not activated" in details
    if code in auth_codes and auth_not_activated:
        # Raise (not return): a callback's return value is discarded, so a
        # returned exception would never reach the caller.
        raise AuthServiceNotActivated(details) from error

    raise error
Classes
class MetadataClientInterceptor (metadata: Sequence[Tuple[str, Union[str, bytes]]])
-
Base class for client-side interceptors.
To implement an interceptor, subclass this class and override the intercept method.
Expand source code
class MetadataClientInterceptor(ClientInterceptor):
    """Client interceptor that appends a fixed set of metadata to every
    outgoing call and routes failures through ``_check_errors``.
    """

    def __init__(self, metadata: MetadataType):
        """Store the metadata pairs to attach to each intercepted call."""
        self.metadata = metadata

    def intercept(
        self, method: Callable, request: Message, call_details: ClientCallDetails
    ):
        """Invoke ``method`` with ``self.metadata`` appended to the call's
        metadata and return its result with an error-checking callback
        attached.
        """
        call_details_metadata = list(call_details.metadata or [])
        call_details_metadata.extend(self.metadata)
        new_details = call_details._replace(metadata=call_details_metadata)

        # Error handling happens differently depending on whether the
        # response is Unary or Stream. If the response is a stream, then the
        # error will be raised before the "done_callback" is called and will
        # therefore be caught by the try-except block. If the response is
        # Unary then the "done_callback" will be called before the error is
        # raised.
        try:
            future = method(request, new_details)
        except grpc.RpcError as error:
            # gRPC error types are confusing - instantiated errors are Futures.
            # ref: github.com/grpc/grpc/issues/25334#issuecomment-772730080
            # NOTE: typing.cast takes (type, value) and returns the value.
            _check_errors(cast(grpc.Future, error), request)
            # No replacement error was raised; propagate the original one.
            raise

        future.add_done_callback(lambda f: _check_errors(f, request))
        return future
Ancestors
- grpc_interceptor.client.ClientInterceptor
- grpc.UnaryUnaryClientInterceptor
- grpc.UnaryStreamClientInterceptor
- grpc.StreamUnaryClientInterceptor
- grpc.StreamStreamClientInterceptor
- abc.ABC
Methods
def intercept(self, method: Callable, request: betterproto.Message, call_details: grpc_interceptor.client.ClientCallDetails)
-
Override this method to implement a custom interceptor.
This method is called for all unary and streaming RPCs. The interceptor implementation should call `method` using a `grpc.ClientCallDetails` and the `request_or_iterator` object as parameters. The `request_or_iterator` parameter may be type checked to determine if this is a singular request for unary RPCs or an iterator for client-streaming or client-server streaming RPCs.
Args
method
- A function that proceeds with the invocation by executing the next interceptor in the chain or invoking the actual RPC on the underlying channel.
request_or_iterator
- RPC request message or iterator of request messages for streaming requests.
call_details
- Describes an RPC to be invoked.
Returns
The type of the return should match the type of the return value received by calling `method`. This is an object that is both a Call <https://grpc.github.io/grpc/python/grpc.html#grpc.Call> for the RPC and a Future <https://grpc.github.io/grpc/python/grpc.html#grpc.Future>. The actual result from the RPC can be obtained by calling `.result()` on the value returned from `method`.
Expand source code
def intercept(
    self, method: Callable, request: Message, call_details: ClientCallDetails
):
    """Invoke ``method`` with ``self.metadata`` appended to the call's
    metadata and return its result with an error-checking callback attached.

    Raises whatever ``_check_errors`` raises for a failed call, or the
    original ``grpc.RpcError`` if ``_check_errors`` does not raise.
    """
    call_details_metadata = list(call_details.metadata or [])
    call_details_metadata.extend(self.metadata)
    new_details = call_details._replace(metadata=call_details_metadata)

    # Error handling happens differently depending on whether the response
    # is Unary or Stream. If the response is a stream, then the error will
    # be raised before the "done_callback" is called and will therefore be
    # caught by the try-except block. If the response is Unary then the
    # "done_callback" will be called before the error is raised.
    try:
        future = method(request, new_details)
    except grpc.RpcError as error:
        # gRPC error types are confusing - instantiated errors are Futures.
        # ref: github.com/grpc/grpc/issues/25334#issuecomment-772730080
        # NOTE: typing.cast takes (type, value) and returns the value.
        _check_errors(cast(grpc.Future, error), request)
        # No replacement error was raised; propagate the original one.
        raise

    future.add_done_callback(lambda f: _check_errors(f, request))
    return future