blob: d17a0d26819efb4e6d321802f683edaec7a12bad [file] [log] [blame]
package tendermint
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"
"github.com/apache/incubator-milagro-dta/libs/datastore"
"github.com/apache/incubator-milagro-dta/libs/logger"
"github.com/apache/incubator-milagro-dta/pkg/api"
status "github.com/apache/incubator-milagro-dta/pkg/tendermint/status"
"github.com/fatih/color"
"github.com/pkg/errors"
tmclient "github.com/tendermint/tendermint/rpc/client"
tmtypes "github.com/tendermint/tendermint/types"
)
const (
nodeConnectionTimeout = time.Second * 10
txChanSize = 1000
)
// ProcessTXFunc is executed on each incoming TX
type ProcessTXFunc func(tx *api.BlockChainTX) error
// NodeConnector is using external tendermint node to post and get transactions
type NodeConnector struct {
nodeID string
tmNodeAddr string
httpClient *http.Client
tmClient *tmclient.HTTP
log *logger.Logger
store *datastore.Store
}
// NewNodeConnector constructs a new Tendermint NodeConnector
func NewNodeConnector(tmNodeAddr string, nodeID string, store *datastore.Store, log *logger.Logger) (conn *NodeConnector, err error) {
defer func() {
if r := recover(); r != nil {
err = errors.Errorf("Initialize tendermint node connector: %v", r)
}
}()
tmNodeAddr = strings.TrimRight(tmNodeAddr, "/")
tmClient := tmclient.NewHTTP(fmt.Sprintf("tcp://%s", tmNodeAddr), "/websocket")
if err := tmClient.Start(); err != nil {
return nil, errors.Wrap(err, "Start tendermint client")
}
return &NodeConnector{
tmNodeAddr: tmNodeAddr,
nodeID: nodeID,
log: log,
store: store,
httpClient: &http.Client{
Timeout: nodeConnectionTimeout,
},
tmClient: tmClient,
}, nil
}
// Stop is performing clean-up
func (nc *NodeConnector) Stop() error {
return nc.tmClient.Stop()
}
// NodeID returns the NodeID
func (nc *NodeConnector) NodeID() string {
return nc.nodeID
}
// GetTx retreives a transaction by hash
func (nc *NodeConnector) GetTx(txHash string) (*api.BlockChainTX, error) {
query := fmt.Sprintf("tag.txhash='%s'", txHash)
result, err := nc.tmClient.TxSearch(query, true, 1, 1)
if err != nil {
return nil, err
}
if len(result.Txs) == 0 {
return nil, errors.Errorf("Document not found: %v", txHash)
}
payload := &api.BlockChainTX{}
if err := json.Unmarshal(result.Txs[0].Tx, &payload); err != nil {
return nil, err
}
return payload, nil
}
// PostTx posts a transaction to the chain and returns the transaction ID
func (nc *NodeConnector) PostTx(tx *api.BlockChainTX) (txID string, err error) {
txID = tx.CalcHash()
//serialize the whole transaction
serializedTX, err := json.Marshal(tx)
if err != nil {
return
}
base64EncodedTX := base64.StdEncoding.EncodeToString(serializedTX)
// TODO: use net/rpc
body := strings.NewReader(`{
"jsonrpc": "2.0",
"id": "anything",
"method": "broadcast_tx_commit",
"params": {
"tx": "` + base64EncodedTX + `"}
}`)
url := "http://" + nc.tmNodeAddr
req, err := http.NewRequest("POST", url, body)
if err != nil {
return "", errors.Wrap(err, "post to blockchain node")
}
req.Header.Set("Content-Type", "text/plain;")
resp, err := nc.httpClient.Do(req)
if err != nil {
return "", errors.Wrap(err, "post to blockchain node")
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
var respErr string
if b, err := ioutil.ReadAll(resp.Body); err != nil {
respErr = resp.Status
} else {
respErr = string(b)
}
return "", errors.Errorf("Post to blockchain node status %v: %v", resp.StatusCode, respErr)
}
nc.log.Debug("Post to chain: Processor: %s: txID: %s", tx.Processor, txID)
return
}
// Subscribe connects to the Tendermint node and collect the events
func (nc *NodeConnector) Subscribe(ctx context.Context, processFn ProcessTXFunc) error {
chainStatus, err := nc.getChainStatus()
if err != nil {
return err
}
currentBlockHeight, err := strconv.Atoi(chainStatus.Result.SyncInfo.LatestBlockHeight)
if err != nil {
return errors.Wrap(err, "Failed to obtain latest blockheight of Blockchain")
}
var processedTo string
if err := nc.store.Get("chain", "height", &processedTo); err != nil {
if err != datastore.ErrKeyNotFound {
return errors.Wrap(err, "Get last processed block height")
}
}
// create the transaction queue chan
txQueue := make(chan *api.BlockChainTX, txChanSize)
// Collect events
if err := nc.subscribeAndQueue(ctx, txQueue); err != nil {
return err
}
nc.loadMissingHistory(currentBlockHeight, processedTo, processFn)
// Process events
return nc.processTXQueue(ctx, txQueue, processFn)
}
func decodeProcessedTo(processedTo string) (processedToHeight int64, processedToIndex uint32, err error) {
pth := strings.Split(processedTo, ".")
if len(pth) == 2 {
processedToHeight, err = strconv.ParseInt(pth[0], 10, 64)
if err != nil {
return 0, 0, errors.Wrapf(err, "Can't decode processed to Height %s", processedTo)
}
procindex64, err := strconv.ParseUint(pth[1], 10, 32)
if err != nil {
return 0, 0, errors.Wrapf(err, "Can't decode processed to Index %s", processedTo)
}
processedToIndex = uint32(procindex64)
return processedToHeight, processedToIndex, nil
}
return 0, 0, nil
}
func (nc *NodeConnector) loadMissingHistory(currentBlockHeight int, processedTo string, processFn ProcessTXFunc) error {
nc.log.Debug("Block height: Current: %v; Processed: %s", currentBlockHeight, processedTo)
processedToHeight, processedToIndex, err := decodeProcessedTo(processedTo)
if err != nil {
return err
}
//Open a 2nd websocket client
tmNodeAddr := strings.TrimRight(nc.tmNodeAddr, "/")
tmHistoryClient := tmclient.NewHTTP(fmt.Sprintf("tcp://%s", tmNodeAddr), "/websocket")
if err := tmHistoryClient.Start(); err != nil {
return errors.Wrap(err, "Start tendermint history client")
}
currentPage := 1
query := fmt.Sprintf("tag.recipient='%v' AND tx.height>=%v AND tx.height<=%v", nc.nodeID, processedToHeight, currentBlockHeight)
numPerPage := 5
processedCount := 1
for {
result, err := tmHistoryClient.TxSearch(query, true, currentPage, numPerPage)
if err != nil {
return errors.Wrapf(err, "Failed to subscribe to query %s", query)
}
//skip over any previously processed
totalCount := result.TotalCount
totalToProcess := totalCount - int(processedToIndex)
for _, chainTx := range result.Txs {
tx := chainTx.Tx
payload := &api.BlockChainTX{}
err := json.Unmarshal(tx, payload)
if err != nil {
nc.log.Debug("IGNORED TX - Invalid!")
break
}
payload.Index = chainTx.Index
payload.Height = chainTx.Height
//processedTo check
if processedToHeight == payload.Height && processedToIndex >= payload.Index {
//We have already processed this before
msg := fmt.Sprintf("[%v/%v] HISTORY %s Block:%v Index:%v", processedCount, totalCount, color.BlueString("ALREADY PROCESSED"), chainTx.Height, chainTx.Index)
nc.log.Info(msg)
processedCount++
continue
}
// if processedToHeight == payload.Height && payload.Index == 0 {
// totalCount = totalCount - int(processedToIndex)
// }
// if processedToHeight > payload.Height {
// continue
// }
// if processedToHeight == payload.Height && processedToIndex > payload.Index {
// continue
// }
//Dont queue just process directly
if err := processFn(payload); err != nil {
msg := fmt.Sprintf("[%v/%v] HISTORY %s Block:%v Index:%v Error:%v", processedCount, totalCount, color.RedString("FAILURE"), chainTx.Height, chainTx.Index, err)
nc.log.Info(msg)
} else {
msg := fmt.Sprintf("[%v/%v] HISTORY %s Block:%v Index:%v", processedCount, totalCount, color.GreenString("PROCESSED"), chainTx.Height, chainTx.Index)
nc.log.Info(msg)
}
processedCount++
if err := nc.updateProcessedUpToHeight(chainTx.Height, chainTx.Index); err != nil {
return err
}
processedToHeight = chainTx.Height
processedToIndex = chainTx.Index
}
if processedCount == totalToProcess {
break
}
currentPage++
}
nc.log.Info("Process history complete")
return nil
}
func (nc *NodeConnector) subscribeAndQueue(ctx context.Context, txQueue chan *api.BlockChainTX) error {
query := "tag.recipient='" + nc.nodeID + "'"
out, err := nc.tmClient.Subscribe(context.Background(), "", query, 1000)
if err != nil {
return errors.Wrapf(err, "Failed to subscribe to query %s", query)
}
go func() {
for {
select {
case result := <-out:
tx := result.Data.(tmtypes.EventDataTx).Tx
payload := &api.BlockChainTX{}
err := json.Unmarshal(tx, payload)
payload.Height = result.Data.(tmtypes.EventDataTx).Height
payload.Index = result.Data.(tmtypes.EventDataTx).Index
if err != nil {
nc.log.Debug("IGNORED TX - Invalid!")
break
}
//check if this node is in receipient list
if payload.RecipientID != nc.nodeID {
nc.log.Debug("IGNORED TX! Recipient not match the query! (%v != %v)", payload.RecipientID, nc.nodeID)
break
}
// TODO: Check if hash match the payload
//Add into the waitingQueue for later processing
txQueue <- payload
case <-ctx.Done():
return
}
}
}()
return nil
}
func (nc *NodeConnector) processTXQueue(ctx context.Context, txQueue chan *api.BlockChainTX, processFn ProcessTXFunc) error {
for {
select {
case chainTx := <-txQueue:
//nc.log.Info(("incoming tx"))
go func() error {
if err := processFn(chainTx); err != nil {
msg := fmt.Sprintf("TX %s Block:%v Index:%v Error:%v", color.RedString("FAILURE"), chainTx.Height, chainTx.Index, err)
nc.log.Info(msg)
} else {
orderRef := chainTx.Tags["reference"]
msg := fmt.Sprintf("TX %s Block:%v Index:%v Type:%v Ref:%v", color.GreenString("PROCESSED"), chainTx.Height, chainTx.Index, chainTx.Processor, orderRef)
nc.log.Info(msg)
}
if err := nc.updateProcessedUpToHeight(chainTx.Height, chainTx.Index); err != nil {
return err
}
return nil
}()
// TODO: store the last block height
case <-ctx.Done():
return nil
}
}
}
func (nc *NodeConnector) updateProcessedUpToHeight(height int64, index uint32) error {
processedTo := fmt.Sprintf("%v.%v", height, index)
if err := nc.store.Set("chain", "height", &processedTo, nil); err != nil {
return errors.Wrapf(err, "Failed to update processed up to %s ", processedTo)
}
return nil
}
func (nc *NodeConnector) getChainStatus() (*status.StatusResponse, error) {
url := fmt.Sprintf("http://%s/status", nc.tmNodeAddr)
resp, err := nc.httpClient.Get(url)
if err != nil {
return nil, errors.Wrap(err, "Get node status")
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("Get node status status code: %v", resp.StatusCode)
}
status := &status.StatusResponse{}
if err := json.NewDecoder(resp.Body).Decode((&status)); err != nil {
return nil, errors.Wrap(err, "Invalid node status response")
}
return status, nil
}