blob: f2452cd49b940d23d6b59c5779ec2494606fe013 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""
import re
import time
import os
import sys
import MySQLdb
def insert_func(host, port, user, password, database, select_sql, insert_sql):
    """
    Connect to Doris through MySQLdb and run ``insert into doris table select xxx``.

    Opens a connection, hands a cursor to ``insert_process`` and always closes
    the cursor and the connection afterwards, including when
    ``insert_process`` raises or terminates the script.

    :param host: host passed to ``MySQLdb.connect``
    :param port: port passed to ``MySQLdb.connect``
    :param user: user name passed to ``MySQLdb.connect``
    :param password: password, passed to ``MySQLdb.connect`` as ``passwd``
    :param database: database name, passed to ``MySQLdb.connect`` as ``db``
    :param select_sql: SELECT column1, column2,..., columnN [FROM TABLE_X WHERE xxx]
    :param insert_sql: INSERT INTO TABLE_Y[(column1, column2,...,columnN)]
    :return: None
    """
    db_conn = MySQLdb.connect(host=host,
                              port=port,
                              user=user,
                              passwd=password,
                              db=database)
    try:
        db_cursor = db_conn.cursor()
        try:
            insert_process(select_sql, insert_sql, db_cursor)
        finally:
            # Runs for SystemExit as well, so a failed load does not leak the cursor.
            db_cursor.close()
    finally:
        db_conn.close()
def insert_process(select_sql, insert_sql, cursor):
    """
    Issue an ``INSERT INTO ... SELECT ...`` task and wait for its final status.

    The SELECT is first executed on its own; if it yields no rows, nothing is
    inserted. Otherwise ``insert_sql`` and ``select_sql`` are joined into one
    statement and executed, the load label is parsed from ``cursor._info`` and
    ``show load`` is polled every 5 seconds, for at most one hour, until the
    status is ``FINISHED`` or ``CANCELLED``.

    :param select_sql: SELECT column1, column2,..., columnN [FROM TABLE_X WHERE xxx]
    :param insert_sql: INSERT INTO TABLE_Y[(column1, column2,...,columnN)]
    :param cursor: cursor offering ``execute``/``fetchall`` and an ``_info``
        attribute holding the response message of the last statement
    :return: None
    :raises SystemExit: if no label can be parsed from the insert response,
        if the load is CANCELLED, or if it is not FINISHED within the timeout
    """
    # The single-argument parenthesized form prints the same text whether
    # ``print`` is a statement or a function.
    print(insert_sql)
    print(select_sql)

    cursor.execute(select_sql)
    if not cursor.fetchall():
        print("select result is empty, don't need insert")
        return

    # Join with an explicit space: without it an insert_sql that has no column
    # list ("INSERT INTO TABLE_Y") would fuse with the SELECT keyword.
    cursor.execute(insert_sql.rstrip() + " " + select_sql.lstrip())

    label_info = cursor._info
    label_match = re.match(
        r"\{'label':'([a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12})'\}",
        label_info or "")
    if label_match is None:
        # Fail with a readable message instead of an AttributeError on None.
        sys.exit("error: can not parse insert label from response: %r" % (label_info,))
    label = label_match.group(1)
    print(label)

    # check insert task status
    sql = "show load where label = '" + label + "' order by CreateTime desc limit 1"
    print(sql)
    timeout = 60 * 60
    load_status = ""
    while timeout > 0:
        cursor.execute(sql)
        rows = cursor.fetchall()
        # The third column of the row is read as the load status. An empty
        # result is treated as "not known yet" and polled again.
        load_status = rows[0][2] if rows else ""
        print("insert status: " + load_status)
        if load_status in ('FINISHED', 'CANCELLED'):
            break
        time.sleep(5)
        timeout -= 5

    if load_status == "CANCELLED":
        sys.exit("error: insert data CANCELLED")
    if load_status != "FINISHED":
        sys.exit("error: insert data timout")
    print("insert success.")
if __name__ == '__main__':
    # Before you run this demo, do the following.
    #
    # First, install MySQLdb by executing this command as root:
    #     pip install MySQL-python
    # If you hit the error "mysql_config not found", this command solves it:
    #     ln -s /usr/local/mysql/bin/mysql_config /usr/local/bin/mysql_config
    #
    # Second, fill in your own db connection config and insert/select sql below.
    insert_func(
        host="127.0.0.1",
        port=8080,
        user='user',
        password='password',
        database='test_db',
        select_sql="SELECT column1, column2 FROM TABLE_X WHERE column1 = 'test'",
        insert_sql="INSERT INTO TABLE_Y(column1, column2)",
    )