| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package nmble |
| |
| import ( |
| "encoding/hex" |
| "fmt" |
| "sync" |
| "time" |
| |
| log "github.com/Sirupsen/logrus" |
| |
| "mynewt.apache.org/newt/util/unixchild" |
| . "mynewt.apache.org/newtmgr/nmxact/bledefs" |
| "mynewt.apache.org/newtmgr/nmxact/nmxutil" |
| "mynewt.apache.org/newtmgr/nmxact/sesn" |
| "mynewt.apache.org/newtmgr/nmxact/task" |
| ) |
| |
| type XportCfg struct { |
| // *********************** |
| // *** Required fields *** |
| // *********************** |
| |
| // Path of Unix domain socket to create and listen on. |
| SockPath string |
| |
| // Path of the blehostd executable. |
| BlehostdPath string |
| |
| // Path of the BLE controller device (e.g., /dev/ttyUSB0). |
| DevPath string |
| |
| // *********************** |
| // *** Optional fields *** |
| // *********************** |
| |
| // How long to wait for the blehostd process to connect to the Unix domain |
| // socket. |
| // Default: 1 second. |
| BlehostdAcceptTimeout time.Duration |
| |
| // How long to wait for a JSON response from the blehostd process. |
| // Default: 10 seconds. |
| BlehostdRspTimeout time.Duration |
| |
| // How long to allow for the host and controller to sync at startup. |
| // Default: 2 seconds. |
| SyncTimeout time.Duration |
| |
| // The static random address to use. Set to nil if one should be |
| // generated. |
| // Default: nil (auto-generate). |
| RandAddr *BleAddr |
| |
| // The value to specify during ATT MTU exchange. |
| // Default: 264. |
| PreferredMtu uint16 |
| |
| // Additional args to blehostd |
| BlehostdArgs []string |
| |
| // Whether to restart automatically when an error is detected. |
| // Default: true. |
| Restart bool |
| } |
| |
| // Implements xport.Xport. |
| type BleXport struct { |
| // Whether the transport should restart on failure. |
| enabled bool |
| |
| shuttingDown bool |
| |
| advertiser *Advertiser |
| cfg XportCfg |
| client *unixchild.Client |
| cm ChrMgr |
| d *Dispatcher |
| master Master |
| slave nmxutil.SingleResource |
| stopChan chan struct{} |
| syncer Syncer |
| tq task.TaskQueue |
| wg sync.WaitGroup |
| |
| // Map of open sessions (key: connection handle). |
| sesns map[uint16]*NakedSesn |
| |
| // Protects `enabled`. |
| mtx sync.Mutex |
| } |
| |
| func (bx *BleXport) runTask(fn func() error) error { |
| err := bx.tq.Run(fn) |
| if err == task.InactiveError { |
| return nmxutil.NewXportError("attempt to use inactive BLE transport") |
| } |
| return err |
| } |
| |
| func (bx *BleXport) enqueueShutdown(cause error) chan error { |
| return bx.tq.Enqueue(func() error { return bx.shutdown(cause) }) |
| } |
| |
| func (bx *BleXport) startUnixChild() error { |
| config := unixchild.Config{ |
| SockPath: bx.cfg.SockPath, |
| ChildPath: bx.cfg.BlehostdPath, |
| ChildArgs: []string{bx.cfg.DevPath, bx.cfg.SockPath}, |
| Depth: 10, |
| MaxMsgSz: 10240, |
| AcceptTimeout: bx.cfg.BlehostdAcceptTimeout, |
| } |
| |
| config.ChildArgs = append(config.ChildArgs, bx.cfg.BlehostdArgs...) |
| bx.client = unixchild.New(config) |
| |
| if err := bx.client.Start(); err != nil { |
| if unixchild.IsUcAcceptError(err) { |
| err = nmxutil.NewXportError( |
| "blehostd did not connect to socket; " + |
| "controller not attached?") |
| } else { |
| err = nmxutil.NewXportError( |
| "Failed to start child process: " + err.Error()) |
| } |
| return err |
| } |
| |
| return nil |
| } |
| |
| func (bx *BleXport) addAccessListener() (*Listener, error) { |
| key := TchKey(MSG_TYPE_ACCESS_EVT, -1) |
| nmxutil.LogAddListener(3, key, 0, "access") |
| return bx.AddListener(key) |
| } |
| |
| func (bx *BleXport) startSyncer() error { |
| syncCh, resetCh, err := bx.syncer.Start(bx) |
| if err != nil { |
| return err |
| } |
| |
| initialSyncCh := make(chan struct{}) |
| |
| // Listen for events in the background: |
| // * sync loss |
| // * stack reset |
| // * GATT access |
| bx.wg.Add(1) |
| go func() { |
| defer bx.wg.Done() |
| |
| accessl, err := bx.addAccessListener() |
| if err != nil { |
| bx.enqueueShutdown(err) |
| return |
| } |
| defer bx.RemoveListener(accessl) |
| |
| for { |
| select { |
| case reason, ok := <-resetCh: |
| if ok { |
| // Ignore resets prior to initial sync. |
| if initialSyncCh == nil { |
| bx.enqueueShutdown(nmxutil.NewXportError(fmt.Sprintf( |
| "The BLE controller has been reset by the host; "+ |
| "reason=%s (%d)", |
| ErrCodeToString(reason), reason))) |
| } |
| } |
| |
| case synced, ok := <-syncCh: |
| if ok { |
| if !synced { |
| bx.enqueueShutdown(nmxutil.NewXportError( |
| "BLE host <-> controller sync lost")) |
| } else if initialSyncCh != nil { |
| close(initialSyncCh) |
| initialSyncCh = nil |
| } |
| } |
| |
| case err, ok := <-accessl.ErrChan: |
| if ok { |
| bx.enqueueShutdown(err) |
| } |
| |
| case bm, ok := <-accessl.MsgChan: |
| if ok { |
| switch msg := bm.(type) { |
| case *BleAccessEvt: |
| if err := bx.cm.Access(bx, msg); err != nil { |
| log.Debugf("Error sending access status: %s", |
| err.Error()) |
| } |
| } |
| } |
| |
| case <-bx.stopChan: |
| return |
| } |
| } |
| }() |
| |
| bx.syncer.Refresh() |
| |
| // Block until host and controller are synced. |
| select { |
| case <-initialSyncCh: |
| |
| case <-time.After(bx.cfg.SyncTimeout): |
| return nmxutil.NewXportError(fmt.Sprintf( |
| "Error waiting for host <-> controller sync: timeout (%s)", |
| bx.cfg.SyncTimeout.String())) |
| |
| case <-bx.stopChan: |
| return nmxutil.NewXportError("stopped") |
| } |
| |
| return nil |
| } |
| |
| func (bx *BleXport) setAddr() error { |
| // Generate a new random address if none was specified. |
| var addr BleAddr |
| if bx.cfg.RandAddr != nil { |
| addr = *bx.cfg.RandAddr |
| } else { |
| var err error |
| addr, err = GenRandAddrXact(bx) |
| if err != nil { |
| return err |
| } |
| } |
| |
| // Set the random address on the controller. |
| if err := SetRandAddrXact(bx, addr); err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func (bx *BleXport) shutdown(cause error) error { |
| nmxutil.Assert(nmxutil.IsXport(cause)) |
| |
| initiate := func() error { |
| bx.mtx.Lock() |
| defer bx.mtx.Unlock() |
| |
| if bx.shuttingDown { |
| return nmxutil.NewXportError("BLE xport stopped more than once") |
| } |
| bx.shuttingDown = true |
| return nil |
| } |
| |
| if err := initiate(); err != nil { |
| return err |
| } |
| defer func() { |
| bx.mtx.Lock() |
| defer bx.mtx.Unlock() |
| |
| bx.shuttingDown = false |
| }() |
| |
| log.Debugf("Shutting down BLE transport - %s", cause.Error()) |
| |
| bx.sesns = map[uint16]*NakedSesn{} |
| |
| // Stop monitoring host-controller sync. |
| synced := bx.syncer.Synced() |
| log.Debugf("Stopping BLE syncer") |
| bx.syncer.Stop() |
| |
| if synced { |
| // Reset controller so that all outstanding connections terminate. |
| log.Debugf("Resetting host") |
| ResetXact(bx) |
| } |
| |
| if err := bx.tq.StopNoWait(cause); err != nil { |
| // Already shut down. |
| return err |
| } |
| |
| // Indicate error to all clients who are waiting for the master |
| // resource. |
| log.Debugf("Aborting BLE master") |
| bx.master.Abort(cause) |
| |
| // Indicate an error to all of this transport's listeners. This |
| // prevents them from blocking endlessly while awaiting a BLE message. |
| log.Debugf("Stopping BLE dispatcher") |
| bx.d.ErrorAll(cause) |
| |
| // Stop all of this transport's go routines. |
| close(bx.stopChan) |
| |
| // Stop the unixchild instance (blehostd + socket). |
| if bx.client != nil { |
| log.Debugf("Stopping unixchild") |
| bx.client.Stop() |
| } |
| |
| bx.wg.Wait() |
| |
| return nil |
| } |
| |
| // Transmit data to blehostd; host-controller sync not required. |
| func (bx *BleXport) txNoSync(data []byte) error { |
| log.Debugf("Tx to blehostd:\n%s", hex.Dump(data)) |
| return bx.client.TxToChild(data) |
| } |
| |
| func (bx *BleXport) startEvent() error { |
| fail := func(err error) error { |
| bx.shutdown(nmxutil.NewXportError(err.Error())) |
| return err |
| } |
| |
| // Make sure we don't think we are still in sync with the controller. If |
| // we fail early, we don't want to try sending a reset command. |
| bx.syncer.Stop() |
| |
| if err := bx.startUnixChild(); err != nil { |
| return fail(err) |
| } |
| |
| // Listen for errors and data from the blehostd process. |
| bx.wg.Add(1) |
| go func() { |
| defer bx.wg.Done() |
| |
| for { |
| select { |
| case err, ok := <-bx.client.ErrChild: |
| if ok { |
| bx.enqueueShutdown(nmxutil.NewXportError( |
| "BLE transport error: " + err.Error())) |
| } |
| |
| case buf := <-bx.client.FromChild: |
| if len(buf) != 0 { |
| log.Debugf("Receive from blehostd:\n%s", hex.Dump(buf)) |
| bx.d.Dispatch(buf) |
| } |
| |
| case <-bx.stopChan: |
| return |
| } |
| } |
| }() |
| |
| // Listen for sync and reset; blocks until initial sync. |
| if err := bx.startSyncer(); err != nil { |
| return fail(err) |
| } |
| |
| // Set the random address. |
| if err := bx.setAddr(); err != nil { |
| return fail(err) |
| } |
| |
| // Set the preferred ATT MTU in the host. |
| if err := SetPreferredMtuXact(bx, bx.cfg.PreferredMtu); err != nil { |
| return fail(err) |
| } |
| |
| return nil |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // API // |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| func (bx *BleXport) Advertiser() *Advertiser { |
| return bx.advertiser |
| } |
| |
| func (bx *BleXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) { |
| return NewBleSesn(bx, cfg) |
| } |
| |
| func (bx *BleXport) Start() error { |
| initialize := func() error { |
| bx.mtx.Lock() |
| defer bx.mtx.Unlock() |
| |
| if bx.enabled { |
| return nmxutil.NewXportError("BLE xport double start") |
| } |
| |
| bx.enabled = true |
| return nil |
| } |
| |
| if err := initialize(); err != nil { |
| return err |
| } |
| |
| startTask := func() chan error { |
| if err := bx.tq.Start(10); err != nil { |
| nmxutil.Assert(false) |
| } |
| bx.stopChan = make(chan struct{}) |
| return bx.tq.Enqueue(bx.startEvent) |
| } |
| |
| // Enqueue start event and block until it completes. If this first attempt |
| // fails, abort the start procedure completely (don't enter the retry |
| // loop). |
| if err := <-startTask(); err != nil { |
| bx.mtx.Lock() |
| bx.enabled = false |
| bx.mtx.Unlock() |
| return err |
| } |
| |
| // Run and restart task queue in the background. |
| go func() { |
| isEnabled := func() bool { |
| bx.mtx.Lock() |
| defer bx.mtx.Unlock() |
| |
| return bx.enabled |
| } |
| |
| for { |
| <-bx.stopChan |
| bx.wg.Wait() |
| |
| if !bx.cfg.Restart || !isEnabled() { |
| break |
| } |
| |
| startTask() |
| } |
| }() |
| |
| return nil |
| } |
| |
| func (bx *BleXport) Stop() error { |
| fn := func() error { |
| initialize := func() error { |
| bx.mtx.Lock() |
| defer bx.mtx.Unlock() |
| |
| if !bx.enabled { |
| return fmt.Errorf("BLE xport double stop") |
| } |
| bx.enabled = false |
| return nil |
| } |
| |
| if err := initialize(); err != nil { |
| return err |
| } |
| |
| cause := nmxutil.NewXportError("BLE xport manually stopped") |
| |
| if err := bx.shutdown(cause); err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| return bx.runTask(fn) |
| } |
| |
| func (bx *BleXport) Restart(reason string) { |
| cause := nmxutil.NewXportError("Restarting BLE transport; " + reason) |
| bx.enqueueShutdown(cause) |
| } |
| |
| // Transmit data to blehostd. If the host and controller are not synced, this |
| // function blocks until they are (or until the sync fails). |
| func (bx *BleXport) Tx(data []byte) error { |
| fn := func() error { |
| return bx.txNoSync(data) |
| } |
| |
| return bx.runTask(fn) |
| } |
| |
| func (bx *BleXport) SetServices(svcs []BleSvc) error { |
| return bx.cm.SetServices(bx, svcs) |
| } |
| |
| func (bx *BleXport) AddListener(key ListenerKey) (*Listener, error) { |
| listener := NewListener() |
| if err := bx.d.AddListener(key, listener); err != nil { |
| return nil, err |
| } |
| return listener, nil |
| } |
| |
| func (bx *BleXport) RemoveListener(listener *Listener) *ListenerKey { |
| return bx.d.RemoveListener(listener) |
| } |
| |
| func (bx *BleXport) RemoveKey(key ListenerKey) *Listener { |
| return bx.d.RemoveKey(key) |
| } |
| |
| func (bx *BleXport) RspTimeout() time.Duration { |
| return bx.cfg.BlehostdRspTimeout |
| } |
| |
| func (bx *BleXport) GetMasterSecondary() Preemptable { |
| return bx.master.GetSecondary() |
| } |
| |
| func (bx *BleXport) SetMasterSecondary(s Preemptable) error { |
| return bx.master.SetSecondary(s) |
| } |
| |
| func (bx *BleXport) AcquireMasterPrimary(token interface{}) error { |
| return bx.master.AcquirePrimary(token) |
| } |
| |
| func (bx *BleXport) AcquireMasterSecondary() error { |
| return bx.master.AcquireSecondary() |
| } |
| |
| func (bx *BleXport) ReleaseMaster() { |
| bx.master.Release() |
| } |
| |
| func (bx *BleXport) StopWaitingForMasterPrimary(token interface{}, err error) { |
| bx.master.StopWaitingPrimary(token, err) |
| } |
| |
| func (bx *BleXport) StopWaitingForMasterSecondary(err error) { |
| bx.master.StopWaitingSecondary(err) |
| } |
| |
| func (bx *BleXport) AcquireSlave(token interface{}) error { |
| return <-bx.slave.Acquire(token) |
| } |
| |
| func (bx *BleXport) ReleaseSlave() { |
| bx.slave.Release() |
| } |
| |
| func (bx *BleXport) StopWaitingForSlave(token interface{}, err error) { |
| bx.slave.StopWaiting(token, err) |
| } |
| |
| func (bx *BleXport) AddSesn(connHandle uint16, s *NakedSesn) { |
| bx.mtx.Lock() |
| defer bx.mtx.Unlock() |
| |
| bx.sesns[connHandle] = s |
| } |
| |
| func (bx *BleXport) RemoveSesn(connHandle uint16) *NakedSesn { |
| bx.mtx.Lock() |
| defer bx.mtx.Unlock() |
| |
| s := bx.sesns[connHandle] |
| if s != nil { |
| delete(bx.sesns, connHandle) |
| } |
| return s |
| } |
| |
| func (bx *BleXport) FindSesn(connHandle uint16) *NakedSesn { |
| bx.mtx.Lock() |
| defer bx.mtx.Unlock() |
| |
| return bx.sesns[connHandle] |
| } |
| |
| func NewXportCfg() XportCfg { |
| return XportCfg{ |
| BlehostdAcceptTimeout: time.Second, |
| BlehostdRspTimeout: 10 * time.Second, |
| SyncTimeout: 2 * time.Second, |
| PreferredMtu: 512, |
| Restart: true, |
| } |
| } |
| |
| func NewBleXport(cfg XportCfg) (*BleXport, error) { |
| bx := &BleXport{ |
| cfg: cfg, |
| d: NewDispatcher(), |
| slave: nmxutil.NewSingleResource(), |
| sesns: map[uint16]*NakedSesn{}, |
| } |
| |
| bx.tq = task.NewTaskQueue("ble_xport") |
| |
| bx.advertiser = NewAdvertiser(bx) |
| bx.master = NewMaster(bx) |
| |
| return bx, nil |
| } |