// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package emulators

import (
"bufio"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"beam.apache.org/playground/backend/internal/constants"
"beam.apache.org/playground/backend/internal/logger"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/linkedin/goavro/v2"
)

const (
addressSeparator = ":"
pauseDuration = 100 * time.Millisecond
globalDuration = 120 * time.Second
bootstrapServerKey = "bootstrap.servers"
networkType = "tcp"
jsonExt = ".json"
avroExt = ".avro"
)
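
// KafkaMockCluster tracks a locally running Kafka emulator process together
// with the host and port it listens on and the parameters exposed to code
// preparers.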
type KafkaMockCluster struct {
cmd *exec.Cmd
host string
port string
preparerParameters map[string]string
}
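
// NewKafkaMockCluster starts the emulator jar with "java -jar", forwards its
// stderr to the logger, parses the listening port from stdout, and waits for
// the broker to accept TCP connections before returning.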
func NewKafkaMockCluster(emulatorExecutablePath string) (*KafkaMockCluster, error) {
cmd := exec.Command("java", "-jar", emulatorExecutablePath)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
err = cmd.Start()
if err != nil {
return nil, err
}
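// Mirror the emulator's stderr into the playground log in the background.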
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
logger.Infof("Emulator: %s", scanner.Text())
}
if err := scanner.Err(); err != nil {
logger.Errorf("Failed to read stderr: %s", err.Error())
}
}()
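// The emulator prints its listening port on the first line of stdout in the
// form "Port: <port>".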
const host = "127.0.0.1"
var port string
stdoutScanner := bufio.NewScanner(stdout)
if !stdoutScanner.Scan() {
err := stdoutScanner.Err()
if err == nil {
err = errors.New("emulator exited before reporting its port")
}
if cmd.ProcessState == nil || !cmd.ProcessState.Exited() {
if killErr := cmd.Process.Kill(); killErr != nil {
logger.Errorf("failed to kill emulator: %s", killErr.Error())
return nil, killErr
}
}
return nil, err
}
line := stdoutScanner.Text()
_, err = fmt.Sscanf(line, "Port: %s", &port)
if err != nil {
if cmd.ProcessState == nil || !cmd.ProcessState.Exited() {
if killErr := cmd.Process.Kill(); killErr != nil {
logger.Errorf("failed to kill emulator: %s", killErr.Error())
return nil, killErr
}
}
return nil, err
}
bootstrapServers := fmt.Sprintf("%s%s%s", host, addressSeparator, port)
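// Poll the bootstrap address every pauseDuration until the broker accepts
// TCP connections, giving up after globalDuration.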
workTicker := time.NewTicker(pauseDuration)
defer workTicker.Stop()
globalTicker := time.NewTicker(globalDuration)
defer globalTicker.Stop()
for {
select {
case <-workTicker.C:
if conn, err := net.DialTimeout(networkType, bootstrapServers, pauseDuration); err == nil {
conn.Close()
return &KafkaMockCluster{
cmd: cmd,
host: host,
port: port,
preparerParameters: make(map[string]string)}, nil
}
case <-globalTicker.C:
return nil, errors.New("timeout while a mock cluster is starting")
}
}
}
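
// Stop interrupts the emulator process and waits for it to exit.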
func (kmc *KafkaMockCluster) Stop() {
logger.Infof("Stopping Kafka emulator")
if kmc.cmd != nil {
err := kmc.cmd.Process.Signal(os.Interrupt)
if err != nil {
logger.Errorf("Failed to send Interrupt signal to process %d: %v", kmc.cmd.Process.Pid, err)
return
}
_, err = kmc.cmd.Process.Wait()
if err != nil {
logger.Errorf("Failed to wait for process %d: %v", kmc.cmd.Process.Pid, err)
}
}
}
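
// GetAddress returns the emulator's bootstrap address in "host:port" form.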
func (kmc *KafkaMockCluster) GetAddress() string {
return fmt.Sprintf("%s%s%s", kmc.host, addressSeparator, kmc.port)
}
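
// KafkaProducer publishes playground datasets to topics on a KafkaMockCluster.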
type KafkaProducer struct {
cluster *KafkaMockCluster
producer *kafka.Producer
}
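
// NewKafkaProducer builds a confluent-kafka-go producer pointed at the mock
// cluster's bootstrap address, requiring acknowledgement from all in-sync
// replicas ("acks": "all").
//
// A minimal usage sketch (the jar path and the datasets variable are
// illustrative placeholders, not part of this package):
//
//	cluster, err := NewKafkaMockCluster("/path/to/kafka-emulator.jar")
//	if err != nil { /* handle error */ }
//	defer cluster.Stop()
//	producer, err := NewKafkaProducer(cluster)
//	if err != nil { /* handle error */ }
//	if err := producer.ProduceDatasets(datasets); err != nil { /* handle error */ }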
func NewKafkaProducer(cluster *KafkaMockCluster) (*KafkaProducer, error) {
logger.Infof("Kafka server: %s", cluster.GetAddress())
p, err := kafka.NewProducer(&kafka.ConfigMap{
bootstrapServerKey: cluster.GetAddress(),
"acks": "all",
})
if err != nil {
return nil, err
}
return &KafkaProducer{cluster: cluster, producer: p}, nil
}
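
// ProduceDatasets publishes every entry of each dataset to its configured
// topic. The underlying producer is closed when this method returns, so a
// KafkaProducer instance supports only a single ProduceDatasets call.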
func (kp *KafkaProducer) ProduceDatasets(datasets []*DatasetDTO) error {
if kp.producer == nil {
return errors.New("producer not set")
}
defer kp.producer.Close()
for _, dataset := range datasets {
topic := getTopic(dataset)
entries, err := unmarshalDatasets(dataset)
if err != nil {
return err
}
err = produce(entries, kp, topic)
if err != nil {
return err
}
}
return nil
}
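
// produce marshals each entry to JSON, publishes it to partition 0 of the
// given topic, and blocks on the delivery report for every message before
// sending the next one.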
func produce(entries []map[string]interface{}, kp *KafkaProducer, topic *string) error {
for _, entry := range entries {
entryBytes, err := json.Marshal(entry)
if err != nil {
return fmt.Errorf("failed to marshal data, err: %v", err)
}
if err := kp.producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: topic, Partition: 0},
Value: entryBytes},
nil,
); err != nil {
return fmt.Errorf("failed to produce data to a topic: %v", err)
}
e := <-kp.producer.Events()
if m, ok := e.(*kafka.Message); ok {
if m.TopicPartition.Error != nil {
return fmt.Errorf("delivery failed: %v", m.TopicPartition.Error)
}
} else if kafkaErr, ok := e.(kafka.Error); ok {
return fmt.Errorf("producer error: %v", kafkaErr)
} else {
return fmt.Errorf("unknown event: %s", e.String())
}
}
return nil
}
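
// getTopic reads the topic name from the dataset options and returns it as
// the *string expected by kafka.TopicPartition.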
func getTopic(dataset *DatasetDTO) *string {
topicName := dataset.Dataset.Options[constants.TopicNameKey]
return &topicName
}
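
// unmarshalDatasets decodes a dataset payload into generic records, choosing
// the decoder by the dataset file extension: ".json" data is parsed with
// encoding/json, ".avro" data with a goavro OCF reader.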
func unmarshalDatasets(dataset *DatasetDTO) ([]map[string]interface{}, error) {
ext := filepath.Ext(dataset.Dataset.DatasetPath)
var entries []map[string]interface{}
switch ext {
case jsonExt:
err := json.Unmarshal(dataset.Data, &entries)
if err != nil {
return nil, fmt.Errorf("failed to parse a dataset in json format, err: %v", err)
}
case avroExt:
ocfReader, err := goavro.NewOCFReader(bytes.NewReader(dataset.Data))
if err != nil {
return nil, fmt.Errorf("failed to parse a dataset in avro format, err: %v", err)
}
for ocfReader.Scan() {
record, err := ocfReader.Read()
if err != nil {
return nil, fmt.Errorf("failed to read a file in avro format, err: %v", err)
}
recordMap, ok := record.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("unexpected avro record type: %T", record)
}
entries = append(entries, recordMap)
}
default:
return nil, fmt.Errorf("unsupported dataset extension: %s", ext)
}
return entries, nil
}
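
// GetPreparerParameters returns the parameters collected for code preparers.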
func (kmc *KafkaMockCluster) GetPreparerParameters() map[string]string {
return kmc.preparerParameters
}