blob: 742ba19e0f9144e542752c6a3eea9f62137d8406 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Tests statestore with non-default startup options
import logging
import os
import pytest
import re
import sys
import uuid
import socket
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_test_suite import ImpalaTestSuite
from Types.ttypes import TNetworkAddress
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
import StatestoreService.StatestoreSubscriber as Subscriber
import StatestoreService.StatestoreService as Statestore
from ErrorCodes.ttypes import TErrorCode
LOG = logging.getLogger('custom_statestore_test')
STATESTORE_SERVICE_PORT = 24000
# A simple wrapper class to launch a cluster where we can tune various
# startup parameters of the statestored to test correct boundary-value
# behavior.
class TestCustomStatestore(CustomClusterTestSuite):
# Grab a port the statestore subscribers will use to connect.
# Note that all subscribers we create below use this port to connect,
# with different subscriber IDs.
handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
handle.bind(('localhost', 0))
_, port = handle.getsockname()
@classmethod
def get_workload(self):
return 'functional-query'
def __register_subscriber(self):
subscriber_id = "python-test-client-%s" % uuid.uuid4()
topics = []
request = Subscriber.TRegisterSubscriberRequest(topic_registrations=topics,
subscriber_location=TNetworkAddress("localhost", self.port),
subscriber_id=subscriber_id)
client_transport = \
TTransport.TBufferedTransport(TSocket.TSocket('localhost', STATESTORE_SERVICE_PORT))
protocol = TBinaryProtocol.TBinaryProtocol(client_transport)
client = Statestore.Client(protocol)
client_transport.open()
return client.RegisterSubscriber(request)
@CustomClusterTestSuite.with_args(statestored_args="-statestore_max_subscribers=3")
def test_statestore_max_subscribers(self):
"""Test that the statestored correctly handles the condition where the number
of subscribers exceeds FLAGS_statestore_max_subscribers
(see be/src/statestore/statestore.cc). The expected behavior is for the
statestored to reject the subscription request once the threshold is
exceeded."""
# With a statestore_max_subscribers of 3, we should hit the registration error
# pretty quick.
for x in xrange(20):
response = self.__register_subscriber()
if response.status.status_code == TErrorCode.OK:
self.registration_id = response.registration_id
LOG.log(logging.INFO, "Registration id %s, x=%d" % (response.registration_id, x))
else:
assert 'Maximum subscriber limit reached:' in ''.join(response.status.error_msgs)
return
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--statestore_subscriber_use_resolved_address=true",
catalogd_args="--statestore_subscriber_use_resolved_address=true")
def test_subscriber_with_resolved_address(self, vector):
# Ensure cluster has started up by running a query.
result = self.execute_query("select count(*) from functional_parquet.alltypes")
assert result.success, str(result)
self.assert_impalad_log_contains("INFO",
"Registering with statestore with resolved address")
self.assert_catalogd_log_contains("INFO",
"Registering with statestore with resolved address")