blob: 1658fe8973261851d472f18a3ce4ff4687e6390b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package openwhisk
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"runtime"
"time"
)
// DefaultTimeoutInit to wait for a process to start
var DefaultTimeoutInit = 5 * time.Millisecond
// DefaultTimeoutDrain to wait for draining logs
var DefaultTimeoutDrain = 5 * time.Millisecond
// Executor is the container and the guardian of a child process
// It starts a command, feeds input and output, read logs and control its termination
type Executor struct {
io chan []byte
log chan bool
exit chan error
_cmd *exec.Cmd
_input io.WriteCloser
_output *bufio.Reader
_logout *bufio.Reader
_logerr *bufio.Reader
_outbuf *bufio.Writer
_errbuf *bufio.Writer
}
// NewExecutor creates a child subprocess using the provided command line,
// writing the logs in the given file.
// You can then start it getting a communication channel
func NewExecutor(logout *os.File, logerr *os.File, command string, args ...string) (proc *Executor) {
cmd := exec.Command(command, args...)
cmd.Env = []string{
"__OW_API_HOST=" + os.Getenv("__OW_API_HOST"),
}
if Debugging {
cmd.Env = append(cmd.Env, "OW_DEBUG=/tmp/action.log")
}
Debug("env: %v", cmd.Env)
stdin, err := cmd.StdinPipe()
if err != nil {
return nil
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil
}
pipeOut, pipeIn, err := os.Pipe()
if err != nil {
return nil
}
cmd.ExtraFiles = []*os.File{pipeIn}
pout := bufio.NewReader(pipeOut)
sout := bufio.NewReader(stdout)
serr := bufio.NewReader(stderr)
outbuf := bufio.NewWriter(logout)
errbuf := bufio.NewWriter(logerr)
return &Executor{
make(chan []byte),
make(chan bool),
make(chan error),
cmd,
stdin,
pout,
sout,
serr,
outbuf,
errbuf,
}
}
// collect log from a stream
func _collect(ch chan string, reader *bufio.Reader) {
for {
buf, err := reader.ReadBytes('\n')
if err != nil {
break
}
ch <- string(buf)
}
}
// loop over the command executing
// returning when the command exits
func (proc *Executor) run() {
Debug("run: start")
err := proc._cmd.Start()
if err != nil {
proc.exit <- err
Debug("run: early exit")
proc._cmd = nil // do not kill
return
}
Debug("pid: %d", proc._cmd.Process.Pid)
// wait for the exit
proc.exit <- proc._cmd.Wait()
proc._cmd = nil // do not kill
Debug("run: end")
}
func drain(ch chan string, out *bufio.Writer) {
for loop := true; loop; {
runtime.Gosched()
select {
case buf := <-ch:
fmt.Fprint(out, buf)
out.Flush()
case <-time.After(DefaultTimeoutDrain):
loop = false
}
}
fmt.Fprintln(out, OutputGuard)
out.Flush()
}
// manage copying stdout and stder in output
// with log guards
func (proc *Executor) logger() {
Debug("logger: start")
// poll stdout and stderr
chOut := make(chan string)
go _collect(chOut, proc._logout)
chErr := make(chan string)
go _collect(chErr, proc._logerr)
// loop draining the loop until asked to exit
for <-proc.log {
// drain stdout
Debug("draining stdout")
drain(chOut, proc._outbuf)
// drain stderr
Debug("draining stderr")
drain(chErr, proc._errbuf)
proc.log <- true
}
Debug("logger: end")
}
// main service function
// writing in input
// and reading in output
// using the provide channels
func (proc *Executor) service() {
Debug("service: start")
for {
in := <-proc.io
if len(in) == 0 {
Debug("terminated upon request")
break
}
// input to the subprocess
DebugLimit(">>>", in, 120)
proc._input.Write(in)
proc._input.Write([]byte("\n"))
Debug("done")
// ok now give a chance to run to goroutines
runtime.Gosched()
// input to the subprocess
out, err := proc._output.ReadBytes('\n')
if err != nil {
break
}
DebugLimit("<<<", out, 120)
proc.io <- out
if len(out) == 0 {
Debug("empty input - exiting")
break
}
}
Debug("service: end")
}
// Start execution of the command
// returns an error if the program fails
func (proc *Executor) Start() error {
// start the underlying executable
// check if died
go proc.run()
select {
case <-proc.exit:
// oops, it died
return fmt.Errorf("command exited")
case <-time.After(DefaultTimeoutInit):
// ok let's process it
go proc.service()
go proc.logger()
}
return nil
}
// Stop will kill the process
// and close the channels
func (proc *Executor) Stop() {
Debug("stopping")
if proc._cmd != nil {
proc.log <- false
proc.io <- []byte("")
proc._cmd.Process.Kill()
<-proc.exit
proc._cmd = nil
}
close(proc.io)
close(proc.exit)
close(proc.log)
}