collector.go 29 KB


  1. package collector
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "github.com/ethereum/go-ethereum/common"
  6. "github.com/ethereum/go-ethereum/common/hexutil"
  7. "github.com/ethereum/go-ethereum/core/types"
  8. "strconv"
  9. "sync-block/dgn"
  10. "sync-block/util"
  11. "log"
  12. "math/big"
  13. "strings"
  14. comm "sync-block/common"
  15. "sync-block/er"
  16. "sync-block/model"
  17. "time"
  18. )
  // TokenInfo holds the metadata read from a token contract. processERC keeps
  // one instance per contract address in Collector.erc20Info so the contract
  // is not queried again for every transaction.
  type TokenInfo struct {
      // name and symbol as reported by the contract.
      name, symbol string
      // totalSupply as reported by the contract at the time it was first seen.
      totalSupply *big.Int
      // decimals stays 0 when the contract does not expose it.
      decimals uint8
      // tokenURI is only filled in on the NFT detection path.
      tokenURI string
      // ercType is the detected standard; this file stores "erc20" or "erc721".
      ercType string
  }
  27. type Collector struct {
  28. isDone bool
  29. chDone chan struct{}
  30. db *sql.DB
  31. eth *dgn.Eth
  32. collectionAmount int64
  33. methods map[string]string
  34. conifg comm.BlockConfig
  35. interval time.Duration
  36. latestInterval time.Duration
  37. confirmBlockNumber int64
  38. erc20Info map[common.Address]*TokenInfo
  39. accReward *big.Int
  40. }
  41. func New(db *sql.DB, eth *dgn.Eth, blockConfig comm.BlockConfig ) *Collector {
  42. if blockConfig.CollectionAmount == 0 {
  43. blockConfig.CollectionAmount = 1000
  44. }
  45. var intv, latestIntv time.Duration
  46. if blockConfig.Interval == "" {
  47. intv = time.Duration(0)
  48. } else {
  49. intv = util.MustParseDuration(blockConfig.Interval)
  50. }
  51. if blockConfig.LatestInterval == "" {
  52. latestIntv = util.MustParseDuration("1s")
  53. } else {
  54. latestIntv = util.MustParseDuration(blockConfig.Interval)
  55. }
  56. return &Collector{
  57. isDone: false,
  58. chDone: nil,
  59. db: db,
  60. eth: eth,
  61. collectionAmount: blockConfig.CollectionAmount,
  62. methods: make(map[string]string),
  63. conifg: blockConfig,
  64. interval: intv,
  65. latestInterval: latestIntv,
  66. confirmBlockNumber: 0,
  67. erc20Info: make(map[common.Address]*TokenInfo),
  68. }
  69. }
  70. //func (this *Collector) Start() {
  71. // go this.Run()
  72. //}
  73. func (this *Collector) Run(hooks chan<- struct{}) {
  74. for !this.isDone {
  75. to, err := this.eth.GetLastBlockNumber()
  76. if err != nil {
  77. log.Printf("getLastBlockNumber Error: %v\n", err)
  78. time.Sleep(this.latestInterval)
  79. continue
  80. }
  81. from, err := this.getLastBlockNumberFromDb()
  82. if err != nil {
  83. log.Printf("getLastBlockNumberFromDb Error: %v\n", err)
  84. time.Sleep(this.latestInterval)
  85. continue
  86. }
  87. if to.Int64()-from != 0 {
  88. err = this.Process(from, to.Int64())
  89. er.Check(err)
  90. if this.interval > 0 {
  91. time.Sleep(this.interval)
  92. }
  93. } else {
  94. time.Sleep(this.latestInterval)
  95. }
  96. //time.Sleep(time.Second * 60)
  97. }
  98. hooks <- struct{}{}
  99. }
  100. func (this *Collector) StopCollect() {
  101. this.isDone = true
  102. }
  103. func (this *Collector) loadTxMethods() (err error) {
  104. rows, err := this.db.Query("select method_id, function_prototype from bcs_tx_method")
  105. if err != nil {
  106. return
  107. }
  108. defer rows.Close()
  109. for rows.Next() {
  110. id := ""
  111. function := ""
  112. err = rows.Scan(&id, &function)
  113. if err != nil {
  114. return
  115. }
  116. this.methods[id] = function
  117. }
  118. return
  119. }
  120. func (this *Collector) getLastBlockNumberFromDb() (n int64, err error) {
  121. var result sql.NullString
  122. rows, err := this.db.Query("SELECT MAX(block_no) FROM bcs_block")
  123. if err != nil {
  124. return 0, err
  125. }
  126. defer rows.Close()
  127. for rows.Next() {
  128. err := rows.Scan(&result)
  129. if err != nil {
  130. log.Printf("mysql getLastBlockNumberFromDb2:rows.Scan() error: %v",err)
  131. return 0, err
  132. }
  133. if result.Valid {
  134. n, _ = strconv.ParseInt(result.String, 10, 64)
  135. } else {
  136. n = 0
  137. }
  138. }
  139. return
  140. }
  141. func (this *Collector) Process(fromBlockNumber int64, toBlockNumber int64) (err error) {
  142. //현재까지 저장된 블럭 이후부터 수집
  143. if toBlockNumber-fromBlockNumber == 0 {
  144. log.Println("추가된 블럭 없음:", fromBlockNumber)
  145. return nil
  146. }
  147. if fromBlockNumber != 0 {
  148. fromBlockNumber += 1
  149. }
  150. if (toBlockNumber - fromBlockNumber) > this.collectionAmount {
  151. toBlockNumber = fromBlockNumber + this.collectionAmount
  152. }
  153. log.Printf("processing started %d to %d\n", fromBlockNumber, toBlockNumber)
  154. for n := fromBlockNumber; n <= toBlockNumber; n++ {
  155. this.GetLastConfrimBlock() // 현재 어디 까지 확정 블록인가?
  156. err = this.processBlock(n)
  157. er.Check(err)
  158. log.Println("processed block no:", n)
  159. //_, err = this.db.Exec("update bcs_last_sync_no set last_sync_no = ? where table_sort = ?", n, "block")
  160. //er.Check(err)
  161. }
  162. // 확정 블록을 처리.
  163. // 위에서 새로운 블록이 들어 왓을때마다 한번씩 처리 하면 된다.
  164. this.processConfirmBlock()
  165. return err
  166. }
  167. func (this *Collector) processBlock(blockNo int64) (err error) {
  168. defer er.Recover(&err)
  169. block := model.Block{}
  170. b, err := this.eth.GetBlockByNumber(big.NewInt(blockNo))
  171. //b, err := this.eth.GetBlockByNumber2(big.NewInt(blockNo))
  172. er.Check(err)
  173. block.CreatedAt = b.Time()
  174. block.No = blockNo
  175. block.Hash = b.Hash().Hex()
  176. block.TxCnt = b.Transactions().Len()
  177. block.UncleCnt = len(b.Uncles())
  178. block.Miner = b.Coinbase().Hex()
  179. block.GasUsed = b.GasUsed()
  180. block.GasLimit = b.GasLimit()
  181. receipts, err := this.eth.GetTxReceipts(b.Transactions())
  182. er.Check(err)
  183. reward, txFee, rewardForUncles, err := this.eth.GetReward(b, receipts)
  184. // dangnnUnit, _ := new(big.Float).SetString("100")
  185. // reward = reward.Mul(reward, dangnnUnit)
  186. // ucr, _ := uncleReward.Float64()
  187. // ucr = ucr * 100
  188. er.Check(err)
  189. block.Reward = reward
  190. // 컨핌 블록인지 검사 하자.
  191. confirm := 0
  192. if blockNo + this.conifg.ImmatureDepth < this.confirmBlockNumber {
  193. confirm = 1 // 컨핌된 블록이다.
  194. }
  195. if block.UncleCnt > 1 {
  196. fmt.Printf("Uncle: %v\n", block.UncleCnt)
  197. }
  198. _, err = this.db.Exec("replace bcs_block set created_on = ?, block_no = ?, block_hash = ?, txn_cnt = ?, uncle_cnt = ?,"+
  199. "miner = ?, gas_used = ?, gas_limit = ?, trfee_sum = ?, reward = ?, uncle_reward = ?, status=?", block.CreatedAt, block.No, block.Hash, block.TxCnt,
  200. block.UncleCnt, block.Miner, block.GasUsed, block.GasLimit, util.RewardInShannon(txFee),
  201. util.RewardInShannon(block.Reward), util.RewardInShannon(rewardForUncles), confirm)
  202. er.Check(err)
  203. totalReward := big.NewInt(0)
  204. //if "0x0000000000000000000000000000000000000000" == block.Miner {
  205. if block.No != 0 {
  206. totalReward.Add(totalReward, util.RewardInShannonByBig(block.Reward))
  207. _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,miner_amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?,?) ON DUPLICATE KEY UPDATE amount=amount+VALUES(amount),miner_amount=miner_amount+VALUES(miner_amount)", block.Miner, block.Reward.String(), block.Reward.String(), 0, 0)
  208. er.Check(err)
  209. } else {
  210. // genesis Alloc을 입력해야 된다.
  211. }
  212. // 완료 블록이 아니라면 언컴핌 블록 정보를 기입 한다.
  213. if confirm == 0 {
  214. _, err = this.db.Exec("INSERT INTO bcs_unconfirm(block_no, block_hash) VALUES (?, ?)", block.No, block.Hash)
  215. er.Check(err)
  216. }
  217. uncles := b.Uncles()
  218. for i, u := range uncles {
  219. reward := this.eth.GetUncleReward(uncles[i].Number.Uint64(), b.Number().Uint64(), this.conifg.MainNet)
  220. if reward.Cmp(big.NewInt(0)) < 0 {
  221. reward = big.NewInt(0)
  222. }
  223. totalReward.Add(totalReward, util.RewardInShannonByBig(reward))
  224. _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,miner_amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?,?) ON DUPLICATE KEY UPDATE amount=amount+VALUES(amount),miner_amount=miner_amount+VALUES(miner_amount)", uncles[i].Coinbase.Hex(), reward.String(), reward.String(), 0, 0)
  225. er.Check(err)
  226. _, err = this.db.Exec("replace bcs_uncle_block set created_on = ?, block_no = ?, block_hash = ?, uncle_no = ?, uncle_hash = ?, uncle_position = ?,"+
  227. "miner = ?, gas_used = ?, gas_limit = ?, uncle_reward = ?", b.Time(), blockNo, b.Hash().Hex(), u.Number.Uint64(), u.Hash().Hex(),
  228. i, u.Coinbase.Hex(), u.GasUsed, u.GasLimit, util.RewardInShannon(reward))
  229. er.Check(err)
  230. }
  231. if this.accReward == nil {
  232. this.accReward, _ = this.getAccRewardBlock()
  233. fmt.Printf("init acc reward: %v",this.accReward)
  234. }
  235. this.accReward.Add(this.accReward, totalReward)
  236. dateTime := util.MakeTimestampZero(b.Time())
  237. dateFormat := util.MakeTimestampToDate(int64(dateTime))
  238. _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,add_block,reward,acc_reward) VALUES (?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE add_block=add_block+1,reward=reward+VALUES(reward),acc_reward=VALUES(acc_reward)", dateTime, dateFormat, 0,"main_coin", 1,this.setStringPerc(totalReward.String(),7), this.setStringPerc(this.accReward.String(),7))
  239. er.Check(err)
  240. err = this.processTx(b, receipts)
  241. er.Check(err)
  242. return
  243. }
  244. func (this *Collector) setStringPerc(s string,i int) string {
  245. length := len(s)
  246. if length < 7 {
  247. return "0.0000000"
  248. }
  249. return s[:length-i] + "." + s[length-i:]
  250. }
  251. func (this *Collector) processUncleBlock(blockNo int64) (err error) {
  252. defer er.Recover(&err)
  253. b, err := this.eth.GetBlockByNumber(big.NewInt(blockNo))
  254. er.Check(err)
  255. uncs := b.Uncles()
  256. for i, u := range uncs {
  257. _, err = this.db.Exec("replace bcs_uncle_block set created_on = ?, block_no = ?, uncle_no = ?, uncle_position = ?,"+
  258. "miner = ?, gas_used = ?, gas_limit = ?, uncle_reward = ?", b.Time(), blockNo, u.Number.Uint64(),
  259. i, u.Coinbase.Hex(), u.GasUsed, u.GasLimit, 262.5)
  260. er.Check(err)
  261. }
  262. return
  263. }
  264. func (this *Collector) processTx(block *dgn.Block, receipts types.Receipts) (err error) {
  265. defer er.Recover(&err)
  266. for i, tx := range block.Transactions() {
  267. t, err := this.eth.GetTxByHash(tx.Hash().Hex())
  268. er.Check(err)
  269. value, err := hexutil.DecodeBig(t.Value)
  270. if err != nil {
  271. value = big.NewInt(0)
  272. }
  273. from := t.From
  274. to := t.To
  275. inputData := ""
  276. symbol := ""
  277. valueToken := big.NewInt(0)
  278. if len(tx.Data()) >= 4 {
  279. inputData = hexutil.Encode(tx.Data())
  280. t.Type = hexutil.Encode(tx.Data()[:4])
  281. symbol = t.Type
  282. inputFrom, inputTo, tmpValueToken, err := this.handleTxMethod(t.Type, hexutil.Encode(tx.Data()[4:]))
  283. er.Check(err)
  284. if inputFrom != "" {
  285. from = inputFrom
  286. }
  287. if inputTo != "" {
  288. to = inputTo
  289. }
  290. if tmpValueToken != nil {
  291. valueToken = tmpValueToken
  292. }
  293. if tmpSymbol, err := this.eth.ByteDb.Selector(tx.Data()); err == nil {
  294. symbol = tmpSymbol
  295. }
  296. }
  297. contractAddr := ""
  298. isContract := false
  299. if tx.To() == nil {
  300. isContract = true
  301. contractAddr = receipts[i].ContractAddress.Hex()
  302. } else {
  303. contractAddr = tx.To().Hex()
  304. this.db.QueryRow("select count(*) > 0 from bcs_contract where cntr_addr = ?", contractAddr).Scan(&isContract)
  305. if !isContract {
  306. contractAddr = "main_coin"
  307. } else {
  308. fmt.Printf("Contract: %v", contractAddr)
  309. }
  310. }
  311. if isContract || len(tx.Data()) > 0 {
  312. err = this.processTxInternal(block, t.Hash, contractAddr, receipts[i].Status)
  313. er.Check(err)
  314. }
  315. _, err = this.db.Exec("insert bcs_tranxn set created_on = ?, block_no = ?, block_hash = ?, txn_hash = ?, cntr_addr = ?,"+
  316. " from_addr = ?, to_addr = ?, txn_type = ?, txn_symbol=?, value = ?, input_value=?, txn_fee = ?, input=?, gas_used = ?, status=? on duplicate key update cntr_addr = values(cntr_addr),"+
  317. "from_addr = values(from_addr),to_addr = values(to_addr),txn_type = values(txn_type),txn_symbol=values(txn_symbol), value = values(value), input_value=values(input_value),"+
  318. " txn_fee = values(txn_fee), input=values(input), gas_used = values(gas_used), status=values(status)",
  319. block.Time(), block.Number().Uint64(),block.Hash().Hex(),
  320. t.Hash, contractAddr, from, to, t.Type, symbol, value.String(), valueToken.String(), float64(receipts[i].GasUsed*tx.GasPrice().Uint64())/dgn.DGC, inputData, receipts[i].GasUsed, receipts[i].Status)
  321. er.Check(err)
  322. err = this.processContract(block, t.From, tx, receipts[i])
  323. er.Check(err)
  324. dateTime := util.MakeTimestampZero(block.Time())
  325. dateFormat := util.MakeTimestampToDate(int64(dateTime))
  326. if isContract == true {
  327. ercType := this.processERC(block, tx, receipts[i])
  328. if receipts[i].Status == 1 {
  329. log.Println("processed tx", t.Hash)
  330. } else {
  331. switch ercType {
  332. case "erc20":
  333. _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_fail_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE txn_fail_cnt=txn_fail_cnt+VALUES(txn_fail_cnt)", dateTime, dateFormat, 20, contractAddr, 0, 1)
  334. er.Check(err)
  335. case "erc721":
  336. _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_fail_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE txn_fail_cnt=txn_fail_cnt+VALUES(txn_fail_cnt)", dateTime, dateFormat, 721, contractAddr, 0, 1)
  337. er.Check(err)
  338. default:
  339. }
  340. }
  341. } else {
  342. if receipts[i].Status == 1 {
  343. // dgn 홀더 성공
  344. _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE amount=amount-VALUES(amount),txn_cnt=txn_cnt+1", from, value.String(), 1, 0)
  345. er.Check(err)
  346. _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE amount=amount+VALUES(amount)", to, value.String(), 0, 0)
  347. er.Check(err)
  348. // dgn 데일리 통계
  349. _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE circulating_supply=circulating_supply+VALUES(circulating_supply),txn_cnt=txn_cnt+VALUES(txn_cnt)", dateTime, dateFormat, 0, contractAddr, value.String(), 1)
  350. er.Check(err)
  351. } else {
  352. // dgn 홀더 실패
  353. _, err = this.db.Exec("INSERT INTO bcs_dgn_holder(dgn_addr,amount,txn_cnt,txn_fail_cnt) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE txn_fail_cnt=txn_fail_cnt+1", from, 0, 0, 1)
  354. er.Check(err)
  355. // dgn 실패 데일리 통계
  356. _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_fail_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE txn_fail_cnt=txn_fail_cnt+VALUES(txn_fail_cnt)", dateTime, dateFormat, 0, contractAddr, 0, 1)
  357. er.Check(err)
  358. }
  359. }
  360. }
  361. return nil
  362. }
  363. func (this *Collector) processTxInternal(block *dgn.Block, txHash string, cntrAddr string, status uint64) (err error) {
  364. lastBlockNo, err := this.eth.GetLastBlockNumber()
  365. if err != nil {
  366. return
  367. }
  368. // Retrieves an internal transaction from a specific block. If it is 0, internal transactions are checked in all sections.
  369. if this.conifg.TxInternalRange > 0 {
  370. if block.Number().Uint64() <= lastBlockNo.Uint64() - this.conifg.TxInternalRange {
  371. return
  372. }
  373. }
  374. defer er.Recover(&err)
  375. start := time.Now()
  376. calls, err := this.eth.GetInternalTxsByHash(block.Number().Uint64(), txHash, lastBlockNo.Uint64())
  377. er.Check(err)
  378. for _, call := range calls {
  379. value := big.NewInt(0)
  380. gasUsed := big.NewInt(0)
  381. gas := big.NewInt(0)
  382. if len(call.Value) > 2 {
  383. value, _ = value.SetString(call.Value[2:], 16)
  384. }
  385. if len(call.GasUsed) > 2 {
  386. gasUsed, _ = gasUsed.SetString(call.GasUsed[2:], 16)
  387. }
  388. if len(call.Gas) > 2 {
  389. gas, _ = gas.SetString(call.Gas[2:], 16)
  390. }
  391. _, err = this.db.Exec("replace bcs_trans_inter set created_on = ?, block_no = ?, block_hash = ?, parent_txn_hash = ?,cntr_addr=?, from_addr = ?, to_addr = ?,"+
  392. "txn_type = ?, value = ?,gas_used=?,gas=?,input=?,err=?,status=?", block.Time(), block.Number().Uint64(), block.Hash().Hex(), txHash, cntrAddr, call.From, call.To, call.Type, value.String(),gasUsed.String(), gas.String(),call.Input, call.Error, status)
  393. er.Check(err)
  394. }
  395. if len(calls) > 0 {
  396. log.Println("processed internal tx", txHash)
  397. }
  398. log.Println("elapsed internal tx ", time.Since(start))
  399. return
  400. }
  401. func (this *Collector) processContract(block *dgn.Block, txfrom string, tx *types.Transaction, receipt *types.Receipt) (err error) {
  402. defer er.Recover(&err)
  403. log.Println(receipt.ContractAddress.Hex())
  404. if tx == nil || tx.To() != nil || receipt.ContractAddress.Hex() == "0x0000000000000000000000000000000000000000" {
  405. return
  406. }
  407. _, err = this.db.Exec("replace bcs_contract set created_on = ?, txn_hash = ?, block_no = ?, creator_addr = ?, cntr_addr = ?",
  408. block.Time(), tx.Hash().Hex(), block.Number().Uint64(), txfrom, receipt.ContractAddress.Hex())
  409. er.Check(err)
  410. log.Println("processed contract:", tx.Hash(), receipt.ContractAddress)
  411. return nil
  412. }
  413. func (this *Collector) processERC(block *dgn.Block, tx *types.Transaction, receipt *types.Receipt) (ercType string) {
  414. if tx == nil || len(tx.Data()) == 0 {
  415. return
  416. }
  417. tokenAddr := common.Address{}
  418. if tx.To() == nil {
  419. tokenAddr = receipt.ContractAddress
  420. } else {
  421. tokenAddr = *tx.To()
  422. }
  423. var (
  424. tokenInfo *TokenInfo
  425. ok bool
  426. )
  427. ercType = "erc20"
  428. if tokenInfo, ok = this.erc20Info[tokenAddr]; ok {
  429. ercType = tokenInfo.ercType
  430. // 실패한 트랜잭션이라면 여기서 끝낸다.
  431. if receipt.Status != 1 {
  432. return ercType
  433. }
  434. } else {
  435. name, err := this.eth.GetTokenName(tokenAddr)
  436. if err != nil {
  437. return ""
  438. }
  439. symbol, err := this.eth.GetTokenSymbol(tokenAddr)
  440. if err != nil {
  441. return ""
  442. }
  443. totalSupply, err := this.eth.GetTokenTotalSupply(tokenAddr)
  444. er.Check(err)
  445. decimals, err := this.eth.GetTokenDecimals(tokenAddr)
  446. // decimal은 없을수도 있다.
  447. var tokenURI string
  448. if err != nil {
  449. // nft검사.
  450. tokenURI, err = this.eth.GetNFTTokenURI(tokenAddr,1)
  451. if err != nil {
  452. // nft도 아니다. 1155 nft일수도 있다. supportsInterface로 수정해야 된다.
  453. return ""
  454. }
  455. ercType = "erc721" // 기본 nft 이다.
  456. }
  457. tokenInfo = &TokenInfo{
  458. name: name,
  459. symbol: symbol,
  460. totalSupply: totalSupply,
  461. decimals: decimals,
  462. ercType: ercType,
  463. tokenURI: tokenURI,
  464. }
  465. this.erc20Info[tokenAddr] = tokenInfo
  466. balance, err := this.eth.GetTokenBalanceOf(tokenAddr, tokenAddr)
  467. er.Check(err)
  468. // 실패한 트랜잭션이라면 여기서 끝낸다.
  469. if receipt.Status != 1 {
  470. return ercType
  471. }
  472. switch ercType {
  473. case "erc20":
  474. _, err = this.db.Exec("insert bcs_token set created_on = ?, cntr_addr = ?, token_name = ?, "+
  475. "symbol = ?, balance = ?, decimals=?,total_supply = ? on duplicate key update token_name = values(token_name), symbol = values(symbol),"+
  476. "balance = values(balance), decimals = values(decimals),total_supply = values(total_supply)",
  477. block.Time(), tokenAddr.Hex(), tokenInfo.name, tokenInfo.symbol, balance.String(), tokenInfo.decimals, tokenInfo.totalSupply.String())
  478. er.Check(err)
  479. case "erc721":
  480. _, err = this.db.Exec("insert bcs_nft set created_on=?,cntr_addr=?,token_name=?,symbol=?,total_supply=?" +
  481. "on duplicate key update token_name=values(token_name),symbol=values(symbol),total_supply=values(total_supply)",
  482. block.Time(), tokenAddr.Hex(), tokenInfo.name, tokenInfo.symbol, tokenInfo.totalSupply.String())
  483. er.Check(err)
  484. default:
  485. // 없는 타입
  486. return ercType
  487. }
  488. }
  489. // 밑에는 erc20,erc721 같은 transfer가 있는 erc타입만 사용한다.
  490. contractAddr := tokenAddr.Hex()
  491. queryStr := strings.Builder{}
  492. transferStr := strings.Builder{}
  493. insertCnt := int64(0)
  494. totalValue := big.NewInt(0)
  495. // 밑에 대량 쿼리 처리 필요(미작업)
  496. for _, lg := range receipt.Logs {
  497. if len(lg.Topics) == 0 {
  498. continue
  499. }
  500. if lg.Topics[0].Hex() == dgn.LogTransferSignHash.Hex() && len(lg.Data) > 0 {
  501. topics := fmt.Sprintf("%x",lg.Topics)
  502. value := big.Int{}
  503. value.SetBytes(lg.Data)
  504. if contractAddr == lg.Address.Hex() {
  505. totalValue.Add(totalValue,&value)
  506. }
  507. from := "0x" + lg.Topics[1].Big().Text(16)
  508. to := "0x" + lg.Topics[2].Big().Text(16)
  509. amount := value.String()
  510. reversAmount := ""
  511. removed := int(0)
  512. if lg.Removed == true {
  513. removed = 1
  514. }
  515. if value.Cmp(big.NewInt(0)) >= 0 {
  516. reversAmount = "-" + amount
  517. } else {
  518. reversAmount = strings.TrimLeft(amount,"-")
  519. }
  520. log.Printf("Transfer Event from: %s, to: %s, value: %d\n", from, to, amount)
  521. if amount != "" && amount != "0" && from != to {
  522. balance_from, err := this.eth.GetTokenBalanceOf(common.HexToAddress(contractAddr),common.HexToAddress(from))
  523. if err != nil {
  524. balance_from = big.NewInt(0)
  525. }
  526. balance_to, err := this.eth.GetTokenBalanceOf(common.HexToAddress(contractAddr),common.HexToAddress(to))
  527. if err != nil {
  528. balance_to = big.NewInt(0)
  529. }
  530. if insertCnt == 0 {
  531. queryStr.WriteString(fmt.Sprintf("INSERT INTO bcs_token_holder(`created_on`,`cntr_addr`,`holder_addr`,`qty_sum`,`last_qty_sum`) VALUES "))
  532. if from == "0x0" {
  533. queryStr.WriteString(fmt.Sprintf("(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", block.Time(), contractAddr, to, amount, balance_to.String())) // 보내는 사람이 없는 경우.
  534. } else {
  535. queryStr.WriteString(fmt.Sprintf("(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\"),(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", block.Time(), contractAddr, from, reversAmount, balance_from.String(), block.Time(), contractAddr, to, amount, balance_to.String()))
  536. }
  537. // 트랜스퍼 RAW 데이타 저장.
  538. transferStr.WriteString(fmt.Sprintf("INSERT INTO bcs_txn_transfer(block_no,block_hash,txn_hash,tx_index,`index`,from_addr,to_addr,input_value,topic,removed,cntr_addr,log_cntr_addr) VALUES "))
  539. transferStr.WriteString(fmt.Sprintf("(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")",
  540. lg.BlockNumber, lg.BlockHash.Hex(), lg.TxHash.Hex(), lg.TxIndex, lg.Index, from, to, value.String(), topics, removed, contractAddr, lg.Address.Hex()) )
  541. } else {
  542. queryStr.WriteString(fmt.Sprintf(",(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", block.Time(), contractAddr, from, reversAmount, balance_from.String()))
  543. queryStr.WriteString(fmt.Sprintf(",(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")", block.Time(), contractAddr, to, amount, balance_to.String()))
  544. transferStr.WriteString(fmt.Sprintf(",(\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\",\"%v\")",
  545. lg.BlockNumber, lg.BlockHash.Hex(), lg.TxHash.Hex(), lg.TxIndex, lg.Index, from, to, value.String(), topics, removed, contractAddr, lg.Address.Hex()) )
  546. }
  547. insertCnt++
  548. } else {
  549. fmt.Printf("@!#!@ %v", amount)
  550. }
  551. log.Printf("processed Transfer Event contract: %s, from: %s, to: %s, value: %d\n", contractAddr, from, to, amount)
  552. }
  553. }
  554. if insertCnt > 0 {
  555. queryStr.WriteString(" ON DUPLICATE KEY UPDATE created_on=values(created_on),cntr_addr=values(cntr_addr),holder_addr=values(holder_addr),qty_sum=qty_sum+values(qty_sum),last_qty_sum=values(last_qty_sum)")
  556. _, err := this.db.Exec(queryStr.String())
  557. er.Check(err)
  558. transferStr.WriteString(" ON DUPLICATE KEY UPDATE block_no=values(block_no),block_hash=values(block_hash),from_addr=values(from_addr),to_addr=values(to_addr)" +
  559. ",input_value=values(input_value),topic=values(topic),removed=values(removed),cntr_addr=values(cntr_addr),log_cntr_addr=values(log_cntr_addr)")
  560. _, err = this.db.Exec(transferStr.String())
  561. er.Check(err)
  562. }
  563. nowTimestamp := util.MakeTimestamp() / 1000
  564. dateTime := util.MakeTimestampZero(block.Time())
  565. dateFormat := util.MakeTimestampToDate(int64(dateTime))
  566. switch ercType {
  567. case "erc20":
  568. _, err := this.db.Exec("UPDATE bcs_token SET update_on=?,circulating_supply=circulating_supply+? WHERE cntr_addr=?", nowTimestamp, totalValue.String(), contractAddr)
  569. er.Check(err)
  570. _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE circulating_supply=circulating_supply+VALUES (circulating_supply),txn_cnt=txn_cnt+VALUES(txn_cnt)", dateTime, dateFormat, 20, contractAddr, totalValue.String(), insertCnt)
  571. er.Check(err)
  572. case "erc721":
  573. _, err := this.db.Exec("UPDATE bcs_nft SET update_on=?,circulating_supply=circulating_supply+? WHERE cntr_addr=?", nowTimestamp, totalValue.String(), contractAddr)
  574. er.Check(err)
  575. _, err = this.db.Exec("INSERT INTO bcs_circulating(time_on,time_at,erc_type,cntr_addr,circulating_supply,txn_cnt) VALUES (?,?,?,?,?,?) ON DUPLICATE KEY UPDATE circulating_supply=circulating_supply+VALUES (circulating_supply),txn_cnt=txn_cnt+VALUES(txn_cnt)", dateTime, dateFormat, 721, contractAddr, totalValue.String(), insertCnt)
  576. er.Check(err)
  577. default:
  578. // 없는 타입
  579. return ercType
  580. }
  581. log.Println("processed token and contract:", tx.Hash(), tx.To())
  582. return ercType
  583. }
  // has0xPrefix reports whether str starts with "0x" or "0X".
  func has0xPrefix(str string) bool {
      if len(str) < 2 || str[0] != '0' {
          return false
      }
      switch str[1] {
      case 'x', 'X':
          return true
      }
      return false
  }
  // Expected sizes of chain identifiers. handleTxMethod passes
  // AddressLength*2 to checkAddress as the number of hex digits.
  const (
      HashLength    = 32 // expected length of a hash
      AddressLength = 20 // expected length of an address
  )
  593. // 0x가 없다면 0x를 붙이고
  594. func checkAddress(s string, size int) string {
  595. if has0xPrefix(s) {
  596. s = s[2:]
  597. }
  598. s = strings.TrimLeft(s, "0 ")
  599. if len(s) > size {
  600. s = s[len(s)-size:]
  601. } else {
  602. for len(s) < size {
  603. s = "0" + s
  604. }
  605. }
  606. s = "0x" + s
  607. return s
  608. }
  609. func checkValue(s string) string {
  610. if has0xPrefix(s) {
  611. s = s[2:]
  612. }
  613. s = strings.TrimLeft(s, "0 ")
  614. if s == "" {
  615. return "0x0"
  616. }
  617. s = "0x" + s
  618. return s
  619. }
  620. func (this *Collector) handleTxMethod(method string, input string) (from string, to string, value *big.Int, err error) {
  621. input = input[2:]
  622. method = method[2:]
  623. args := make([]string, 0)
  624. for len(input) >= 64 {
  625. args = append(args, input[:64])
  626. input = input[64:]
  627. }
  628. switch method {
  629. case model.Method_name, model.Method_symbol, model.Method_decimals, model.Method_totalSupply:
  630. case model.Method_balanceOf2:
  631. from = args[1]
  632. case model.Method_balanceOF, model.Method_withdrawToken, model.Method_depositToken, model.Method_approve:
  633. from = args[0]
  634. case model.Method_transfer:
  635. to = checkAddress(args[0], AddressLength*2)
  636. value= hexutil.MustDecodeBig(checkValue(args[1]))
  637. case model.Method_transfer2, model.Method_Transfer, model.Method_transferFrom, model.Method_allowance,
  638. model.Method_Approval, model.Method_eventApproval, model.Method_eventTransfer:
  639. from = args[0]
  640. to = args[1]
  641. case model.Method_confirmTransaction, model.Method_balanceOf:
  642. }
  643. return
  644. }
  645. func (this *Collector) getUnConfirmBlockInfo() (map[int64][]string, error) {
  646. rows, err := this.db.Query("SELECT block_no, block_hash FROM bcs_unconfirm WHERE block_no < ? ORDER BY block_no LIMIT ?", this.confirmBlockNumber, 100)
  647. if err != nil {
  648. return nil, err
  649. }
  650. defer rows.Close()
  651. result := make(map[int64][]string,0)
  652. for rows.Next() {
  653. var (
  654. blockNo int64
  655. blockHash string
  656. )
  657. err := rows.Scan(&blockNo, &blockHash)
  658. if err != nil {
  659. log.Printf("mysql getUnConfirmBlockInfo:rows.Scan() error: %v",err)
  660. return nil, err
  661. }
  662. result[blockNo] = append(result[blockNo], blockHash)
  663. }
  664. return result, nil
  665. }
  666. func (this *Collector) getAccRewardBlock() (*big.Int, error) {
  667. rows, err := this.db.Query("SELECT SUM(reward) FROM `bcs_circulating` WHERE time_on BETWEEN 0 AND UNIX_TIMESTAMP() AND erc_type = 0")
  668. if err != nil {
  669. return nil, err
  670. }
  671. defer rows.Close()
  672. for rows.Next() {
  673. var (
  674. totalReward string
  675. )
  676. err := rows.Scan(&totalReward)
  677. if err != nil {
  678. log.Printf("mysql getAccRewardBlock:rows.Scan() error: %v", err)
  679. return big.NewInt(0), err
  680. }
  681. totalReward = strings.Replace(totalReward,".", "",32)
  682. res, _ := big.NewInt(0).SetString(totalReward,10)
  683. return res, nil
  684. }
  685. return big.NewInt(0), nil
  686. }
  // StringInSlice reports whether a is an element of list.
  func StringInSlice(a string, list []string) bool {
      for i := 0; i < len(list); i++ {
          if list[i] == a {
              return true
          }
      }
      return false
  }
  695. func (this *Collector) processConfirmBlock() error {
  696. // 디비에서 확정되지 않은 블록을 읽어 온다.
  697. listConfirmBlock, err := this.getUnConfirmBlockInfo()
  698. er.Check(err)
  699. if listConfirmBlock == nil {
  700. return nil
  701. }
  702. // 리스트에 있는 블록들이 같은지 비교 하고
  703. for blockNo,blockHash := range listConfirmBlock {
  704. // 블록높이에 데이타를 client로 부터 얻어 오자.
  705. block, err := this.eth.GetBlockByNumber(big.NewInt(blockNo))
  706. if err != nil {
  707. // geth와 연결이 끊어 졌다.
  708. return nil
  709. }
  710. lenBlock := len(blockHash)
  711. if lenBlock == 0 {
  712. // ?? 올수 없다.
  713. } else if lenBlock == 1 {
  714. if block.Hash().Hex() == blockHash[0] {
  715. _, err = this.db.Exec("UPDATE bcs_block SET status=1 WHERE block_no=? AND block_hash=?",block.Number().Uint64(), block.Hash().Hex())
  716. er.Check(err)
  717. } else {
  718. // 새로운 블록이 컨핌 블록인 경우.
  719. _, err = this.db.Exec("UPDATE bcs_block SET status=2 WHERE block_no=?",block.Number().Uint64())
  720. er.Check(err)
  721. // 다시 넣어 준다.
  722. this.processBlock(blockNo)
  723. }
  724. } else {
  725. equalBlock := StringInSlice(block.Hash().Hex(), blockHash)
  726. if equalBlock {
  727. // 같은 블록이 있다.
  728. _, err = this.db.Exec("UPDATE bcs_block SET status=1 WHERE block_no=? AND block_hash=?",block.Number().Uint64(), block.Hash().Hex())
  729. er.Check(err)
  730. } else {
  731. // 같은 블록이 없다면 다시 집어넣어 준다.
  732. _, err = this.db.Exec("UPDATE bcs_block SET status=2 WHERE block_no=?",block.Number().Uint64())
  733. er.Check(err)
  734. // 다시 넣어 준다.
  735. this.processBlock(blockNo)
  736. }
  737. }
  738. // unconfrim 레코드를 삭제 한다.
  739. _, err = this.db.Exec("DELETE FROM bcs_unconfirm WHERE block_no=?",block.Number().Uint64())
  740. er.Check(err)
  741. }
  742. return nil
  743. }
  744. func (this *Collector) GetLastConfrimBlock() {
  745. number, err := this.eth.GetLastBlockNumber()
  746. if err != nil {
  747. return
  748. }
  749. this.confirmBlockNumber = number.Int64() - this.conifg.ImmatureDepth
  750. }