blob: 7b843b4f6823761427d66d9c8af56dc5d2231cac [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import MySQLdb
# NOTE: The default organization of meta info is cascading, we flatten its structure
# NOTE: We get schema-hash from proc '/dbs/db_id/tbl_id/'index_schema'
class FeMetaResolver:
def __init__(self, fe_host, query_port, user, query_pwd):
self.fe_host = fe_host
self.query_port = query_port
self.user = user
self.query_pwd = query_pwd
self.db = None
self.cur = None
self.be_list = []
self.db_list = []
# Only base tables, excluding rollups
self.table_list = []
# All rollups, including base tables
self.rollup_map = {}
self.partition_list = []
self.index_list = []
self.tablet_map = {}
def init(self):
self.connect_mysql()
self.fetch_be_list();
self.fetch_db_list();
self.fetch_table_list();
self.fetch_rollup_map();
self.fetch_partition_list();
self.fetch_idx_list();
self._merge_schema_hash_to_idx_list()
self.fetch_tablet_list();
self.close()
def connect_mysql(self):
try:
self.db = MySQLdb.connect(host=self.fe_host, port=self.query_port,
user=self.user,
passwd=self.query_pwd)
self.cur = self.db.cursor()
except MySQLdb.Error as e:
print ("Failed to connect fe server. error %s:%s" % (str(e.args[0]), e.args[1]))
exit(-1);
def exec_sql(self, sql):
try:
self.cur.execute(sql)
except MySQLdb.Error as e:
print ("exec sql error %s:%s" % (str(e.args[0]), e.args[1]))
exit(-1);
def close(self):
if self.db.open:
self.cur.close()
self.db.close()
def fetch_be_list(self):
show_be_sql = "show backends"
self.exec_sql(show_be_sql);
be_list = self.cur.fetchall()
for be_tuple in be_list :
be = {}
be['be_id'] = long(be_tuple[0])
be['ip'] = be_tuple[2]
be['http_port'] = be_tuple[5]
self.be_list.append(be)
return
def fetch_db_list(self):
show_database_sql = "show proc \"/dbs\" "
self.exec_sql(show_database_sql);
db_list = self.cur.fetchall()
for db_tuple in db_list :
db = {}
if long(db_tuple[0]) <= 0:
continue
db['db_id'] = long(db_tuple[0])
db['db_name'] = db_tuple[1]
self.db_list.append(db)
def fetch_table_list(self):
for db in self.db_list:
self._fetch_tables_by_db(db)
def _fetch_tables_by_db(self, db):
sql = "show proc \"/dbs/%s\" " % db['db_id']
self.exec_sql(sql);
table_list = self.cur.fetchall()
for table_tuple in table_list :
table = {}
table['db_id'] = db['db_id']
table['db_name'] = db['db_name']
table['tbl_id'] = long(table_tuple[0])
table['tbl_name'] = table_tuple[1]
self.table_list.append(table)
return
def fetch_rollup_map(self):
for table in self.table_list:
self._fetch_rollups_by_table(table);
def _fetch_rollups_by_table(self, table):
sql = "show proc \"/dbs/%s/%s/index_schema\" " % (table['db_id'], table['tbl_id'])
self.exec_sql(sql);
index_list = self.cur.fetchall()
for index_tuple in index_list :
index = {}
index['tbl_id'] = table['tbl_id']
index['tbl_name'] = table['tbl_name']
index['idx_id'] = long(index_tuple[0])
index['schema_hash'] = long(index_tuple[3])
self.rollup_map[index['idx_id']] = index
return
def fetch_partition_list(self):
for table in self.table_list:
self._fetch_partitions_by_table(table);
def _fetch_partitions_by_table(self, table):
sql = "show proc \"/dbs/%s/%s/partitions\" " % (table['db_id'], table['tbl_id'])
self.exec_sql(sql);
partition_list = self.cur.fetchall()
for partition_tuple in partition_list :
partition = {}
partition['db_id'] = table['db_id']
partition['db_name'] = table['db_name']
partition['tbl_id'] = table['tbl_id']
partition['tbl_name'] = table['tbl_name']
partition['partition_id'] = long(partition_tuple[0])
partition['partition_name'] = partition_tuple[1]
self.partition_list.append(partition)
return
def fetch_idx_list(self):
for partition in self.partition_list:
self._fetch_idxes_by_partition(partition);
def _fetch_idxes_by_partition(self, partition):
sql = "show proc \"/dbs/%s/%s/partitions/%s\" " % \
(partition['db_id'], partition['tbl_id'], partition['partition_id'])
self.exec_sql(sql);
index_list = self.cur.fetchall()
for idx_tuple in index_list :
idx = {}
idx['db_id'] = partition['db_id']
idx['db_name'] = partition['db_name']
idx['tbl_id'] = partition['tbl_id']
idx['tbl_name'] = partition['tbl_name']
idx['partition_id'] = partition['partition_id']
idx['partition_name'] = partition['partition_name']
idx['idx_id'] = long(idx_tuple[0])
idx['idx_name'] = idx_tuple[1]
idx['idx_state'] = idx_tuple[2]
self.index_list.append(idx)
return
def _merge_schema_hash_to_idx_list(self):
for index in self.index_list:
idx_id = index['idx_id']
rollup = self.rollup_map.get(idx_id)
index['schema_hash'] = rollup['schema_hash']
def fetch_tablet_list(self):
for index in self.index_list:
self._fetch_tablets_by_index(index);
def _fetch_tablets_by_index(self, index):
sql = "show proc \"/dbs/%s/%s/partitions/%s/%s\" " % \
(index['db_id'], index['tbl_id'], index['partition_id'], index['idx_id'])
self.exec_sql(sql);
tablet_list = self.cur.fetchall()
for tablet_tuple in tablet_list :
tablet = {}
tablet['db_id'] = index['db_id']
tablet['db_name'] = index['db_name']
tablet['tbl_id'] = index['tbl_id']
tablet['tbl_name'] = index['tbl_name']
tablet['partition_id'] = index['partition_id']
tablet['partition_name'] = index['partition_name']
tablet['idx_id'] = index['idx_id']
tablet['idx_name'] = index['idx_name']
tablet['idx_state'] = index['idx_state']
tablet['tablet_id'] = long(tablet_tuple[0])
tablet['replica_id'] = long(tablet_tuple[1])
tablet['be_id'] = long(tablet_tuple[2])
tablet['schema_hash'] = index["schema_hash"]
self.tablet_map[tablet['tablet_id']] = tablet
return
def debug_output(self):
print "be_list:"
self._print_list(self.be_list)
print
print "database_list:"
self._print_list(self.db_list)
print
print "table_list:"
self._print_list(self.table_list)
print
print "rollup_list:"
self._print_list(self.rollup_map.values())
print
print "partition_list:"
self._print_list(self.partition_list)
print
print "index_list:"
self._print_list(self.index_list)
print
print "tablet_map:(%s), print up to ten here:" % len(self.tablet_map)
self._print_list(self.tablet_map.values()[0:10])
print
def _print_list(self, one_list):
for item in one_list:
print item
def get_tablet_by_id(self, tablet_id):
return self.tablet_map.get(tablet_id)
def get_all_tablets(self):
return self.tablet_map.values()