blob: c4568d2938a4ef971a609eebfc12f326deb0452e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"fmt"
"io"
"net"
"os"
"os/exec"
"time"
"github.com/apache/spark-connect-go/v35/spark/sparkerrors"
)
// StartSparkConnect launches ./sbin/start-connect-server.sh from the directory
// named by the SPARK_HOME environment variable and blocks until a TCP
// connection to localhost:15002 succeeds.
//
// The child inherits the current environment plus SPARK_NO_DAEMONIZE=1
// (presumably so the script stays in the foreground and the returned PID keeps
// referring to the server; verify against the Spark scripts).
//
// A background goroutine drains the child's stdout and then waits for it, so
// the child is reaped and cannot stall on a full pipe. That goroutine lives
// until the child and every descendant holding its stdout have exited.
//
// On success it returns the PID of the launched script. It returns -1 and a
// TestSetupError when SPARK_HOME is unset, the stdout pipe or the process
// cannot be created, the process exits before the port is reachable, or the
// port is still unreachable after 60 seconds. On the last two failures the
// captured stdout is printed to help diagnose the problem.
func StartSparkConnect() (int64, error) {
	const (
		connectAddr    = "localhost:15002"
		startupTimeout = 60 * time.Second
		pollInterval   = 1 * time.Second
		dialTimeout    = 1 * time.Second
		// How long to wait for the killed child to release stdout after a
		// startup timeout before giving up on its output.
		outputGrace = 5 * time.Second
		// Upper bound on the stdout bytes kept for diagnostics.
		maxOutputBytes = 1 << 20
	)

	sparkHome := os.Getenv("SPARK_HOME")
	if sparkHome == "" {
		return -1, sparkerrors.WithString(sparkerrors.TestSetupError, "SPARK_HOME not set")
	}
	fmt.Printf("Starting Spark Connect Server in: %v\n", sparkHome)

	cmd := exec.Command("./sbin/start-connect-server.sh", "--conf",
		"spark.log.structuredLogging.enabled=false", "--packages",
		"org.apache.spark:spark-connect_2.12:3.5.2")
	cmd.Dir = sparkHome
	cmd.Env = append(os.Environ(), "SPARK_NO_DAEMONIZE=1")

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return -1, sparkerrors.WithType(sparkerrors.TestSetupError, err)
	}
	if err := cmd.Start(); err != nil {
		return -1, sparkerrors.WithType(sparkerrors.TestSetupError, err)
	}
	pid := int64(cmd.Process.Pid)

	// exited receives the captured stdout once the child has terminated.
	// cmd.ProcessState is only populated by Wait, so polling it without
	// waiting can never observe an exit. Wait must run only after all pipe
	// reads have finished, hence the read-then-wait order. The channel is
	// buffered so the goroutine never blocks if nobody is listening any more.
	exited := make(chan []byte, 1)
	go func() {
		// Read errors only truncate diagnostic output, so they are ignored.
		captured, _ := io.ReadAll(io.LimitReader(stdout, maxOutputBytes))
		// Keep draining past the cap so the child never blocks on a full pipe.
		_, _ = io.Copy(io.Discard, stdout)
		// The exit status is not needed: any exit before readiness is a failure.
		_ = cmd.Wait()
		exited <- captured
	}()

	fail := func(output []byte, msg string) (int64, error) {
		fmt.Printf("Output: %v\n", string(output))
		return -1, sparkerrors.WithString(sparkerrors.TestSetupError, msg)
	}

	deadline := time.NewTimer(startupTimeout)
	defer deadline.Stop()
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-deadline.C:
			// Kill first: reading the pipe of a live child would block until
			// it exits, which defeats the timeout. Best effort only, since
			// descendants of the script are not signalled.
			_ = cmd.Process.Kill()
			var output []byte
			grace := time.NewTimer(outputGrace)
			select {
			case output = <-exited:
			case <-grace.C:
				// A descendant still holds stdout open; report without output.
			}
			grace.Stop()
			return fail(output, "timeout waiting for Spark Connect to start")
		case output := <-exited:
			return fail(output, "Spark Connect exited")
		case <-ticker.C:
			// Check for an exit before dialing, so a dead child is reported
			// even if something else is listening on the port.
			select {
			case output := <-exited:
				return fail(output, "Spark Connect exited")
			default:
			}
			conn, err := net.DialTimeout("tcp", connectAddr, dialTimeout)
			if err != nil {
				continue // not listening yet; retry on the next tick
			}
			_ = conn.Close() // probe connection only; a close error is irrelevant
			return pid, nil
		}
	}
}
// StopSparkConnect is currently a no-op placeholder: it does not signal or
// wait for the process identified by pid and always returns a nil error.
//
// NOTE(review): the name suggests this should terminate the server started by
// StartSparkConnect; as written, that process is left running. Confirm whether
// this is intentional.
func StopSparkConnect(pid int64) error {
	_ = pid // not used yet; see the review note above
	return nil
}