| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| |
| package electron |
| |
| import ( |
| "fmt" |
| "io" |
| "qpid.apache.org/proton" |
| ) |
| |
| // Closed is an alias for io.EOF. It is returned as an error when an endpoint |
| // was closed cleanly. |
| var Closed = io.EOF |
| |
| // EOF is an alias for io.EOF. It is returned as an error when an endpoint |
| // was closed cleanly. |
| var EOF = io.EOF |
| |
| // Endpoint is the local end of a communications channel to the remote peer |
| // process. The following interface implement Endpoint: Connection, Session, |
| // Sender and Receiver. |
| // |
| // You can create an endpoint with functions on Container, Connection and |
| // Session. You can accept incoming endpoints from the remote peer using |
| // Connection.Incoming() |
| // |
| type Endpoint interface { |
| // Close an endpoint and signal an error to the remote end if error != nil. |
| Close(error) |
| |
| // String is a human readable identifier, useful for debugging and logging. |
| String() string |
| |
| // Error returns nil if the endpoint is open, otherwise returns an error. |
| // Error() == Closed means the endpoint was closed without error. |
| Error() error |
| |
| // Connection is the connection associated with this endpoint. |
| Connection() Connection |
| |
| // Done returns a channel that will close when the endpoint closes. |
| // After Done() has closed, Error() will return the reason for closing. |
| Done() <-chan struct{} |
| |
| // Sync() waits for the remote peer to confirm the endpoint is active or |
| // reject it with an error. You can call it immediately on new endpoints |
| // for more predictable error handling. |
| // |
| // AMQP is an asynchronous protocol. It is legal to create an endpoint and |
| // start using it without waiting for confirmation. This avoids a needless |
| // delay in the non-error case and throughput by "assuming the best". |
| // |
| // However if there *is* an error, these "optimistic" actions will fail. The |
| // endpoint and its children will be closed with an error. The error will only |
| // be detected when you try to use one of these endpoints or call Sync() |
| Sync() error |
| |
| // Called in handler goroutine when endpoint is remotely closed. |
| closed(err error) error |
| wakeSync() |
| } |
| |
| // Base implementation for Endpoint |
| type endpoint struct { |
| err proton.ErrorHolder |
| str string // String() return value. |
| done chan struct{} |
| active chan struct{} |
| } |
| |
| func (e *endpoint) init(s string) { |
| e.str = s |
| e.done = make(chan struct{}) |
| e.active = make(chan struct{}) |
| } |
| |
| // Called in proton goroutine on remote open. |
| func (e *endpoint) wakeSync() { |
| select { // Close active channel if not already closed. |
| case <-e.active: |
| default: |
| close(e.active) |
| } |
| } |
| |
| // Called in proton goroutine (from handler) on a Closed or Disconnected event. |
| // |
| // Set err if there is not already an error on the endpoint. |
| // Return Error() |
| func (e *endpoint) closed(err error) error { |
| select { |
| case <-e.done: |
| // Already closed |
| default: |
| e.err.Set(err) |
| e.err.Set(Closed) |
| e.wakeSync() // Make sure we wake up Sync() |
| close(e.done) |
| } |
| return e.Error() |
| } |
| |
| func (e *endpoint) String() string { return e.str } |
| |
| func (e *endpoint) Error() error { return e.err.Get() } |
| |
| func (e *endpoint) Done() <-chan struct{} { return e.done } |
| |
| func (e *endpoint) Sync() error { |
| <-e.active |
| return e.Error() |
| } |
| |
| // Call in proton goroutine to initiate closing an endpoint locally |
| // handler will complete the close when remote end closes. |
| func localClose(ep proton.Endpoint, err error) { |
| if ep.State().LocalActive() { |
| proton.CloseError(ep, err) |
| } |
| } |
| |
| // Incoming is the interface for incoming endpoints, see Connection.Incoming() |
| // |
| // Call Incoming.Accept() to open the endpoint or Incoming.Reject() to close it |
| // with optional error |
| // |
| // Implementing types are *IncomingConnection, *IncomingSession, *IncomingSender |
| // and *IncomingReceiver. Each type provides methods to examine the incoming |
| // endpoint request and set configuration options for the local endpoint |
| // before calling Accept() or Reject() |
| type Incoming interface { |
| // Accept and open the endpoint. |
| Accept() Endpoint |
| |
| // Reject the endpoint with an error |
| Reject(error) |
| |
| // wait for and call the accept function, call in proton goroutine. |
| wait() error |
| pEndpoint() proton.Endpoint |
| } |
| |
| type incoming struct { |
| pep proton.Endpoint |
| acceptCh chan func() error |
| } |
| |
| func makeIncoming(e proton.Endpoint) incoming { |
| return incoming{pep: e, acceptCh: make(chan func() error)} |
| } |
| |
| func (in *incoming) String() string { return fmt.Sprintf("%s: %s", in.pep.Type(), in.pep) } |
| func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } } |
| |
| // Call in proton goroutine, wait for and call the accept function. |
| func (in *incoming) wait() error { return (<-in.acceptCh)() } |
| |
| func (in *incoming) pEndpoint() proton.Endpoint { return in.pep } |
| |
| // Called in app goroutine to send an accept function to proton and return the resulting endpoint. |
| func (in *incoming) accept(f func() Endpoint) Endpoint { |
| done := make(chan Endpoint) |
| in.acceptCh <- func() error { |
| ep := f() |
| done <- ep |
| return nil |
| } |
| return <-done |
| } |