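fix: close pg rows after reading to release the connection (#4274)
* fix: close pg rows after reading to release the connection
* fix: wrap the per-batch rows loop in a local func so the deferred rows.Close() runs on every iteration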
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 38c59b5..afdffb9 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -279,39 +279,46 @@
offset := 0
var err error
for {
- var rows *sql.Rows
var data []map[string]interface{}
// select data from db
- rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by %s limit %d offset %d", table, orderBy, config.BatchSize, offset))
- if err != nil {
- return err
- }
- cols, err := rows.Columns()
- if err != nil {
- return err
- }
- for rows.Next() {
- row := make(map[string]interface{})
- columns := make([]interface{}, len(cols))
- columnPointers := make([]interface{}, len(cols))
- for i := range columns {
- dataType := columnMap[cols[i]]
- if strings.HasPrefix(dataType, "array") {
- var arr []string
- columns[i] = &arr
- columnPointers[i] = pq.Array(&arr)
- } else {
- columnPointers[i] = &columns[i]
- }
- }
- err = rows.Scan(columnPointers...)
+ err = func() error {
+ var rows *sql.Rows
+ rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by %s limit %d offset %d", table, orderBy, config.BatchSize, offset))
if err != nil {
return err
}
- for i, colName := range cols {
- row[colName] = columns[i]
+ defer rows.Close()
+ cols, err := rows.Columns()
+ if err != nil {
+ return err
}
- data = append(data, row)
+ for rows.Next() {
+ row := make(map[string]interface{})
+ columns := make([]interface{}, len(cols))
+ columnPointers := make([]interface{}, len(cols))
+ for i := range columns {
+ dataType := columnMap[cols[i]]
+ if strings.HasPrefix(dataType, "array") {
+ var arr []string
+ columns[i] = &arr
+ columnPointers[i] = pq.Array(&arr)
+ } else {
+ columnPointers[i] = &columns[i]
+ }
+ }
+ err = rows.Scan(columnPointers...)
+ if err != nil {
+ return err
+ }
+ for i, colName := range cols {
+ row[colName] = columns[i]
+ }
+ data = append(data, row)
+ }
+ return nil
+ }()
+ if err != nil {
+ return err
}
if len(data) == 0 {
c.GetLogger().Warn(nil, "no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
@@ -400,6 +407,7 @@
if err != nil {
return err
}
+ defer rows.Close()
var sourceCount int
for rows.Next() {
err = rows.Scan(&sourceCount)
@@ -407,13 +415,14 @@
return err
}
}
- rows, err = starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
+ rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
if err != nil {
return err
}
+ defer rowsStarRocks.Close()
var starrocksCount int
- for rows.Next() {
- err = rows.Scan(&starrocksCount)
+ for rowsStarRocks.Next() {
+ err = rowsStarRocks.Scan(&starrocksCount)
if err != nil {
return err
}