blob: e8bb1a569c7d6d87919f9413dfb9723cb5598c5b [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""
import os
import subprocess
import time
class DorisMiniLoadClient(object):
    """Load a local file into a Doris table through the HTTP mini load API.

    The upload itself is delegated to the ``curl`` command line tool
    (see :meth:`load_doris`). The state of the resulting load job can then
    be polled over a MySQL connection with :meth:`check_load_status`.
    """

    # Maximum number of times load_doris() runs curl before giving up.
    MAX_LOAD_ATTEMPTS = 3
    # Upper bound, in seconds, on how long check_load_status() keeps polling.
    STATUS_POLL_TIMEOUT_SECONDS = 60 * 60
    # Pause, in seconds, between two consecutive status queries.
    STATUS_POLL_INTERVAL_SECONDS = 5

    def __init__(self, db_host, db_port, db_name,
                 db_user, db_password, file_name, table, load_timeout):
        """
        Store the connection settings and load parameters.

        :param db_host: db host
        :param db_port: db port
        :param db_name: db name
        :param db_user: db user
        :param db_password: db password
        :param file_name: local file path
        :param table: db table
        :param load_timeout: mini load timeout in seconds, passed to the
            server as the ``timeout`` URL parameter (the demo uses 86400).
        """
        self.file_name = file_name
        self.table = table
        self.load_host = db_host
        self.load_port = db_port
        self.load_database = db_name
        self.load_user = db_user
        self.load_password = db_password
        self.load_timeout = load_timeout

    def get_label(self):
        """
        Build the load label from the table name and the file's base name.

        :return: label of the form ``<table>_<basename of file_name>``
        """
        return '_'.join([self.table, os.path.basename(self.file_name)])

    def _build_load_url(self, label):
        """
        Build the mini load URL for ``label``.

        :param label: mini load label
        :return: URL carrying the label and the configured load timeout
        """
        return "http://%s:%s/api/%s/%s/_load?label=%s&timeout=%s" % (
            self.load_host, self.load_port, self.load_database,
            self.table, label, self.load_timeout)

    def load_doris(self):
        """
        Upload the file to doris with curl, trying at most 3 times.

        The label is returned even when every attempt failed, so callers
        that need to know the outcome should follow up with
        :meth:`check_load_status`.

        :return: mini load label
        """
        label = self.get_label()
        # The command is identical for every attempt, so build it once.
        # It is passed as an argument list (no shell), so no quoting of
        # the user, password or file name is needed.
        load_cmd = [
            "curl", "--location-trusted",
            "-u", "%s:%s" % (self.load_user, self.load_password),
            "-T", "%s" % self.file_name,
            self._build_load_url(label),
        ]
        for retry_time in range(self.MAX_LOAD_ATTEMPTS):
            # Blocks until curl terminates and yields its exit status.
            # NOTE(review): only curl's exit status is inspected, not the
            # HTTP response body - confirm that is sufficient.
            returncode = subprocess.call(load_cmd)
            if returncode == 0:
                print("""Load to doris success! LABEL is %s, Retry time is %d """ % (label, retry_time))
                break
            print("""Load to doris failed! LABEL is %s, Retry time is %d """ % (label, retry_time))
        return label

    @classmethod
    def check_load_status(cls, label, host, port, user, password, database):
        """
        Poll the status of an async mini load job until it ends or times out.

        Runs ``show load`` for ``label`` every 5 seconds, for at most 60
        minutes, and prints the outcome. Polling stops as soon as the job is
        reported as FINISHED or CANCELLED, or when no job with that label
        exists.

        :param label: mini load label
        :param host: db host
        :param port: db port (converted with ``int()`` before connecting)
        :param user: db user
        :param password: db password
        :param database: db database
        :return: None; the result is only printed.
        """
        # Third-party driver, imported here so that the upload path does not
        # require it to be installed.
        import MySQLdb

        # The label is passed as a query parameter rather than formatted
        # into the statement, so quotes in a file name cannot break the SQL.
        check_status_sql = "show load where label = %s order by CreateTime desc limit 1"
        db_conn = MySQLdb.connect(host=host, port=int(port), user=user,
                                  passwd=password, db=database)
        try:
            db_cursor = db_conn.cursor()
            try:
                waited = 0
                while waited < cls.STATUS_POLL_TIMEOUT_SECONDS:
                    db_cursor.execute(check_status_sql, (label,))
                    rows = db_cursor.fetchall()
                    if not rows:
                        print("""Load label: %s doesn't exist""" % label)
                        return
                    # The job state is read from the third column of the row.
                    load_status = rows[0][2]
                    print("mini load status: " + load_status)
                    if load_status == 'FINISHED':
                        print("""Async mini load to db success! label is %s""" % label)
                        return
                    if load_status == 'CANCELLED':
                        print("""Async load to db failed! label is %s""" % label)
                        return
                    waited += cls.STATUS_POLL_INTERVAL_SECONDS
                    time.sleep(cls.STATUS_POLL_INTERVAL_SECONDS)
                # Only reached when the job never hit a terminal state.
                print("""Async load to db timeout! timeout second is: %s, label is %s"""
                      % (cls.STATUS_POLL_TIMEOUT_SECONDS, label))
            finally:
                db_cursor.close()
        finally:
            # Release the connection on every exit path.
            db_conn.close()
if __name__ == '__main__':
    # mini_load demo.
    # There is no need to install subprocess in Python 2.7. It is a standard
    # module that is built in.
    # Replace the placeholder db config & load parameters below before running.
    db_host = "db_conn_host"
    # NOTE(review): the same port is used both in the HTTP load URL and for
    # the MySQL status connection - confirm that matches your deployment.
    db_port = "port"
    db_name = "db_name"
    db_user = "db_user"
    db_password = "db_password"
    file_name = "file_name"
    table = "db_table"
    # default load_time_out, seconds
    load_timeout = 86400
    doris_client = DorisMiniLoadClient(
        db_host, db_port, db_name, db_user, db_password, file_name, table, load_timeout)
    # Upload first, then poll the job that was created under the returned label.
    load_label = doris_client.load_doris()
    doris_client.check_load_status(load_label, db_host, db_port, db_user, db_password, db_name)
    print("load to doris end")