blob: 1e21a120354c6a0bb1b9879f397d707e36bf6ea8 [file] [log] [blame]
package tendermint
import (
	"context"
	"encoding/hex"
	"encoding/json"
	"errors"
	"os"
	"os/signal"
	"syscall"

	"github.com/apache/incubator-milagro-dta/libs/logger"
	"github.com/apache/incubator-milagro-dta/pkg/api"
	tmclient "github.com/tendermint/tendermint/rpc/client"
	tmtypes "github.com/tendermint/tendermint/types"
)
//Subscribe - Connect to the Tendermint websocket to collect events
func Subscribe(logger *logger.Logger, nodeID string, listenPort string) error {
//tmlogger := log2.NewTMLogger(log.NewSyncWriter(os.Stdout))
client := tmclient.NewHTTP("tcp://"+node+"", "/websocket")
//client.SetLogger(tmlogger)
err := client.Start()
if err != nil {
logger.Info("Failed to start Tendermint HTTP client %s", err)
return err
}
defer client.Stop()
query := "tag.recipient='" + nodeID + "'"
//query := "tm.event = 'Tx'"
out, err := client.Subscribe(context.Background(), "test", query, 1000)
if err != nil {
logger.Info("Failed to subscribe to query %s %s", query, err)
return err
}
logger.Info("Tendermint: Connected")
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
for {
select {
case result := <-out:
tx := result.Data.(tmtypes.EventDataTx).Tx
payload := api.BlockChainTX{}
err := json.Unmarshal(tx, &payload)
if err != nil {
logger.Info("******** Invalid TX - ignored")
break
}
//check if this node is Sender
if payload.SenderID == nodeID {
break
}
//check is receipient
isRecipient := false
for _, v := range payload.RecipientID {
if v == nodeID {
isRecipient = true
break
}
}
if isRecipient == false {
logger.Info("******** Invalid Recipient - why are we receiving this TX?")
break
}
//blockchainTX, txid, err := decodeTX(string(tx))
TXIDhex := hex.EncodeToString(payload.TXhash[:])
logger.Info("Incoming TXHash:%s . Processor:%s", TXIDhex, payload.Processor)
if payload.Processor == "NONE" {
DumpTX(&payload)
} else {
callNextTX(&payload, listenPort)
}
//print(blockchainTX)
// print(txid)
// print(string(xx))
// a := result.Data.(tmtypes.EventDataTx).Index
// b := result.Data.(tmtypes.EventDataTx)
// c := b.TxResult
// tx := c.Tx
// txdata := []byte(c.Tx)
// print(string(txdata))
// print(a)
// Use(c, b, tx)
//logger.Info("got tx","index", result.Data.(tmtypes.EventDataTx).Index)
case <-quit:
os.Exit(0)
}
}
}
//Use - helper to remove warnings
func Use(vals ...interface{}) {
for _, val := range vals {
_ = val
}
}