| // Copyright 2015 CoreOS, Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package dbus |
| |
| import ( |
| "errors" |
| "log" |
| "time" |
| |
| "github.com/godbus/dbus" |
| ) |
| |
| const ( |
| cleanIgnoreInterval = int64(10 * time.Second) |
| ignoreInterval = int64(30 * time.Millisecond) |
| ) |
| |
| // Subscribe sets up this connection to subscribe to all systemd dbus events. |
| // This is required before calling SubscribeUnits. When the connection closes |
| // systemd will automatically stop sending signals so there is no need to |
| // explicitly call Unsubscribe(). |
| func (c *Conn) Subscribe() error { |
| c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, |
| "type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'") |
| c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, |
| "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'") |
| |
| return c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store() |
| } |
| |
| // Unsubscribe this connection from systemd dbus events. |
| func (c *Conn) Unsubscribe() error { |
| return c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store() |
| } |
| |
| func (c *Conn) dispatch() { |
| ch := make(chan *dbus.Signal, signalBuffer) |
| |
| c.sigconn.Signal(ch) |
| |
| go func() { |
| for { |
| signal, ok := <-ch |
| if !ok { |
| return |
| } |
| |
| if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" { |
| c.jobComplete(signal) |
| } |
| |
| if c.subStateSubscriber.updateCh == nil && |
| c.propertiesSubscriber.updateCh == nil { |
| continue |
| } |
| |
| var unitPath dbus.ObjectPath |
| switch signal.Name { |
| case "org.freedesktop.systemd1.Manager.JobRemoved": |
| unitName := signal.Body[2].(string) |
| c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath) |
| case "org.freedesktop.systemd1.Manager.UnitNew": |
| unitPath = signal.Body[1].(dbus.ObjectPath) |
| case "org.freedesktop.DBus.Properties.PropertiesChanged": |
| if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" { |
| unitPath = signal.Path |
| |
| if len(signal.Body) >= 2 { |
| if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok { |
| c.sendPropertiesUpdate(unitPath, changed) |
| } |
| } |
| } |
| } |
| |
| if unitPath == dbus.ObjectPath("") { |
| continue |
| } |
| |
| c.sendSubStateUpdate(unitPath) |
| } |
| }() |
| } |
| |
| // Returns two unbuffered channels which will receive all changed units every |
| // interval. Deleted units are sent as nil. |
| func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) { |
| return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil) |
| } |
| |
| // SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer |
| // size of the channels, the comparison function for detecting changes and a filter |
| // function for cutting down on the noise that your channel receives. |
| func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) { |
| old := make(map[string]*UnitStatus) |
| statusChan := make(chan map[string]*UnitStatus, buffer) |
| errChan := make(chan error, buffer) |
| |
| go func() { |
| for { |
| timerChan := time.After(interval) |
| |
| units, err := c.ListUnits() |
| if err == nil { |
| cur := make(map[string]*UnitStatus) |
| for i := range units { |
| if filterUnit != nil && filterUnit(units[i].Name) { |
| continue |
| } |
| cur[units[i].Name] = &units[i] |
| } |
| |
| // add all new or changed units |
| changed := make(map[string]*UnitStatus) |
| for n, u := range cur { |
| if oldU, ok := old[n]; !ok || isChanged(oldU, u) { |
| changed[n] = u |
| } |
| delete(old, n) |
| } |
| |
| // add all deleted units |
| for oldN := range old { |
| changed[oldN] = nil |
| } |
| |
| old = cur |
| |
| if len(changed) != 0 { |
| statusChan <- changed |
| } |
| } else { |
| errChan <- err |
| } |
| |
| <-timerChan |
| } |
| }() |
| |
| return statusChan, errChan |
| } |
| |
| type SubStateUpdate struct { |
| UnitName string |
| SubState string |
| } |
| |
| // SetSubStateSubscriber writes to updateCh when any unit's substate changes. |
| // Although this writes to updateCh on every state change, the reported state |
| // may be more recent than the change that generated it (due to an unavoidable |
| // race in the systemd dbus interface). That is, this method provides a good |
| // way to keep a current view of all units' states, but is not guaranteed to |
| // show every state transition they go through. Furthermore, state changes |
| // will only be written to the channel with non-blocking writes. If updateCh |
| // is full, it attempts to write an error to errCh; if errCh is full, the error |
| // passes silently. |
| func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) { |
| if c == nil { |
| msg := "nil receiver" |
| select { |
| case errCh <- errors.New(msg): |
| default: |
| log.Printf("full error channel while reporting: %s\n", msg) |
| } |
| return |
| } |
| |
| c.subStateSubscriber.Lock() |
| defer c.subStateSubscriber.Unlock() |
| c.subStateSubscriber.updateCh = updateCh |
| c.subStateSubscriber.errCh = errCh |
| } |
| |
| func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) { |
| c.subStateSubscriber.Lock() |
| defer c.subStateSubscriber.Unlock() |
| |
| if c.subStateSubscriber.updateCh == nil { |
| return |
| } |
| |
| isIgnored := c.shouldIgnore(unitPath) |
| defer c.cleanIgnore() |
| if isIgnored { |
| return |
| } |
| |
| info, err := c.GetUnitPathProperties(unitPath) |
| if err != nil { |
| select { |
| case c.subStateSubscriber.errCh <- err: |
| default: |
| log.Printf("full error channel while reporting: %s\n", err) |
| } |
| return |
| } |
| defer c.updateIgnore(unitPath, info) |
| |
| name, ok := info["Id"].(string) |
| if !ok { |
| msg := "failed to cast info.Id" |
| select { |
| case c.subStateSubscriber.errCh <- errors.New(msg): |
| default: |
| log.Printf("full error channel while reporting: %s\n", err) |
| } |
| return |
| } |
| substate, ok := info["SubState"].(string) |
| if !ok { |
| msg := "failed to cast info.SubState" |
| select { |
| case c.subStateSubscriber.errCh <- errors.New(msg): |
| default: |
| log.Printf("full error channel while reporting: %s\n", msg) |
| } |
| return |
| } |
| |
| update := &SubStateUpdate{name, substate} |
| select { |
| case c.subStateSubscriber.updateCh <- update: |
| default: |
| msg := "update channel is full" |
| select { |
| case c.subStateSubscriber.errCh <- errors.New(msg): |
| default: |
| log.Printf("full error channel while reporting: %s\n", msg) |
| } |
| return |
| } |
| } |
| |
| // The ignore functions work around a wart in the systemd dbus interface. |
| // Requesting the properties of an unloaded unit will cause systemd to send a |
| // pair of UnitNew/UnitRemoved signals. Because we need to get a unit's |
| // properties on UnitNew (as that's the only indication of a new unit coming up |
| // for the first time), we would enter an infinite loop if we did not attempt |
| // to detect and ignore these spurious signals. The signal themselves are |
| // indistinguishable from relevant ones, so we (somewhat hackishly) ignore an |
| // unloaded unit's signals for a short time after requesting its properties. |
| // This means that we will miss e.g. a transient unit being restarted |
| // *immediately* upon failure and also a transient unit being started |
| // immediately after requesting its status (with systemctl status, for example, |
| // because this causes a UnitNew signal to be sent which then causes us to fetch |
| // the properties). |
| |
| func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool { |
| t, ok := c.subStateSubscriber.ignore[path] |
| return ok && t >= time.Now().UnixNano() |
| } |
| |
| func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) { |
| loadState, ok := info["LoadState"].(string) |
| if !ok { |
| return |
| } |
| |
| // unit is unloaded - it will trigger bad systemd dbus behavior |
| if loadState == "not-found" { |
| c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval |
| } |
| } |
| |
| // without this, ignore would grow unboundedly over time |
| func (c *Conn) cleanIgnore() { |
| now := time.Now().UnixNano() |
| if c.subStateSubscriber.cleanIgnore < now { |
| c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval |
| |
| for p, t := range c.subStateSubscriber.ignore { |
| if t < now { |
| delete(c.subStateSubscriber.ignore, p) |
| } |
| } |
| } |
| } |
| |
| // PropertiesUpdate holds a map of a unit's changed properties |
| type PropertiesUpdate struct { |
| UnitName string |
| Changed map[string]dbus.Variant |
| } |
| |
| // SetPropertiesSubscriber writes to updateCh when any unit's properties |
| // change. Every property change reported by systemd will be sent; that is, no |
| // transitions will be "missed" (as they might be with SetSubStateSubscriber). |
| // However, state changes will only be written to the channel with non-blocking |
| // writes. If updateCh is full, it attempts to write an error to errCh; if |
| // errCh is full, the error passes silently. |
| func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) { |
| c.propertiesSubscriber.Lock() |
| defer c.propertiesSubscriber.Unlock() |
| c.propertiesSubscriber.updateCh = updateCh |
| c.propertiesSubscriber.errCh = errCh |
| } |
| |
| // we don't need to worry about shouldIgnore() here because |
| // sendPropertiesUpdate doesn't call GetProperties() |
| func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) { |
| c.propertiesSubscriber.Lock() |
| defer c.propertiesSubscriber.Unlock() |
| |
| if c.propertiesSubscriber.updateCh == nil { |
| return |
| } |
| |
| update := &PropertiesUpdate{unitName(unitPath), changedProps} |
| |
| select { |
| case c.propertiesSubscriber.updateCh <- update: |
| default: |
| msg := "update channel is full" |
| select { |
| case c.propertiesSubscriber.errCh <- errors.New(msg): |
| default: |
| log.Printf("full error channel while reporting: %s\n", msg) |
| } |
| return |
| } |
| } |