blob: c2f29d30e655eccca449f6e31a99c92927f70cc2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"context"
"io"
"os/exec"
"sort"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/ioextended"
"github.com/apache/dubbo-kubernetes/pkg/bufman/pkg/thread"
)
var emptyEnv = envSlice(
map[string]string{
"__EMPTY_ENV": "1",
},
)
type runner struct {
parallelism int
semaphoreC chan struct{}
}
func newRunner(options ...RunnerOption) *runner {
runner := &runner{
parallelism: thread.Parallelism(),
}
for _, option := range options {
option(runner)
}
runner.semaphoreC = make(chan struct{}, runner.parallelism)
return runner
}
func (r *runner) Run(ctx context.Context, name string, options ...RunOption) error {
execOptions := newExecOptions()
for _, option := range options {
option(execOptions)
}
cmd := exec.CommandContext(ctx, name, execOptions.args...)
execOptions.ApplyToCmd(cmd)
r.increment()
err := cmd.Run()
r.decrement()
return err
}
func (r *runner) Start(name string, options ...StartOption) (Process, error) {
execOptions := newExecOptions()
for _, option := range options {
option(execOptions)
}
cmd := exec.Command(name, execOptions.args...)
execOptions.ApplyToCmd(cmd)
r.increment()
if err := cmd.Start(); err != nil {
return nil, err
}
process := newProcess(cmd, r.decrement)
process.Monitor()
return process, nil
}
func (r *runner) increment() {
r.semaphoreC <- struct{}{}
}
func (r *runner) decrement() {
<-r.semaphoreC
}
type execOptions struct {
args []string
env map[string]string
stdin io.Reader
stdout io.Writer
stderr io.Writer
dir string
}
func newExecOptions() *execOptions {
return &execOptions{}
}
func (e *execOptions) ApplyToCmd(cmd *exec.Cmd) {
// If the user did not specify env vars, we want to make sure
// the command has access to none, as the default is the current env.
if len(e.env) == 0 {
cmd.Env = emptyEnv
} else {
cmd.Env = envSlice(e.env)
}
// If the user did not specify any stdin, we want to make sure
// the command has access to none, as the default is the default stdin.
if e.stdin == nil {
cmd.Stdin = ioextended.DiscardReader
} else {
cmd.Stdin = e.stdin
}
// If Stdout or Stderr are nil, os/exec connects the process output directly
// to the null device.
cmd.Stdout = e.stdout
cmd.Stderr = e.stderr
// The default behavior for dir is what we want already, i.e. the current
// working directory.
cmd.Dir = e.dir
}
func envSlice(env map[string]string) []string {
var environ []string
for key, value := range env {
environ = append(environ, key+"="+value)
}
sort.Strings(environ)
return environ
}