blob: afb4d182cabdf427de01c3ca6c1cb9c2a2cdb0d8 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Factory to create grpc channel."""
# pytype: skip-file
import grpc
class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
DEFAULT_OPTIONS = [
# Setting keepalive_time_ms is needed for other options to work.
("grpc.keepalive_time_ms", 20_000),
# Default: 20s. Increasing to 5 min.
("grpc.keepalive_timeout_ms", 300_000),
# Default: 2, set to 0 to allow unlimited pings without data
("grpc.http2.max_pings_without_data", 0),
# Default: False, set to True to allow keepalive pings when no calls
("grpc.keepalive_permit_without_calls", True),
]
def __init__(self):
pass
@staticmethod
def insecure_channel(target, options=None):
if options is None:
options = []
return grpc.insecure_channel(
target, options=options + GRPCChannelFactory.DEFAULT_OPTIONS)
@staticmethod
def secure_channel(target, credentials, options=None):
if options is None:
options = []
return grpc.secure_channel(
target,
credentials,
options=options + GRPCChannelFactory.DEFAULT_OPTIONS)