collector.go 11 KB


  1. package collector
  2. import (
  3. "fmt"
  4. "github.com/ethereum/go-ethereum/common"
  5. "github.com/ethereum/go-ethereum/common/hexutil"
  6. "github.com/ethereum/go-ethereum/core/types"
  7. "github.com/go-xorm/xorm"
  8. "log"
  9. "math/big"
  10. "strings"
  11. "syncscan-go/er"
  12. "syncscan-go/eth"
  13. "syncscan-go/model"
  14. models_table "syncscan-go/models/table"
  15. "time"
  16. )
  17. type Collector struct {
  18. db *xorm.Engine
  19. isDone bool
  20. chDone chan struct{}
  21. eth *eth.Eth
  22. collectionAmount int64
  23. methods map[string]models_table.EthTxMethod
  24. }
  25. func New(db *xorm.Engine, eth *eth.Eth, collectionAmount int64) *Collector {
  26. if collectionAmount == 0 {
  27. collectionAmount = 1000
  28. }
  29. c := &Collector{db: db, eth: eth, collectionAmount: collectionAmount, methods: make(map[string]models_table.EthTxMethod)}
  30. c.loadTxMethods()
  31. return c
  32. }
  33. func (this *Collector) Run() {
  34. to, err := this.eth.GetLastBlockNumber()
  35. er.Check(err)
  36. from, err := this.getLastBlockNumberFromDb()
  37. log.Println(to, from, err)
  38. er.Check(err)
  39. err = this.Process(from, to.Int64())
  40. er.Check(err)
  41. // time.Sleep(time.Second * 5)
  42. // time.Sleep(time.Second * 60)
  43. }
  44. func (this *Collector) Stop() {
  45. this.isDone = true
  46. }
  47. func (this *Collector) loadTxMethods() (err error) {
  48. this.db.Find(&this.methods)
  49. return
  50. }
  51. func (this *Collector) getLastBlockNumberFromDb() (n int64, err error) {
  52. sn := models_table.EthLastSync{}
  53. _, err = this.db.Get(&sn)
  54. n = int64(sn.LastNo)
  55. if n == 0 {
  56. _, err = this.db.Insert(&models_table.EthLastSync{SyncType: "eth", LastNo: 0, CreatedOn: time.Now().Unix(), UpdatedOn: time.Now().Unix()})
  57. if err != nil && strings.Contains(err.Error(), "Duplicate entry") {
  58. err = nil
  59. }
  60. }
  61. return
  62. }
  63. func (this *Collector) Process(fromBlockNumber int64, toBlockNumber int64) (err error) {
  64. //현재까지 저장된 블럭 이후부터 수집
  65. if toBlockNumber-fromBlockNumber == 0 {
  66. log.Println("추가된 블럭 없음:", fromBlockNumber)
  67. return nil
  68. }
  69. if fromBlockNumber != 0 {
  70. fromBlockNumber += 1
  71. }
  72. if (toBlockNumber - fromBlockNumber) > this.collectionAmount {
  73. toBlockNumber = fromBlockNumber + this.collectionAmount
  74. }
  75. log.Printf("processing started %d to %d\n", fromBlockNumber, toBlockNumber)
  76. for n := fromBlockNumber; n <= toBlockNumber; n++ {
  77. err = this.processBlock(n)
  78. er.Check(err)
  79. log.Println("processed block no:", n)
  80. _, err = this.db.Where("sync_type = ?", "eth").Update(&models_table.EthLastSync{LastNo: uint64(n), UpdatedOn: time.Now().Unix()})
  81. if err != nil {
  82. if strings.Contains(err.Error(), "No content found to be updated") {
  83. err = nil
  84. }
  85. }
  86. er.Check(err)
  87. }
  88. return err
  89. }
  90. func (this *Collector) processBlock(blockNo int64) (err error) {
  91. defer er.Recover(&err)
  92. block := models_table.EthBlock{}
  93. b, err := this.eth.GetBlockByNumber(big.NewInt(blockNo))
  94. er.Check(err)
  95. block.CreatedOn = b.Time()
  96. block.BlockNo = blockNo
  97. block.TxnCnt = b.Transactions().Len()
  98. block.UncleCnt = len(b.Uncles())
  99. block.Miner = b.Coinbase().Hex()
  100. block.GasUsed = fmt.Sprintf("%d", b.GasUsed())
  101. block.GasLimit = fmt.Sprintf("%d", b.GasUsed())
  102. if b.BaseFee() != nil {
  103. block.BaseFee = fmt.Sprintf("%d", b.BaseFee().Uint64())
  104. }
  105. log.Println(block)
  106. receipts, err := this.eth.GetTxReceipts(b.Transactions())
  107. er.Check(err)
  108. reward, txFee, uncleReward, err := this.eth.GetReward(b, receipts)
  109. ucr, _ := uncleReward.Float64()
  110. er.Check(err)
  111. f, _ := reward.Float64()
  112. block.Reward = fmt.Sprintf("%f", f)
  113. block.TrfeeSum = fmt.Sprintf("%f", float64(txFee.Uint64())/eth.ETH)
  114. block.UncleReward = fmt.Sprintf("%f", ucr/eth.ETH)
  115. _, err = this.db.Insert(&block)
  116. if err != nil {
  117. _, err = this.db.Where("block_no = ?", block.BlockNo).Update(block)
  118. er.Check(err)
  119. }
  120. if blockNo > 52 {
  121. err = this.processUncleBlock(blockNo - 52)
  122. er.Check(err)
  123. }
  124. err = this.processTx(b, receipts)
  125. er.Check(err)
  126. return
  127. }
  128. func (this *Collector) processUncleBlock(blockNo int64) (err error) {
  129. defer er.Recover(&err)
  130. b, err := this.eth.GetBlockByNumber(big.NewInt(blockNo))
  131. er.Check(err)
  132. uncs := b.Uncles()
  133. for i, u := range uncs {
  134. ub := models_table.EthUncleBlock{
  135. CreatedOn: b.Time(),
  136. BlockNo: blockNo,
  137. UncleNo: u.Number.Uint64(),
  138. UnclePosition: i,
  139. Miner: u.Coinbase.Hex(),
  140. GasUsed: u.GasUsed,
  141. GasLimit: u.GasLimit,
  142. UncleReward: 262.5,
  143. }
  144. _, err := this.db.Insert(&ub)
  145. if err != nil {
  146. _, err = this.db.Where("uncle_no = ?", ub.UncleNo).Update(ub)
  147. er.Check(err)
  148. }
  149. }
  150. return
  151. }
  152. func (this *Collector) processTx(block *types.Block, receipts types.Receipts) (err error) {
  153. defer er.Recover(&err)
  154. for i, tx := range block.Transactions() {
  155. t, err := this.eth.GetTxByHash(tx.Hash().Hex())
  156. er.Check(err)
  157. value, err := hexutil.DecodeBig(t.Value)
  158. if err != nil {
  159. value = big.NewInt(0)
  160. }
  161. from := t.From
  162. to := t.To
  163. if len(tx.Data()) > 4 {
  164. t.Type = hexutil.Encode(tx.Data()[:4])
  165. from, to, err = this.handleTxMethod(t.Type, hexutil.Encode(tx.Data()[4:]))
  166. er.Check(err)
  167. }
  168. contractAddr := ""
  169. isContract := false
  170. if tx.To() == nil {
  171. isContract = true
  172. contractAddr = receipts[i].ContractAddress.Hex()
  173. } else {
  174. contractAddr = tx.To().Hex()
  175. cnt, err := this.db.Where("cntr_addr = ?").Count(models_table.EthToken{})
  176. er.Check(err)
  177. if cnt > 0 {
  178. isContract = true
  179. } else {
  180. contractAddr = "main_coin"
  181. }
  182. }
  183. if isContract || len(tx.Data()) > 0 {
  184. err = this.processTxInternal(block, t.Hash)
  185. er.Check(err)
  186. }
  187. txValue := models_table.EthTranxn{
  188. CreatedOn: block.Time(),
  189. BlockNo: block.Number().Uint64(),
  190. TxnHash: t.Hash,
  191. CntrAddr: contractAddr,
  192. FromAddr: from,
  193. ToAddr: to,
  194. TxnType: t.Type,
  195. Value: value.String(),
  196. TxnFee: float64(100*receipts[i].GasUsed*tx.GasPrice().Uint64()) / eth.ETH,
  197. GasUsed: receipts[i].GasUsed,
  198. }
  199. _, err = this.db.Insert(&tx)
  200. if err != nil {
  201. _, err = this.db.Where("txn_hash = ?", txValue.TxnHash).Update(tx)
  202. er.Check(err)
  203. }
  204. err = this.processContract(block, t.From, tx, receipts[i])
  205. er.Check(err)
  206. err = this.processToken(block, tx, receipts[i])
  207. er.Check(err)
  208. log.Println("processed tx", t.Hash)
  209. }
  210. return nil
  211. }
  212. func (this *Collector) processTxInternal(block *types.Block, txHash string) (err error) {
  213. defer er.Recover(&err)
  214. start := time.Now()
  215. calls, err := this.eth.GetInternalTxsByHash(block.Number().Uint64(), txHash)
  216. er.Check(err)
  217. for _, call := range calls {
  218. value := big.NewInt(0)
  219. if len(call.Value) > 2 {
  220. value, _ = value.SetString(call.Value[2:], 16)
  221. }
  222. txi := models_table.EthTransInter{
  223. CreatedOn: block.Time(),
  224. BlockNo: block.Number().Uint64(),
  225. ParentTxnHash: txHash,
  226. FromAddr: call.From,
  227. ToAddr: call.To,
  228. TxnType: call.Type,
  229. Value: value.String(),
  230. }
  231. _, err := this.db.Insert(&txi)
  232. if err != nil {
  233. _, err = this.db.Where("parent_txn_hash = ?", txHash).Update(txi)
  234. er.Check(err)
  235. }
  236. }
  237. if len(calls) > 0 {
  238. log.Println("processed internal tx", txHash)
  239. }
  240. log.Println("elapsed internal tx ", time.Since(start))
  241. return
  242. }
  243. func (this *Collector) processContract(block *types.Block, txfrom string, tx *types.Transaction, receipt *types.Receipt) (err error) {
  244. defer er.Recover(&err)
  245. log.Println(receipt.ContractAddress.Hex())
  246. if tx == nil || tx.To() != nil || receipt.ContractAddress.Hex() == "0x0000000000000000000000000000000000000000" {
  247. return
  248. }
  249. contract := models_table.EthContract{
  250. CreatedOn: block.Time(),
  251. TxnHash: tx.Hash().Hex(),
  252. BlockNo: block.Number().Uint64(),
  253. CreatorAddr: txfrom,
  254. CntrAddr: receipt.ContractAddress.Hex(),
  255. }
  256. _, err = this.db.Insert(&contract)
  257. if err != nil {
  258. _, err = this.db.Where("txn_hash = ?", tx.Hash().Hex()).Update(contract)
  259. er.Check(err)
  260. }
  261. log.Println("processed contract:", tx.Hash(), receipt.ContractAddress)
  262. return nil
  263. }
  264. func (this *Collector) processToken(block *types.Block, tx *types.Transaction, receipt *types.Receipt) (err error) {
  265. defer er.Recover(&err)
  266. if tx == nil || len(tx.Data()) == 0 {
  267. return
  268. }
  269. tokenAddr := common.Address{}
  270. if tx.To() == nil {
  271. tokenAddr = receipt.ContractAddress
  272. } else {
  273. tokenAddr = *tx.To()
  274. }
  275. token, err := this.eth.GetToken(tokenAddr)
  276. if err != nil {
  277. return nil
  278. }
  279. tokenName, err := token.Name(nil)
  280. if err != nil {
  281. return nil
  282. }
  283. symbol, err := token.Symbol(nil)
  284. if err != nil {
  285. return nil
  286. }
  287. balance, err := token.BalanceOf(nil, tokenAddr)
  288. er.Check(err)
  289. totalSupply, err := token.TotalSupply(nil)
  290. er.Check(err)
  291. tokenValue := models_table.EthToken{
  292. CreatedOn: block.Time(),
  293. CntrAddr: tokenAddr.Hex(),
  294. TokenName: tokenName,
  295. Symbol: symbol,
  296. CurrBalQty: balance.Uint64(),
  297. TotalCirculQty: totalSupply.Uint64(),
  298. }
  299. _, err = this.db.Insert(&tokenValue)
  300. if err != nil {
  301. _, err = this.db.Where("cntr_addr = ?", tokenAddr.Hex()).Update(tokenValue)
  302. er.Check(err)
  303. }
  304. contractAddr := tokenAddr.Hex()
  305. session := this.db.NewSession()
  306. defer session.Close()
  307. err = session.Begin()
  308. er.Check(err)
  309. for _, lg := range receipt.Logs {
  310. if len(lg.Topics) == 0 {
  311. continue
  312. }
  313. if lg.Topics[0].Hex() == eth.LogTransferSignHash.Hex() && len(lg.Data) > 0 {
  314. value := big.Int{}
  315. value.SetBytes(lg.Data)
  316. from := "0x" + lg.Topics[1].Big().Text(16)
  317. to := "0x" + lg.Topics[2].Big().Text(16)
  318. amount := value.Uint64()
  319. log.Printf("Transfer Event from: %s, to: %s, value: %d\n", from, to, amount)
  320. if amount != 0 && from != to {
  321. _, err = session.Where("cntr_addr = ? and holder_addr = ?", contractAddr, from).Cols("created_on",
  322. "total_sum").Decr("total_sum", -amount).Update(&models_table.EthTokenHolder{
  323. CreatedOn: block.Time(),
  324. })
  325. er.Check(err)
  326. affected, err := session.Where("cntr_addr = ? and holder_addr = ?", contractAddr, to).Cols("created_on",
  327. "total_sum").Decr("total_sum", amount).Update(&models_table.EthTokenHolder{
  328. CreatedOn: block.Time(),
  329. })
  330. er.Check(err)
  331. if err == nil && affected == 0 {
  332. _, err = session.Insert(&models_table.EthTokenHolder{
  333. CreatedOn: block.Time(),
  334. CntrAddr: contractAddr,
  335. HolderAddr: to,
  336. TotalSum: amount,
  337. })
  338. er.Check(err)
  339. }
  340. }
  341. log.Printf("processed Transfer Event contract: %s, from: %s, to: %s, value: %d\n", contractAddr, from, to, amount)
  342. }
  343. }
  344. log.Println("processed token and contract:", tx.Hash(), tx.To())
  345. return nil
  346. }
  347. func (this *Collector) handleTxMethod(method string, input string) (from string, to string, err error) {
  348. input = input[2:]
  349. method = method[2:]
  350. args := make([]string, 0)
  351. for len(input) >= 64 {
  352. args = append(args, input[:64])
  353. input = input[64:]
  354. }
  355. switch method {
  356. case model.Method_name, model.Method_symbol, model.Method_decimals, model.Method_totalSupply:
  357. case model.Method_balanceOf2:
  358. from = args[1]
  359. case model.Method_balanceOF, model.Method_transfer, model.Method_withdrawToken, model.Method_depositToken, model.Method_approve:
  360. from = args[0]
  361. case model.Method_transfer2, model.Method_Transfer, model.Method_transferFrom, model.Method_allowance,
  362. model.Method_Approval, model.Method_eventApproval, model.Method_eventTransfer:
  363. from = args[0]
  364. to = args[1]
  365. case model.Method_confirmTransaction, model.Method_balanceOf:
  366. }
  367. return
  368. }