fixed unsynced log bugs, cleaned test and pinned the h2non dependency (#58)
diff --git a/openwhisk/compiler.go b/openwhisk/compiler.go
index a948547..412a67b 100644
--- a/openwhisk/compiler.go
+++ b/openwhisk/compiler.go
@@ -24,7 +24,7 @@
"os/exec"
"runtime"
- "github.com/h2non/filetype"
+ "gopkg.in/h2non/filetype.v1"
)
// this is only to let test run on OSX
diff --git a/openwhisk/executor.go b/openwhisk/executor.go
index ad1b549..1658fe8 100644
--- a/openwhisk/executor.go
+++ b/openwhisk/executor.go
@@ -44,14 +44,14 @@
_output *bufio.Reader
_logout *bufio.Reader
_logerr *bufio.Reader
- _outbuf *os.File
- _errbuf *os.File
+ _outbuf *bufio.Writer
+ _errbuf *bufio.Writer
}
// NewExecutor creates a child subprocess using the provided command line,
// writing the logs in the given file.
// You can then start it getting a communication channel
-func NewExecutor(outbuf *os.File, errbuf *os.File, command string, args ...string) (proc *Executor) {
+func NewExecutor(logout *os.File, logerr *os.File, command string, args ...string) (proc *Executor) {
cmd := exec.Command(command, args...)
cmd.Env = []string{
"__OW_API_HOST=" + os.Getenv("__OW_API_HOST"),
@@ -85,6 +85,8 @@
pout := bufio.NewReader(pipeOut)
sout := bufio.NewReader(stdout)
serr := bufio.NewReader(stderr)
+ outbuf := bufio.NewWriter(logout)
+ errbuf := bufio.NewWriter(logerr)
return &Executor{
make(chan []byte),
@@ -129,17 +131,19 @@
Debug("run: end")
}
-func drain(ch chan string, out *os.File) {
+func drain(ch chan string, out *bufio.Writer) {
for loop := true; loop; {
runtime.Gosched()
select {
case buf := <-ch:
fmt.Fprint(out, buf)
+ out.Flush()
case <-time.After(DefaultTimeoutDrain):
loop = false
}
}
fmt.Fprintln(out, OutputGuard)
+ out.Flush()
}
// manage copying stdout and stder in output
@@ -152,15 +156,16 @@
chErr := make(chan string)
go _collect(chErr, proc._logerr)
- // wait for the signal
+ // loop draining the loop until asked to exit
for <-proc.log {
- // flush stdout
+ // drain stdout
+ Debug("draining stdout")
drain(chOut, proc._outbuf)
- // flush stderr
+ // drain stderr
+ Debug("draining stderr")
drain(chErr, proc._errbuf)
+ proc.log <- true
}
- proc._outbuf.Sync()
- proc._errbuf.Sync()
Debug("logger: end")
}
diff --git a/openwhisk/executor_test.go b/openwhisk/executor_test.go
index 6a649ed..8504fd9 100644
--- a/openwhisk/executor_test.go
+++ b/openwhisk/executor_test.go
@@ -51,19 +51,18 @@
proc := NewExecutor(log, log, "_test/bc.sh")
err := proc.Start()
fmt.Println(err)
- //proc.log <- true
proc.io <- []byte("2+2")
fmt.Printf("%s", <-proc.io)
+ proc.log <- true
+ <-proc.log
// and now, exit detection
proc.io <- []byte("quit")
- proc.log <- true
select {
case in := <-proc.io:
fmt.Printf("%s", in)
case <-proc.exit:
fmt.Println("exit")
}
- waitabit()
proc.Stop()
dump(log)
// Output:
@@ -82,9 +81,8 @@
proc.io <- []byte(`{"value":{"name":"Mike"}}`)
fmt.Printf("%s", <-proc.io)
proc.log <- true
- waitabit()
+ <-proc.log
proc.Stop()
- waitabit()
_, ok := <-proc.io
fmt.Printf("io %v\n", ok)
dump(log)
@@ -111,10 +109,9 @@
exited = true
}
proc.log <- true
+ <-proc.log
fmt.Printf("exit %v\n", exited)
- waitabit()
proc.Stop()
- waitabit()
_, ok := <-proc.io
fmt.Printf("io %v\n", ok)
dump(log)
diff --git a/openwhisk/extractor.go b/openwhisk/extractor.go
index 70b1520..ca751f8 100644
--- a/openwhisk/extractor.go
+++ b/openwhisk/extractor.go
@@ -27,7 +27,7 @@
"path/filepath"
"strconv"
- "github.com/h2non/filetype"
+ "gopkg.in/h2non/filetype.v1"
)
func unzip(src []byte, dest string) error {
diff --git a/openwhisk/initHandler_test.go b/openwhisk/initHandler_test.go
index 803684f..81f00dc 100644
--- a/openwhisk/initHandler_test.go
+++ b/openwhisk/initHandler_test.go
@@ -105,6 +105,8 @@
// 500 {"error":"no action defined yet"}
// msg=hello Mike
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
// Goodbye!
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
}
diff --git a/openwhisk/runHandler.go b/openwhisk/runHandler.go
index 1495189..2463255 100644
--- a/openwhisk/runHandler.go
+++ b/openwhisk/runHandler.go
@@ -64,8 +64,9 @@
body = bytes.Replace(body, []byte("\n"), []byte(""), -1)
// execute the action
- // and check for early termination
ap.theExecutor.io <- body
+
+ // check for early termination
var response []byte
var exited bool
select {
@@ -75,6 +76,10 @@
exited = true
}
+ // flush the logs sending the activation message at the end
+ ap.theExecutor.log <- true
+ <-ap.theExecutor.log
+
// check for early termination
if exited {
Debug("WARNING! Command exited")
@@ -84,9 +89,6 @@
}
DebugLimit("received:", response, 120)
- // flush the logs sending the activation message at the end
- ap.theExecutor.log <- true
-
// check if the answer is an object map
var objmap map[string]*json.RawMessage
err = json.Unmarshal(response, &objmap)
diff --git a/openwhisk/util_test.go b/openwhisk/util_test.go
index 9ebcbc0..b0d8084 100644
--- a/openwhisk/util_test.go
+++ b/openwhisk/util_test.go
@@ -35,7 +35,7 @@
"testing"
"time"
- "github.com/h2non/filetype"
+ "gopkg.in/h2non/filetype.v1"
)
func startTestServer(compiler string) (*httptest.Server, string, *os.File) {
@@ -157,7 +157,8 @@
var pseudoElfForMacType = filetype.NewType("elf", "darwin/mach")
func pseudoElfForMacMatcher(buf []byte) bool {
- return len(buf) > 4 && buf[0] == 0xcf && buf[1] == 0xfa && buf[2] == 0xed && buf[3] == 0xfe
+ return len(buf) > 4 && ((buf[0] == 0xcf && buf[1] == 0xfa && buf[2] == 0xed && buf[3] == 0xfe) ||
+ (buf[0] == 0xce && buf[1] == 0xfa && buf[2] == 0xed && buf[3] == 0xfe))
}
func detect(dir, filename string) string {
@@ -177,7 +178,7 @@
return re.ReplaceAllString(out, "::")
}
func TestMain(m *testing.M) {
- //Debugging = true // enable debug
+ // Debugging = true // enable debug
// silence those annoying logs
if !Debugging {
log.SetOutput(ioutil.Discard)