txn_offset_commit_response.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package sarama
  2. import (
  3. "time"
  4. )
  5. type TxnOffsetCommitResponse struct {
  6. Version int16
  7. ThrottleTime time.Duration
  8. Topics map[string][]*PartitionError
  9. }
  10. func (t *TxnOffsetCommitResponse) encode(pe packetEncoder) error {
  11. pe.putInt32(int32(t.ThrottleTime / time.Millisecond))
  12. if err := pe.putArrayLength(len(t.Topics)); err != nil {
  13. return err
  14. }
  15. for topic, e := range t.Topics {
  16. if err := pe.putString(topic); err != nil {
  17. return err
  18. }
  19. if err := pe.putArrayLength(len(e)); err != nil {
  20. return err
  21. }
  22. for _, partitionError := range e {
  23. if err := partitionError.encode(pe); err != nil {
  24. return err
  25. }
  26. }
  27. }
  28. return nil
  29. }
  30. func (t *TxnOffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
  31. t.Version = version
  32. throttleTime, err := pd.getInt32()
  33. if err != nil {
  34. return err
  35. }
  36. t.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  37. n, err := pd.getArrayLength()
  38. if err != nil {
  39. return err
  40. }
  41. t.Topics = make(map[string][]*PartitionError)
  42. for i := 0; i < n; i++ {
  43. topic, err := pd.getString()
  44. if err != nil {
  45. return err
  46. }
  47. m, err := pd.getArrayLength()
  48. if err != nil {
  49. return err
  50. }
  51. t.Topics[topic] = make([]*PartitionError, m)
  52. for j := 0; j < m; j++ {
  53. t.Topics[topic][j] = new(PartitionError)
  54. if err := t.Topics[topic][j].decode(pd, version); err != nil {
  55. return err
  56. }
  57. }
  58. }
  59. return nil
  60. }
  61. func (a *TxnOffsetCommitResponse) key() int16 {
  62. return 28
  63. }
  64. func (a *TxnOffsetCommitResponse) version() int16 {
  65. return a.Version
  66. }
  67. func (a *TxnOffsetCommitResponse) headerVersion() int16 {
  68. return 0
  69. }
  70. func (a *TxnOffsetCommitResponse) isValidVersion() bool {
  71. return a.Version >= 0 && a.Version <= 2
  72. }
  73. func (a *TxnOffsetCommitResponse) requiredVersion() KafkaVersion {
  74. switch a.Version {
  75. case 2:
  76. return V2_1_0_0
  77. case 1:
  78. return V2_0_0_0
  79. case 0:
  80. return V0_11_0_0
  81. default:
  82. return V2_1_0_0
  83. }
  84. }
  85. func (r *TxnOffsetCommitResponse) throttleTime() time.Duration {
  86. return r.ThrottleTime
  87. }