| package plugin |
| |
| import ( |
| "crypto/tls" |
| "fmt" |
| "io" |
| "net" |
| "net/rpc" |
| |
| "github.com/hashicorp/yamux" |
| ) |
| |
| // RPCClient connects to an RPCServer over net/rpc to dispense plugin types. |
| type RPCClient struct { |
| broker *MuxBroker |
| control *rpc.Client |
| plugins map[string]Plugin |
| |
| // These are the streams used for the various stdout/err overrides |
| stdout, stderr net.Conn |
| } |
| |
| // newRPCClient creates a new RPCClient. The Client argument is expected |
| // to be successfully started already with a lock held. |
| func newRPCClient(c *Client) (*RPCClient, error) { |
| // Connect to the client |
| conn, err := net.Dial(c.address.Network(), c.address.String()) |
| if err != nil { |
| return nil, err |
| } |
| if tcpConn, ok := conn.(*net.TCPConn); ok { |
| // Make sure to set keep alive so that the connection doesn't die |
| tcpConn.SetKeepAlive(true) |
| } |
| |
| if c.config.TLSConfig != nil { |
| conn = tls.Client(conn, c.config.TLSConfig) |
| } |
| |
| // Create the actual RPC client |
| result, err := NewRPCClient(conn, c.config.Plugins) |
| if err != nil { |
| conn.Close() |
| return nil, err |
| } |
| |
| // Begin the stream syncing so that stdin, out, err work properly |
| err = result.SyncStreams( |
| c.config.SyncStdout, |
| c.config.SyncStderr) |
| if err != nil { |
| result.Close() |
| return nil, err |
| } |
| |
| return result, nil |
| } |
| |
| // NewRPCClient creates a client from an already-open connection-like value. |
| // Dial is typically used instead. |
| func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) { |
| // Create the yamux client so we can multiplex |
| mux, err := yamux.Client(conn, nil) |
| if err != nil { |
| conn.Close() |
| return nil, err |
| } |
| |
| // Connect to the control stream. |
| control, err := mux.Open() |
| if err != nil { |
| mux.Close() |
| return nil, err |
| } |
| |
| // Connect stdout, stderr streams |
| stdstream := make([]net.Conn, 2) |
| for i, _ := range stdstream { |
| stdstream[i], err = mux.Open() |
| if err != nil { |
| mux.Close() |
| return nil, err |
| } |
| } |
| |
| // Create the broker and start it up |
| broker := newMuxBroker(mux) |
| go broker.Run() |
| |
| // Build the client using our broker and control channel. |
| return &RPCClient{ |
| broker: broker, |
| control: rpc.NewClient(control), |
| plugins: plugins, |
| stdout: stdstream[0], |
| stderr: stdstream[1], |
| }, nil |
| } |
| |
| // SyncStreams should be called to enable syncing of stdout, |
| // stderr with the plugin. |
| // |
| // This will return immediately and the syncing will continue to happen |
| // in the background. You do not need to launch this in a goroutine itself. |
| // |
| // This should never be called multiple times. |
| func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error { |
| go copyStream("stdout", stdout, c.stdout) |
| go copyStream("stderr", stderr, c.stderr) |
| return nil |
| } |
| |
| // Close closes the connection. The client is no longer usable after this |
| // is called. |
| func (c *RPCClient) Close() error { |
| // Call the control channel and ask it to gracefully exit. If this |
| // errors, then we save it so that we always return an error but we |
| // want to try to close the other channels anyways. |
| var empty struct{} |
| returnErr := c.control.Call("Control.Quit", true, &empty) |
| |
| // Close the other streams we have |
| if err := c.control.Close(); err != nil { |
| return err |
| } |
| if err := c.stdout.Close(); err != nil { |
| return err |
| } |
| if err := c.stderr.Close(); err != nil { |
| return err |
| } |
| if err := c.broker.Close(); err != nil { |
| return err |
| } |
| |
| // Return back the error we got from Control.Quit. This is very important |
| // since we MUST return non-nil error if this fails so that Client.Kill |
| // will properly try a process.Kill. |
| return returnErr |
| } |
| |
| func (c *RPCClient) Dispense(name string) (interface{}, error) { |
| p, ok := c.plugins[name] |
| if !ok { |
| return nil, fmt.Errorf("unknown plugin type: %s", name) |
| } |
| |
| var id uint32 |
| if err := c.control.Call( |
| "Dispenser.Dispense", name, &id); err != nil { |
| return nil, err |
| } |
| |
| conn, err := c.broker.Dial(id) |
| if err != nil { |
| return nil, err |
| } |
| |
| return p.Client(c.broker, rpc.NewClient(conn)) |
| } |
| |
| // Ping pings the connection to ensure it is still alive. |
| // |
| // The error from the RPC call is returned exactly if you want to inspect |
| // it for further error analysis. Any error returned from here would indicate |
| // that the connection to the plugin is not healthy. |
| func (c *RPCClient) Ping() error { |
| var empty struct{} |
| return c.control.Call("Control.Ping", true, &empty) |
| } |