fix: close pg conn after rows (#4274)

* fix: close pg conn after rows

* fix: wrap for loop rows in local func
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 38c59b5..afdffb9 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -279,39 +279,46 @@
 	offset := 0
 	var err error
 	for {
-		var rows *sql.Rows
 		var data []map[string]interface{}
 		// select data from db
-		rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by %s limit %d offset %d", table, orderBy, config.BatchSize, offset))
-		if err != nil {
-			return err
-		}
-		cols, err := rows.Columns()
-		if err != nil {
-			return err
-		}
-		for rows.Next() {
-			row := make(map[string]interface{})
-			columns := make([]interface{}, len(cols))
-			columnPointers := make([]interface{}, len(cols))
-			for i := range columns {
-				dataType := columnMap[cols[i]]
-				if strings.HasPrefix(dataType, "array") {
-					var arr []string
-					columns[i] = &arr
-					columnPointers[i] = pq.Array(&arr)
-				} else {
-					columnPointers[i] = &columns[i]
-				}
-			}
-			err = rows.Scan(columnPointers...)
+		err = func() error {
+			var rows *sql.Rows
+			rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by %s limit %d offset %d", table, orderBy, config.BatchSize, offset))
 			if err != nil {
 				return err
 			}
-			for i, colName := range cols {
-				row[colName] = columns[i]
+			defer rows.Close()
+			cols, err := rows.Columns()
+			if err != nil {
+				return err
 			}
-			data = append(data, row)
+			for rows.Next() {
+				row := make(map[string]interface{})
+				columns := make([]interface{}, len(cols))
+				columnPointers := make([]interface{}, len(cols))
+				for i := range columns {
+					dataType := columnMap[cols[i]]
+					if strings.HasPrefix(dataType, "array") {
+						var arr []string
+						columns[i] = &arr
+						columnPointers[i] = pq.Array(&arr)
+					} else {
+						columnPointers[i] = &columns[i]
+					}
+				}
+				err = rows.Scan(columnPointers...)
+				if err != nil {
+					return err
+				}
+				for i, colName := range cols {
+					row[colName] = columns[i]
+				}
+				data = append(data, row)
+			}
+			return nil
+		}()
+		if err != nil {
+			return err
 		}
 		if len(data) == 0 {
 			c.GetLogger().Warn(nil, "no data found in table %s already, limit: %d, offset: %d, so break", table, config.BatchSize, offset)
@@ -400,6 +407,7 @@
 	if err != nil {
 		return err
 	}
+	defer rows.Close()
 	var sourceCount int
 	for rows.Next() {
 		err = rows.Scan(&sourceCount)
@@ -407,13 +415,14 @@
 			return err
 		}
 	}
-	rows, err = starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
+	rowsStarRocks, err := starrocks.Query(fmt.Sprintf("select count(*) from %s", starrocksTable))
 	if err != nil {
 		return err
 	}
+	defer rowsStarRocks.Close()
 	var starrocksCount int
-	for rows.Next() {
-		err = rows.Scan(&starrocksCount)
+	for rowsStarRocks.Next() {
+		err = rowsStarRocks.Scan(&starrocksCount)
 		if err != nil {
 			return err
 		}