|  | /* | 
|  | Licensed to the Apache Software Foundation (ASF) under one | 
|  | or more contributor license agreements.  See the NOTICE file | 
|  | distributed with this work for additional information | 
|  | regarding copyright ownership.  The ASF licenses this file | 
|  | to you under the Apache License, Version 2.0 (the | 
|  | "License"); you may not use this file except in compliance | 
|  | with the License.  You may obtain a copy of the License at | 
|  |  | 
|  | http://www.apache.org/licenses/LICENSE-2.0 | 
|  |  | 
|  | Unless required by applicable law or agreed to in writing, | 
|  | software distributed under the License is distributed on an | 
|  | "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | KIND, either express or implied.  See the License for the | 
|  | specific language governing permissions and limitations | 
|  | under the License. | 
|  | */ | 
|  |  | 
|  | package gremlingo | 
|  |  | 
|  | import ( | 
|  | "net/http" | 
|  | "net/url" | 
|  | "sync" | 
|  | "time" | 
|  |  | 
|  | "github.com/gorilla/websocket" | 
|  | ) | 
|  |  | 
|  | const keepAliveIntervalDefault = 5 * time.Second | 
|  | const writeDeadlineDefault = 3 * time.Second | 
|  | const writeChannelSizeDefault = 100 | 
|  | const connectionTimeoutDefault = 5 * time.Second | 
|  |  | 
|  | // Transport layer that uses gorilla/websocket: https://github.com/gorilla/websocket | 
|  | // Gorilla WebSocket is a widely used and stable Go implementation of the WebSocket protocol. | 
|  | type gorillaTransporter struct { | 
|  | url          string | 
|  | connection   websocketConn | 
|  | isClosed     bool | 
|  | logHandler   *logHandler | 
|  | connSettings *connectionSettings | 
|  | writeChannel chan []byte | 
|  | wg           *sync.WaitGroup | 
|  | } | 
|  |  | 
|  | // Connect used to establish a connection. | 
|  | func (transporter *gorillaTransporter) Connect() (err error) { | 
|  | if transporter.connection != nil { | 
|  | return | 
|  | } | 
|  |  | 
|  | var u *url.URL | 
|  | u, err = url.Parse(transporter.url) | 
|  | if err != nil { | 
|  | return | 
|  | } | 
|  |  | 
|  | dialer := &websocket.Dialer{ | 
|  | Proxy:             http.ProxyFromEnvironment, | 
|  | HandshakeTimeout:  transporter.connSettings.connectionTimeout, | 
|  | TLSClientConfig:   transporter.connSettings.tlsConfig, | 
|  | EnableCompression: transporter.connSettings.enableCompression, | 
|  | ReadBufferSize:    transporter.connSettings.readBufferSize, | 
|  | WriteBufferSize:   transporter.connSettings.writeBufferSize, | 
|  | } | 
|  |  | 
|  | header := transporter.getAuthInfo().GetHeader() | 
|  | if transporter.connSettings.enableUserAgentOnConnect { | 
|  | if header == nil { | 
|  | header = make(http.Header) | 
|  | } | 
|  | header.Set(userAgentHeader, userAgent) | 
|  | } | 
|  |  | 
|  | // Nil is accepted as a valid header, so it can always be passed directly through. | 
|  | conn, _, err := dialer.Dial(u.String(), header) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | transporter.connection = conn | 
|  | transporter.connection.SetPongHandler(func(string) error { | 
|  | err := transporter.connection.SetReadDeadline(time.Now().Add(2 * transporter.connSettings.keepAliveInterval)) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | return nil | 
|  | }) | 
|  | transporter.wg.Add(1) | 
|  | go transporter.writeLoop() | 
|  | return | 
|  | } | 
|  |  | 
|  | // Write used to write data to the transporter. Opens connection if closed. | 
|  | func (transporter *gorillaTransporter) Write(data []byte) error { | 
|  | if transporter.connection == nil { | 
|  | err := transporter.Connect() | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | } | 
|  | transporter.writeChannel <- data | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func (transporter *gorillaTransporter) getAuthInfo() AuthInfoProvider { | 
|  | if transporter.connSettings.authInfo == nil { | 
|  | return NoopAuthInfo | 
|  | } | 
|  | return transporter.connSettings.authInfo | 
|  | } | 
|  |  | 
|  | // Read used to read data from the transporter. Opens connection if closed. | 
|  | func (transporter *gorillaTransporter) Read() ([]byte, error) { | 
|  | if transporter.connection == nil { | 
|  | err := transporter.Connect() | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | } | 
|  |  | 
|  | err := transporter.connection.SetReadDeadline(time.Now().Add(transporter.connSettings.keepAliveInterval * 2)) | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | _, bytes, err := transporter.connection.ReadMessage() | 
|  | return bytes, err | 
|  |  | 
|  | } | 
|  |  | 
|  | // Close used to close a connection if it is opened. | 
|  | func (transporter *gorillaTransporter) Close() (err error) { | 
|  | if !transporter.isClosed { | 
|  | if transporter.writeChannel != nil { | 
|  | close(transporter.writeChannel) | 
|  | } | 
|  | if transporter.wg != nil { | 
|  | transporter.wg.Wait() | 
|  | } | 
|  | err = transporter.connection.Close() | 
|  | transporter.isClosed = true | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | } | 
|  | return | 
|  | } | 
|  |  | 
|  | // IsClosed returns true when the transporter is closed. | 
|  | func (transporter *gorillaTransporter) IsClosed() bool { | 
|  | return transporter.isClosed | 
|  | } | 
|  |  | 
|  | func (transporter *gorillaTransporter) writeLoop() { | 
|  | defer transporter.wg.Done() | 
|  |  | 
|  | ticker := time.NewTicker(transporter.connSettings.keepAliveInterval) | 
|  | defer ticker.Stop() | 
|  |  | 
|  | for { | 
|  | select { | 
|  | case message, ok := <-transporter.writeChannel: | 
|  | if !ok { | 
|  | // Channel was closed, we can disconnect and exit. | 
|  | return | 
|  | } | 
|  |  | 
|  | // Set write deadline. | 
|  | err := transporter.connection.SetWriteDeadline(time.Now().Add(transporter.connSettings.writeDeadline)) | 
|  | if err != nil { | 
|  | transporter.logHandler.logf(Error, failedToSetWriteDeadline, err.Error()) | 
|  | return | 
|  | } | 
|  |  | 
|  | // Write binary message that was submitted to channel. | 
|  | err = transporter.connection.WriteMessage(websocket.BinaryMessage, message) | 
|  | if err != nil { | 
|  | transporter.logHandler.logf(Error, failedToWriteMessage, "BinaryMessage", err.Error()) | 
|  | return | 
|  | } | 
|  | case <-ticker.C: | 
|  | // Set write deadline. | 
|  | err := transporter.connection.SetWriteDeadline(time.Now().Add(transporter.connSettings.keepAliveInterval)) | 
|  | if err != nil { | 
|  | transporter.logHandler.logf(Error, failedToSetWriteDeadline, err.Error()) | 
|  | return | 
|  | } | 
|  |  | 
|  | // Write pong message. | 
|  | err = transporter.connection.WriteMessage(websocket.PingMessage, nil) | 
|  | if err != nil { | 
|  | transporter.logHandler.logf(Error, failedToWriteMessage, "PingMessage", err.Error()) | 
|  | return | 
|  | } | 
|  | } | 
|  | } | 
|  | } |