package collector import ( "database/sql" "fmt" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "strconv" "sync-block/dgn" "sync-block/util" "log" "math/big" "strings" comm "sync-block/common" "sync-block/er" "sync-block/model" "time" ) type TokenInfo struct { name string symbol string totalSupply *big.Int decimals uint8 tokenURI string ercType string } type Collector struct { isDone bool chDone chan struct{} db *sql.DB eth *dgn.Eth collectionAmount int64 methods map[string]string conifg comm.BlockConfig interval time.Duration latestInterval time.Duration confirmBlockNumber int64 erc20Info map[common.Address]*TokenInfo accReward *big.Int } func New(db *sql.DB, eth *dgn.Eth, blockConfig comm.BlockConfig ) *Collector { if blockConfig.CollectionAmount == 0 { blockConfig.CollectionAmount = 1000 } var intv, latestIntv time.Duration if blockConfig.Interval == "" { intv = time.Duration(0) } else { intv = util.MustParseDuration(blockConfig.Interval) } if blockConfig.LatestInterval == "" { latestIntv = util.MustParseDuration("1s") } else { latestIntv = util.MustParseDuration(blockConfig.Interval) } return &Collector{ isDone: false, chDone: nil, db: db, eth: eth, collectionAmount: blockConfig.CollectionAmount, methods: make(map[string]string), conifg: blockConfig, interval: intv, latestInterval: latestIntv, confirmBlockNumber: 0, erc20Info: make(map[common.Address]*TokenInfo), } } //func (this *Collector) Start() { // go this.Run() //} func (this *Collector) Run(hooks chan<- struct{}) { for !this.isDone { to, err := this.eth.GetLastBlockNumber() if err != nil { log.Printf("getLastBlockNumber Error: %v\n", err) time.Sleep(this.latestInterval) continue } from, err := this.getLastBlockNumberFromDb() if err != nil { log.Printf("getLastBlockNumberFromDb Error: %v\n", err) time.Sleep(this.latestInterval) continue } if to.Int64()-from != 0 { err = this.Process(from, to.Int64()) er.Check(err) if this.interval > 0 { time.Sleep(this.interval) } } else { time.Sleep(this.latestInterval) } //time.Sleep(time.Second * 60) } hooks <- struct{}{} } func (this *Collector) StopCollect() { this.isDone = true } func (this *Collector) loadTxMethods() (err error) { rows, err := this.db.Query("select method_id, function_prototype from bcs_tx_method") if err != nil { return } defer rows.Close() for rows.Next() { id := "" function := "" err = rows.Scan(&id, &function) if err != nil { return } this.methods[id] = function } return } func (this *Collector) getLastBlockNumberFromDb() (n int64, err error) { var result sql.NullString rows, err := this.db.Query("SELECT MAX(block_no) FROM bcs_block") if err != nil { return 0, err } defer rows.Close() for rows.Next() { err := rows.Scan(&result) if err != nil { log.Printf("mysql getLastBlockNumberFromDb2:rows.Scan() error: %v",err) return 0, err } if result.Valid { n, _ = strconv.ParseInt(result.String, 10, 64) } else { n = 0 } } return } func (this *Collector) Process(fromBlockNumber int64, toBlockNumber int64) (err error) { //현재까지 저장된 블럭 이후부터 수집 if toBlockNumber-fromBlockNumber == 0 { log.Println("추가된 블럭 없음:", fromBlockNumber) return nil } if fromBlockNumber != 0 { fromBlockNumber += 1 } if (toBlockNumber - fromBlockNumber) > this.collectionAmount { toBlockNumber = fromBlockNumber + this.collectionAmount } log.Printf("processing started %d to %d\n", fromBlockNumber, toBlockNumber) for n := fromBlockNumber; n <= toBlockNumber; n++ { this.GetLastConfrimBlock() // 현재 어디 까지 확정 블록인가? err = this.processBlock(n) er.Check(err) log.Println("processed block no:", n) //_, err = this.db.Exec("update bcs_last_sync_no set last_sync_no = ? where table_sort = ?", n, "block") //er.Check(err) } // 확정 블록을 처리. // 위에서 새로운 블록이 들어 왓을때마다 한번씩 처리 하면 된다. this.processConfirmBlock() return err } func (this *Collector) processBlock(blockNo int64) (err error) { defer er.Recover(&err) block := model.Block{} b, err := this.eth.GetBlockByNumber(big.NewInt(blockNo)) //b, err := this.eth.GetBlockByNumber2(big.NewInt(blockNo)) er.Check(err) block.CreatedAt = b.Time() block.No = blockNo block.Hash = b.Hash().Hex() block.TxCnt = b.Transactions().Len() block.UncleCnt = len(b.Uncles()) block.Miner = b.Coinbase().Hex() block.GasUsed = b.GasUsed() block.GasLimit = b.GasLimit() receipts, err := this.eth.GetTxReceipts(b.Transactions()) er.Check(err) reward, txFee, rewardForUncles, err := this.eth.GetReward(b, receipts) // dangnnUnit, _ := new(big.Float).SetString("100") // reward = reward.Mul(reward, dangnnUnit) // ucr, _ := uncleReward.Float64() // ucr = ucr * 100 er.Check(err) block.Reward = reward // 컨핌 블록인지 검사 하자. confirm := 0 if blockNo + this.conifg.ImmatureDepth < this.confirmBlockNumber { confirm = 1 // 컨핌된 블록이다. } if block.UncleCnt > 1 { fmt.Printf("Uncle: %v\n", block.UncleCnt) } _, err = this.db.Exec("replace bcs_block set created_on = ?, block_no = ?, block_hash = ?, txn_cnt = ?, uncle_cnt = ?,"+ "miner = ?, gas_used = ?, gas_limit = ?, trfee_sum = ?, reward = ?, uncle_reward = ?, status=?", block.CreatedAt, block.No, block.Hash, block.TxCnt, block.UncleCnt, block.Miner, block.GasUsed, block.GasLimit, util.RewardInShannon(txFee), util.RewardInShannon(block.Reward), util.RewardInShannon(rewardForUncles), confirm) er.Check(err) totalReward := big.NewInt(0) //if "0x0000000000000000000000000000000000000000" == block.Miner { if block.No != 0 { totalReward.Add(totalReward, util.RewardInShannonByBig(block.Reward)) _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,miner_amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?,?) ON DUPLICATE KEY UPDATE amount=amount+VALUES(amount),miner_amount=miner_amount+VALUES(miner_amount)", block.Miner, block.Reward.String(), block.Reward.String(), 0, 0) er.Check(err) } else { // genesis Alloc을 입력해야 된다. } // 완료 블록이 아니라면 언컴핌 블록 정보를 기입 한다. if confirm == 0 { _, err = this.db.Exec("INSERT INTO bcs_unconfirm(block_no, block_hash) VALUES (?, ?)", block.No, block.Hash) er.Check(err) } uncles := b.Uncles() for i, u := range uncles { reward := this.eth.GetUncleReward(uncles[i].Number.Uint64(), b.Number().Uint64(), this.conifg.MainNet) if reward.Cmp(big.NewInt(0)) < 0 { reward = big.NewInt(0) } totalReward.Add(totalReward, util.RewardInShannonByBig(reward)) _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,miner_amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?,?) ON DUPLICATE KEY UPDATE amount=amount+VALUES(amount),miner_amount=miner_amount+VALUES(miner_amount)", uncles[i].Coinbase.Hex(), reward.String(), reward.String(), 0, 0) er.Check(err) _, err = this.db.Exec("replace bcs_uncle_block set created_on = ?, block_no = ?, block_hash = ?, uncle_no = ?, uncle_hash = ?, uncle_position = ?,"+ "miner = ?, gas_used = ?, gas_limit = ?, uncle_reward = ?", b.Time(), blockNo, b.Hash().Hex(), u.Number.Uint64(), u.Hash().Hex(), i, u.Coinbase.Hex(), u.GasUsed, u.GasLimit, util.RewardInShannon(reward)) er.Check(err) } if this.accReward == nil { this.accReward, _ = this.getAccRewardBlock() fmt.Printf("init acc reward: %v",this.accReward) } this.accReward.Add(this.accReward, totalReward) dateTime := util.MakeTimestampZero(b.Time()) dateFormat := util.MakeTimestampToDate(int64(dateTime)) _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,add_block,reward,acc_reward) VALUES (?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE add_block=add_block+1,reward=reward+VALUES(reward),acc_reward=VALUES(acc_reward)", dateTime, dateFormat, 0,"main_coin", 1,this.setStringPerc(totalReward.String(),7), this.setStringPerc(this.accReward.String(),7)) er.Check(err) err = this.processTx(b, receipts) er.Check(err) return } func (this *Collector) setStringPerc(s string,i int) string { length := len(s) if length < 7 { return "0.0000000" } return s[:length-i] + "." + s[length-i:] } func (this *Collector) processUncleBlock(blockNo int64) (err error) { defer er.Recover(&err) b, err := this.eth.GetBlockByNumber(big.NewInt(blockNo)) er.Check(err) uncs := b.Uncles() for i, u := range uncs { _, err = this.db.Exec("replace bcs_uncle_block set created_on = ?, block_no = ?, uncle_no = ?, uncle_position = ?,"+ "miner = ?, gas_used = ?, gas_limit = ?, uncle_reward = ?", b.Time(), blockNo, u.Number.Uint64(), i, u.Coinbase.Hex(), u.GasUsed, u.GasLimit, 262.5) er.Check(err) } return } func (this *Collector) processTx(block *dgn.Block, receipts types.Receipts) (err error) { defer er.Recover(&err) for i, tx := range block.Transactions() { t, err := this.eth.GetTxByHash(tx.Hash().Hex()) er.Check(err) value, err := hexutil.DecodeBig(t.Value) if err != nil { value = big.NewInt(0) } from := t.From to := t.To inputData := "" symbol := "" valueToken := big.NewInt(0) if len(tx.Data()) >= 4 { inputData = hexutil.Encode(tx.Data()) t.Type = hexutil.Encode(tx.Data()[:4]) symbol = t.Type inputFrom, inputTo, tmpValueToken, err := this.handleTxMethod(t.Type, hexutil.Encode(tx.Data()[4:])) er.Check(err) if inputFrom != "" { from = inputFrom } if inputTo != "" { to = inputTo } if tmpValueToken != nil { valueToken = tmpValueToken } if tmpSymbol, err := this.eth.ByteDb.Selector(tx.Data()); err == nil { symbol = tmpSymbol } } contractAddr := "" isContract := false if tx.To() == nil { isContract = true contractAddr = receipts[i].ContractAddress.Hex() } else { contractAddr = tx.To().Hex() this.db.QueryRow("select count(*) > 0 from bcs_contract where cntr_addr = ?", contractAddr).Scan(&isContract) if !isContract { contractAddr = "main_coin" } else { fmt.Printf("Contract: %v", contractAddr) } } if isContract || len(tx.Data()) > 0 { err = this.processTxInternal(block, t.Hash, contractAddr, receipts[i].Status) er.Check(err) } _, err = this.db.Exec("insert bcs_tranxn set created_on = ?, block_no = ?, block_hash = ?, txn_hash = ?, cntr_addr = ?,"+ " from_addr = ?, to_addr = ?, txn_type = ?, txn_symbol=?, value = ?, input_value=?, txn_fee = ?, input=?, gas_used = ?, status=? on duplicate key update cntr_addr = values(cntr_addr),"+ "from_addr = values(from_addr),to_addr = values(to_addr),txn_type = values(txn_type),txn_symbol=values(txn_symbol), value = values(value), input_value=values(input_value),"+ " txn_fee = values(txn_fee), input=values(input), gas_used = values(gas_used), status=values(status)", block.Time(), block.Number().Uint64(),block.Hash().Hex(), t.Hash, contractAddr, from, to, t.Type, symbol, value.String(), valueToken.String(), float64(receipts[i].GasUsed*tx.GasPrice().Uint64())/dgn.DGC, inputData, receipts[i].GasUsed, receipts[i].Status) er.Check(err) err = this.processContract(block, t.From, tx, receipts[i]) er.Check(err) dateTime := util.MakeTimestampZero(block.Time()) dateFormat := util.MakeTimestampToDate(int64(dateTime)) if isContract == true { ercType := this.processERC(block, tx, receipts[i]) if receipts[i].Status == 1 { log.Println("processed tx", t.Hash) } else { switch ercType { case "erc20": _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_fail_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE txn_fail_cnt=txn_fail_cnt+VALUES(txn_fail_cnt)", dateTime, dateFormat, 20, contractAddr, 0, 1) er.Check(err) case "erc721": _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_fail_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE txn_fail_cnt=txn_fail_cnt+VALUES(txn_fail_cnt)", dateTime, dateFormat, 721, contractAddr, 0, 1) er.Check(err) default: } } } else { if receipts[i].Status == 1 { // dgn 홀더 성공 _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE amount=amount-VALUES(amount),txn_cnt=txn_cnt+1", from, value.String(), 1, 0) er.Check(err) _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE amount=amount+VALUES(amount)", to, value.String(), 0, 0) er.Check(err) // dgn 데일리 통계 _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE circulating_supply=circulating_supply+VALUES(circulating_supply),txn_cnt=txn_cnt+VALUES(txn_cnt)", dateTime, dateFormat, 0, contractAddr, value.String(), 1) er.Check(err) } else { // dgn 홀더 실패 _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE txn_fail_cnt=txn_fail_cnt+1", from, 0, 0, 1) er.Check(err) // dgn 실패 데일리 통계 _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_fail_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE txn_fail_cnt=txn_fail_cnt+VALUES(txn_fail_cnt)", dateTime, dateFormat, 0, contractAddr, 0, 1) er.Check(err) } } } return nil } func (this *Collector) processTxInternal(block *dgn.Block, txHash string, cntrAddr string, status uint64) (err error) { lastBlockNo, err := this.eth.GetLastBlockNumber() if err != nil { return } // Retrieves an internal transaction from a specific block. If it is 0, internal transactions are checked in all sections. if this.conifg.TxInternalRange > 0 { if block.Number().Uint64() <= lastBlockNo.Uint64() - this.conifg.TxInternalRange { return } } defer er.Recover(&err) start := time.Now() calls, err := this.eth.GetInternalTxsByHash(block.Number().Uint64(), txHash, lastBlockNo.Uint64()) er.Check(err) for _, call := range calls { value := big.NewInt(0) gasUsed := big.NewInt(0) gas := big.NewInt(0) if len(call.Value) > 2 { value, _ = value.SetString(call.Value[2:], 16) } if len(call.GasUsed) > 2 { gasUsed, _ = gasUsed.SetString(call.GasUsed[2:], 16) } if len(call.Gas) > 2 { gas, _ = gas.SetString(call.Gas[2:], 16) } _, err = this.db.Exec("replace bcs_trans_inter set created_on = ?, block_no = ?, block_hash = ?, parent_txn_hash = ?,cntr_addr=?, from_addr = ?, to_addr = ?,"+ "txn_type = ?, value = ?,gas_used=?,gas=?,input=?,err=?,status=?", block.Time(), block.Number().Uint64(), block.Hash().Hex(), txHash, cntrAddr, call.From, call.To, call.Type, value.String(),gasUsed.String(), gas.String(),call.Input, call.Error, status) er.Check(err) } if len(calls) > 0 { log.Println("processed internal tx", txHash) } log.Println("elapsed internal tx ", time.Since(start)) return } func (this *Collector) processContract(block *dgn.Block, txfrom string, tx *types.Transaction, receipt *types.Receipt) (err error) { defer er.Recover(&err) log.Println(receipt.ContractAddress.Hex()) if tx == nil || tx.To() != nil || receipt.ContractAddress.Hex() == "0x0000000000000000000000000000000000000000" { return } _, err = this.db.Exec("replace bcs_contract set created_on = ?, txn_hash = ?, block_no = ?, creator_addr = ?, cntr_addr = ?", block.Time(), tx.Hash().Hex(), block.Number().Uint64(), txfrom, receipt.ContractAddress.Hex()) er.Check(err) log.Println("processed contract:", tx.Hash(), receipt.ContractAddress) return nil } func (this *Collector) processERC(block *dgn.Block, tx *types.Transaction, receipt *types.Receipt) (ercType string) { if tx == nil || len(tx.Data()) == 0 { return } tokenAddr := common.Address{} if tx.To() == nil { tokenAddr = receipt.ContractAddress } else { tokenAddr = *tx.To() } var ( tokenInfo *TokenInfo ok bool ) ercType = "erc20" if tokenInfo, ok = this.erc20Info[tokenAddr]; ok { ercType = tokenInfo.ercType // 실패한 트랜잭션이라면 여기서 끝낸다. if receipt.Status != 1 { return ercType } } else { name, err := this.eth.GetTokenName(tokenAddr) if err != nil { return "" } symbol, err := this.eth.GetTokenSymbol(tokenAddr) if err != nil { return "" } totalSupply, err := this.eth.GetTokenTotalSupply(tokenAddr) er.Check(err) decimals, err := this.eth.GetTokenDecimals(tokenAddr) // decimal은 없을수도 있다. var tokenURI string if err != nil { // nft검사. tokenURI, err = this.eth.GetNFTTokenURI(tokenAddr,1) if err != nil { // nft도 아니다. 1155 nft일수도 있다. supportsInterface로 수정해야 된다. return "" } ercType = "erc721" // 기본 nft 이다. } tokenInfo = &TokenInfo{ name: name, symbol: symbol, totalSupply: totalSupply, decimals: decimals, ercType: ercType, tokenURI: tokenURI, } this.erc20Info[tokenAddr] = tokenInfo balance, err := this.eth.GetTokenBalanceOf(tokenAddr, tokenAddr) er.Check(err) // 실패한 트랜잭션이라면 여기서 끝낸다. if receipt.Status != 1 { return ercType } switch ercType { case "erc20": _, err = this.db.Exec("insert bcs_token set created_on = ?, cntr_addr = ?, token_name = ?, "+ "symbol = ?, balance = ?, decimals=?,total_supply = ? on duplicate key update token_name = values(token_name), symbol = values(symbol),"+ "balance = values(balance), decimals = values(decimals),total_supply = values(total_supply)", block.Time(), tokenAddr.Hex(), tokenInfo.name, tokenInfo.symbol, balance.String(), tokenInfo.decimals, tokenInfo.totalSupply.String()) er.Check(err) case "erc721": _, err = this.db.Exec("insert bcs_nft set created_on=?,cntr_addr=?,token_name=?,symbol=?,total_supply=?" + "on duplicate key update token_name=values(token_name),symbol=values(symbol),total_supply=values(total_supply)", block.Time(), tokenAddr.Hex(), tokenInfo.name, tokenInfo.symbol, tokenInfo.totalSupply.String()) er.Check(err) default: // 없는 타입 return ercType } } // 밑에는 erc20,erc721 같은 transfer가 있는 erc타입만 사용한다. contractAddr := tokenAddr.Hex() queryStr := strings.Builder{} transferStr := strings.Builder{} insertCnt := int64(0) totalValue := big.NewInt(0) // 밑에 대량 쿼리 처리 필요(미작업) for _, lg := range receipt.Logs { if len(lg.Topics) == 0 { continue } if lg.Topics[0].Hex() == dgn.LogTransferSignHash.Hex() && len(lg.Data) > 0 { topics := fmt.Sprintf("%x",lg.Topics) value := big.Int{} value.SetBytes(lg.Data) if contractAddr == lg.Address.Hex() { totalValue.Add(totalValue,&value) } from := "0x" + lg.Topics[1].Big().Text(16) to := "0x" + lg.Topics[2].Big().Text(16) amount := value.String() reversAmount := "" removed := int(0) if lg.Removed == true { removed = 1 } if value.Cmp(big.NewInt(0)) >= 0 { reversAmount = "-" + amount } else { reversAmount = strings.TrimLeft(amount,"-") } log.Printf("Transfer Event from: %s, to: %s, value: %d\n", from, to, amount) if amount != "" && amount != "0" && from != to { balance_from, err := this.eth.GetTokenBalanceOf(common.HexToAddress(contractAddr),common.HexToAddress(from)) if err != nil { balance_from = big.NewInt(0) } balance_to, err := this.eth.GetTokenBalanceOf(common.HexToAddress(contractAddr),common.HexToAddress(to)) if err != nil { balance_to = big.NewInt(0) } if insertCnt == 0 { queryStr.WriteString(fmt.Sprintf("INSERT INTO bcs_token_holder(`created_on`,`cntr_addr`,`holder_addr`,`qty_sum`,`last_qty_sum`) VALUES ")) if from == "0x0" { queryStr.WriteString(fmt.Sprintf("(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", block.Time(), contractAddr, to, amount, balance_to.String())) // 보내는 사람이 없는 경우. } else { queryStr.WriteString(fmt.Sprintf("(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\"),(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", block.Time(), contractAddr, from, reversAmount, balance_from.String(), block.Time(), contractAddr, to, amount, balance_to.String())) } // 트랜스퍼 RAW 데이타 저장. transferStr.WriteString(fmt.Sprintf("INSERT INTO bcs_txn_transfer(block_no,block_hash,txn_hash,tx_index,`index`,from_addr,to_addr,input_value,topic,removed,cntr_addr,log_cntr_addr) VALUES ")) transferStr.WriteString(fmt.Sprintf("(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", lg.BlockNumber, lg.BlockHash.Hex(), lg.TxHash.Hex(), lg.TxIndex, lg.Index, from, to, value.String(), topics, removed, contractAddr, lg.Address.Hex()) ) } else { queryStr.WriteString(fmt.Sprintf(",(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", block.Time(), contractAddr, from, reversAmount, balance_from.String())) queryStr.WriteString(fmt.Sprintf(",(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", block.Time(), contractAddr, to, amount, balance_to.String())) transferStr.WriteString(fmt.Sprintf(",(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", lg.BlockNumber, lg.BlockHash.Hex(), lg.TxHash.Hex(), lg.TxIndex, lg.Index, from, to, value.String(), topics, removed, contractAddr, lg.Address.Hex()) ) } insertCnt++ } else { fmt.Printf("@!#!@ %v", amount) } log.Printf("processed Transfer Event contract: %s, from: %s, to: %s, value: %d\n", contractAddr, from, to, amount) } } if insertCnt > 0 { queryStr.WriteString(" ON DUPLICATE KEY UPDATE created_on=values(created_on),cntr_addr=values(cntr_addr),holder_addr=values(holder_addr),qty_sum=qty_sum+values(qty_sum),last_qty_sum=values(last_qty_sum)") _, err := this.db.Exec(queryStr.String()) er.Check(err) transferStr.WriteString(" ON DUPLICATE KEY UPDATE block_no=values(block_no),block_hash=values(block_hash),from_addr=values(from_addr),to_addr=values(to_addr)" + ",input_value=values(input_value),topic=values(topic),removed=values(removed),cntr_addr=values(cntr_addr),log_cntr_addr=values(log_cntr_addr)") _, err = this.db.Exec(transferStr.String()) er.Check(err) } nowTimestamp := util.MakeTimestamp() / 1000 dateTime := util.MakeTimestampZero(block.Time()) dateFormat := util.MakeTimestampToDate(int64(dateTime)) switch ercType { case "erc20": _, err := this.db.Exec("UPDATE bcs_token SET update_on=?,circulating_supply=circulating_supply+? WHERE cntr_addr=?", nowTimestamp, totalValue.String(), contractAddr) er.Check(err) _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE circulating_supply=circulating_supply+VALUES (circulating_supply),txn_cnt=txn_cnt+VALUES(txn_cnt)", dateTime, dateFormat, 20, contractAddr, totalValue.String(), insertCnt) er.Check(err) case "erc721": _, err := this.db.Exec("UPDATE bcs_nft SET update_on=?,circulating_supply=circulating_supply+? WHERE cntr_addr=?", nowTimestamp, totalValue.String(), contractAddr) er.Check(err) _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE circulating_supply=circulating_supply+VALUES (circulating_supply),txn_cnt=txn_cnt+VALUES(txn_cnt)", dateTime, dateFormat, 721, contractAddr, totalValue.String(), insertCnt) er.Check(err) default: // 없는 타입 return ercType } log.Println("processed token and contract:", tx.Hash(), tx.To()) return ercType } func has0xPrefix(str string) bool { return len(str) >= 2 && str[0] == '0' && (str[1] == 'x' || str[1] == 'X') } const ( // HashLength is the expected length of the hash HashLength = 32 // AddressLength is the expected length of the address AddressLength = 20 ) // 0x가 없다면 0x를 붙이고 func checkAddress(s string, size int) string { if has0xPrefix(s) { s = s[2:] } s = strings.TrimLeft(s, "0 ") if len(s) > size { s = s[len(s)-size:] } else { for len(s) < size { s = "0" + s } } s = "0x" + s return s } func checkValue(s string) string { if has0xPrefix(s) { s = s[2:] } s = strings.TrimLeft(s, "0 ") if s == "" { return "0x0" } s = "0x" + s return s } func (this *Collector) handleTxMethod(method string, input string) (from string, to string, value *big.Int, err error) { input = input[2:] method = method[2:] args := make([]string, 0) for len(input) >= 64 { args = append(args, input[:64]) input = input[64:] } switch method { case model.Method_name, model.Method_symbol, model.Method_decimals, model.Method_totalSupply: case model.Method_balanceOf2: from = args[1] case model.Method_balanceOF, model.Method_withdrawToken, model.Method_depositToken, model.Method_approve: from = args[0] case model.Method_transfer: to = checkAddress(args[0], AddressLength*2) value= hexutil.MustDecodeBig(checkValue(args[1])) case model.Method_transfer2, model.Method_Transfer, model.Method_transferFrom, model.Method_allowance, model.Method_Approval, model.Method_eventApproval, model.Method_eventTransfer: from = args[0] to = args[1] case model.Method_confirmTransaction, model.Method_balanceOf: } return } func (this *Collector) getUnConfirmBlockInfo() (map[int64][]string, error) { rows, err := this.db.Query("SELECT block_no, block_hash FROM bcs_unconfirm WHERE block_no < ? ORDER BY block_no LIMIT ?", this.confirmBlockNumber, 100) if err != nil { return nil, err } defer rows.Close() result := make(map[int64][]string,0) for rows.Next() { var ( blockNo int64 blockHash string ) err := rows.Scan(&blockNo, &blockHash) if err != nil { log.Printf("mysql getUnConfirmBlockInfo:rows.Scan() error: %v",err) return nil, err } result[blockNo] = append(result[blockNo], blockHash) } return result, nil } func (this *Collector) getAccRewardBlock() (*big.Int, error) { rows, err := this.db.Query("SELECT SUM(reward) FROM `bcs_circulating` WHERE time_on BETWEEN 0 AND UNIX_TIMESTAMP() AND erc_type = 0") if err != nil { return nil, err } defer rows.Close() for rows.Next() { var ( totalReward string ) err := rows.Scan(&totalReward) if err != nil { log.Printf("mysql getAccRewardBlock:rows.Scan() error: %v", err) return big.NewInt(0), err } totalReward = strings.Replace(totalReward,".", "",32) res, _ := big.NewInt(0).SetString(totalReward,10) return res, nil } return big.NewInt(0), nil } func StringInSlice(a string, list []string) bool { for _, b := range list { if b == a { return true } } return false } func (this *Collector) processConfirmBlock() error { // 디비에서 확정되지 않은 블록을 읽어 온다. listConfirmBlock, err := this.getUnConfirmBlockInfo() er.Check(err) if listConfirmBlock == nil { return nil } // 리스트에 있는 블록들이 같은지 비교 하고 for blockNo,blockHash := range listConfirmBlock { // 블록높이에 데이타를 client로 부터 얻어 오자. block, err := this.eth.GetBlockByNumber(big.NewInt(blockNo)) if err != nil { // geth와 연결이 끊어 졌다. return nil } lenBlock := len(blockHash) if lenBlock == 0 { // ?? 올수 없다. } else if lenBlock == 1 { if block.Hash().Hex() == blockHash[0] { _, err = this.db.Exec("UPDATE bcs_block SET status=1 WHERE block_no=? AND block_hash=?",block.Number().Uint64(), block.Hash().Hex()) er.Check(err) } else { // 새로운 블록이 컨핌 블록인 경우. _, err = this.db.Exec("UPDATE bcs_block SET status=2 WHERE block_no=?",block.Number().Uint64()) er.Check(err) // 다시 넣어 준다. this.processBlock(blockNo) } } else { equalBlock := StringInSlice(block.Hash().Hex(), blockHash) if equalBlock { // 같은 블록이 있다. _, err = this.db.Exec("UPDATE bcs_block SET status=1 WHERE block_no=? AND block_hash=?",block.Number().Uint64(), block.Hash().Hex()) er.Check(err) } else { // 같은 블록이 없다면 다시 집어넣어 준다. _, err = this.db.Exec("UPDATE bcs_block SET status=2 WHERE block_no=?",block.Number().Uint64()) er.Check(err) // 다시 넣어 준다. this.processBlock(blockNo) } } // unconfrim 레코드를 삭제 한다. _, err = this.db.Exec("DELETE FROM bcs_unconfirm WHERE block_no=?",block.Number().Uint64()) er.Check(err) } return nil } func (this *Collector) GetLastConfrimBlock() { number, err := this.eth.GetLastBlockNumber() if err != nil { return } this.confirmBlockNumber = number.Int64() - this.conifg.ImmatureDepth }