package collector import ( "fmt" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/go-xorm/xorm" "log" "math/big" "strings" "syncscan-go/er" "syncscan-go/eth" "syncscan-go/model" models_table "syncscan-go/models/table" "time" ) type Collector struct { db *xorm.Engine isDone bool chDone chan struct{} eth *eth.Eth collectionAmount int64 methods map[string]models_table.EthTxMethod } func New(db *xorm.Engine, eth *eth.Eth, collectionAmount int64) *Collector { if collectionAmount == 0 { collectionAmount = 1000 } c := &Collector{db: db, eth: eth, collectionAmount: collectionAmount, methods: make(map[string]models_table.EthTxMethod)} c.loadTxMethods() return c } func (this *Collector) Run() { to, err := this.eth.GetLastBlockNumber() er.Check(err) from, err := this.getLastBlockNumberFromDb() log.Println(to, from, err) er.Check(err) err = this.Process(from, to.Int64()) er.Check(err) // time.Sleep(time.Second * 5) // time.Sleep(time.Second * 60) } func (this *Collector) Stop() { this.isDone = true } func (this *Collector) loadTxMethods() (err error) { this.db.Find(&this.methods) return } func (this *Collector) getLastBlockNumberFromDb() (n int64, err error) { sn := models_table.EthLastSync{} _, err = this.db.Get(&sn) n = int64(sn.LastNo) if n == 0 { _, err = this.db.Insert(&models_table.EthLastSync{SyncType: "eth", LastNo: 0, CreatedOn: time.Now().Unix(), UpdatedOn: time.Now().Unix()}) if err != nil && strings.Contains(err.Error(), "Duplicate entry") { err = nil } } return } func (this *Collector) Process(fromBlockNumber int64, toBlockNumber int64) (err error) { //현재까지 저장된 블럭 이후부터 수집 if toBlockNumber-fromBlockNumber == 0 { log.Println("추가된 블럭 없음:", fromBlockNumber) return nil } if fromBlockNumber != 0 { fromBlockNumber += 1 } if (toBlockNumber - fromBlockNumber) > this.collectionAmount { toBlockNumber = fromBlockNumber + this.collectionAmount } log.Printf("processing started %d to %d\n", fromBlockNumber, toBlockNumber) for n := fromBlockNumber; n <= toBlockNumber; n++ { err = this.processBlock(n) er.Check(err) log.Println("processed block no:", n) _, err = this.db.Where("sync_type = ?", "eth").Update(&models_table.EthLastSync{LastNo: uint64(n), UpdatedOn: time.Now().Unix()}) if err != nil { if strings.Contains(err.Error(), "No content found to be updated") { err = nil } } er.Check(err) } return err } func (this *Collector) processBlock(blockNo int64) (err error) { defer er.Recover(&err) block := models_table.EthBlock{} b, err := this.eth.GetBlockByNumber(big.NewInt(blockNo)) er.Check(err) block.CreatedOn = b.Time() block.BlockNo = blockNo block.TxnCnt = b.Transactions().Len() block.UncleCnt = len(b.Uncles()) block.Miner = b.Coinbase().Hex() block.GasUsed = fmt.Sprintf("%d", b.GasUsed()) block.GasLimit = fmt.Sprintf("%d", b.GasUsed()) if b.BaseFee() != nil { block.BaseFee = fmt.Sprintf("%d", b.BaseFee().Uint64()) } log.Println(block) receipts, err := this.eth.GetTxReceipts(b.Transactions()) er.Check(err) reward, txFee, uncleReward, err := this.eth.GetReward(b, receipts) ucr, _ := uncleReward.Float64() er.Check(err) f, _ := reward.Float64() block.Reward = fmt.Sprintf("%f", f) block.TrfeeSum = fmt.Sprintf("%f", float64(txFee.Uint64())/eth.ETH) block.UncleReward = fmt.Sprintf("%f", ucr/eth.ETH) _, err = this.db.Insert(&block) if err != nil { _, err = this.db.Where("block_no = ?", block.BlockNo).Update(block) er.Check(err) } if blockNo > 52 { err = this.processUncleBlock(blockNo - 52) er.Check(err) } err = this.processTx(b, receipts) er.Check(err) return } func (this *Collector) processUncleBlock(blockNo int64) (err error) { defer er.Recover(&err) b, err := this.eth.GetBlockByNumber(big.NewInt(blockNo)) er.Check(err) uncs := b.Uncles() for i, u := range uncs { ub := models_table.EthUncleBlock{ CreatedOn: b.Time(), BlockNo: blockNo, UncleNo: u.Number.Uint64(), UnclePosition: i, Miner: u.Coinbase.Hex(), GasUsed: u.GasUsed, GasLimit: u.GasLimit, UncleReward: 262.5, } _, err := this.db.Insert(&ub) if err != nil { _, err = this.db.Where("uncle_no = ?", ub.UncleNo).Update(ub) er.Check(err) } } return } func (this *Collector) processTx(block *types.Block, receipts types.Receipts) (err error) { defer er.Recover(&err) for i, tx := range block.Transactions() { t, err := this.eth.GetTxByHash(tx.Hash().Hex()) er.Check(err) value, err := hexutil.DecodeBig(t.Value) if err != nil { value = big.NewInt(0) } from := t.From to := t.To if len(tx.Data()) > 4 { t.Type = hexutil.Encode(tx.Data()[:4]) from, to, err = this.handleTxMethod(t.Type, hexutil.Encode(tx.Data()[4:])) er.Check(err) } contractAddr := "" isContract := false if tx.To() == nil { isContract = true contractAddr = receipts[i].ContractAddress.Hex() } else { contractAddr = tx.To().Hex() cnt, err := this.db.Where("cntr_addr = ?").Count(models_table.EthToken{}) er.Check(err) if cnt > 0 { isContract = true } else { contractAddr = "main_coin" } } if isContract || len(tx.Data()) > 0 { err = this.processTxInternal(block, t.Hash) er.Check(err) } txValue := models_table.EthTranxn{ CreatedOn: block.Time(), BlockNo: block.Number().Uint64(), TxnHash: t.Hash, CntrAddr: contractAddr, FromAddr: from, ToAddr: to, TxnType: t.Type, Value: value.String(), TxnFee: float64(100*receipts[i].GasUsed*tx.GasPrice().Uint64()) / eth.ETH, GasUsed: receipts[i].GasUsed, } _, err = this.db.Insert(&tx) if err != nil { _, err = this.db.Where("txn_hash = ?", txValue.TxnHash).Update(tx) er.Check(err) } err = this.processContract(block, t.From, tx, receipts[i]) er.Check(err) err = this.processToken(block, tx, receipts[i]) er.Check(err) log.Println("processed tx", t.Hash) } return nil } func (this *Collector) processTxInternal(block *types.Block, txHash string) (err error) { defer er.Recover(&err) start := time.Now() calls, err := this.eth.GetInternalTxsByHash(block.Number().Uint64(), txHash) er.Check(err) for _, call := range calls { value := big.NewInt(0) if len(call.Value) > 2 { value, _ = value.SetString(call.Value[2:], 16) } txi := models_table.EthTransInter{ CreatedOn: block.Time(), BlockNo: block.Number().Uint64(), ParentTxnHash: txHash, FromAddr: call.From, ToAddr: call.To, TxnType: call.Type, Value: value.String(), } _, err := this.db.Insert(&txi) if err != nil { _, err = this.db.Where("parent_txn_hash = ?", txHash).Update(txi) er.Check(err) } } if len(calls) > 0 { log.Println("processed internal tx", txHash) } log.Println("elapsed internal tx ", time.Since(start)) return } func (this *Collector) processContract(block *types.Block, txfrom string, tx *types.Transaction, receipt *types.Receipt) (err error) { defer er.Recover(&err) log.Println(receipt.ContractAddress.Hex()) if tx == nil || tx.To() != nil || receipt.ContractAddress.Hex() == "0x0000000000000000000000000000000000000000" { return } contract := models_table.EthContract{ CreatedOn: block.Time(), TxnHash: tx.Hash().Hex(), BlockNo: block.Number().Uint64(), CreatorAddr: txfrom, CntrAddr: receipt.ContractAddress.Hex(), } _, err = this.db.Insert(&contract) if err != nil { _, err = this.db.Where("txn_hash = ?", tx.Hash().Hex()).Update(contract) er.Check(err) } log.Println("processed contract:", tx.Hash(), receipt.ContractAddress) return nil } func (this *Collector) processToken(block *types.Block, tx *types.Transaction, receipt *types.Receipt) (err error) { defer er.Recover(&err) if tx == nil || len(tx.Data()) == 0 { return } tokenAddr := common.Address{} if tx.To() == nil { tokenAddr = receipt.ContractAddress } else { tokenAddr = *tx.To() } token, err := this.eth.GetToken(tokenAddr) if err != nil { return nil } tokenName, err := token.Name(nil) if err != nil { return nil } symbol, err := token.Symbol(nil) if err != nil { return nil } balance, err := token.BalanceOf(nil, tokenAddr) er.Check(err) totalSupply, err := token.TotalSupply(nil) er.Check(err) tokenValue := models_table.EthToken{ CreatedOn: block.Time(), CntrAddr: tokenAddr.Hex(), TokenName: tokenName, Symbol: symbol, CurrBalQty: balance.Uint64(), TotalCirculQty: totalSupply.Uint64(), } _, err = this.db.Insert(&tokenValue) if err != nil { _, err = this.db.Where("cntr_addr = ?", tokenAddr.Hex()).Update(tokenValue) er.Check(err) } contractAddr := tokenAddr.Hex() session := this.db.NewSession() defer session.Close() err = session.Begin() er.Check(err) for _, lg := range receipt.Logs { if len(lg.Topics) == 0 { continue } if lg.Topics[0].Hex() == eth.LogTransferSignHash.Hex() && len(lg.Data) > 0 { value := big.Int{} value.SetBytes(lg.Data) from := "0x" + lg.Topics[1].Big().Text(16) to := "0x" + lg.Topics[2].Big().Text(16) amount := value.Uint64() log.Printf("Transfer Event from: %s, to: %s, value: %d\n", from, to, amount) if amount != 0 && from != to { _, err = session.Where("cntr_addr = ? and holder_addr = ?", contractAddr, from).Cols("created_on", "total_sum").Decr("total_sum", -amount).Update(&models_table.EthTokenHolder{ CreatedOn: block.Time(), }) er.Check(err) affected, err := session.Where("cntr_addr = ? and holder_addr = ?", contractAddr, to).Cols("created_on", "total_sum").Decr("total_sum", amount).Update(&models_table.EthTokenHolder{ CreatedOn: block.Time(), }) er.Check(err) if err == nil && affected == 0 { _, err = session.Insert(&models_table.EthTokenHolder{ CreatedOn: block.Time(), CntrAddr: contractAddr, HolderAddr: to, TotalSum: amount, }) er.Check(err) } } log.Printf("processed Transfer Event contract: %s, from: %s, to: %s, value: %d\n", contractAddr, from, to, amount) } } log.Println("processed token and contract:", tx.Hash(), tx.To()) return nil } func (this *Collector) handleTxMethod(method string, input string) (from string, to string, err error) { input = input[2:] method = method[2:] args := make([]string, 0) for len(input) >= 64 { args = append(args, input[:64]) input = input[64:] } switch method { case model.Method_name, model.Method_symbol, model.Method_decimals, model.Method_totalSupply: case model.Method_balanceOf2: from = args[1] case model.Method_balanceOF, model.Method_transfer, model.Method_withdrawToken, model.Method_depositToken, model.Method_approve: from = args[0] case model.Method_transfer2, model.Method_Transfer, model.Method_transferFrom, model.Method_allowance, model.Method_Approval, model.Method_eventApproval, model.Method_eventTransfer: from = args[0] to = args[1] case model.Method_confirmTransaction, model.Method_balanceOf: } return }