delete_offsets_response.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package sarama
  2. import (
  3. "time"
  4. )
  5. type DeleteOffsetsResponse struct {
  6. Version int16
  7. // The top-level error code, or 0 if there was no error.
  8. ErrorCode KError
  9. ThrottleTime time.Duration
  10. // The responses for each partition of the topics.
  11. Errors map[string]map[int32]KError
  12. }
  13. func (r *DeleteOffsetsResponse) AddError(topic string, partition int32, errorCode KError) {
  14. if r.Errors == nil {
  15. r.Errors = make(map[string]map[int32]KError)
  16. }
  17. partitions := r.Errors[topic]
  18. if partitions == nil {
  19. partitions = make(map[int32]KError)
  20. r.Errors[topic] = partitions
  21. }
  22. partitions[partition] = errorCode
  23. }
  24. func (r *DeleteOffsetsResponse) encode(pe packetEncoder) error {
  25. pe.putInt16(int16(r.ErrorCode))
  26. pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
  27. if err := pe.putArrayLength(len(r.Errors)); err != nil {
  28. return err
  29. }
  30. for topic, partitions := range r.Errors {
  31. if err := pe.putString(topic); err != nil {
  32. return err
  33. }
  34. if err := pe.putArrayLength(len(partitions)); err != nil {
  35. return err
  36. }
  37. for partition, errorCode := range partitions {
  38. pe.putInt32(partition)
  39. pe.putInt16(int16(errorCode))
  40. }
  41. }
  42. return nil
  43. }
  44. func (r *DeleteOffsetsResponse) decode(pd packetDecoder, version int16) error {
  45. tmpErr, err := pd.getInt16()
  46. if err != nil {
  47. return err
  48. }
  49. r.ErrorCode = KError(tmpErr)
  50. throttleTime, err := pd.getInt32()
  51. if err != nil {
  52. return err
  53. }
  54. r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  55. numTopics, err := pd.getArrayLength()
  56. if err != nil || numTopics == 0 {
  57. return err
  58. }
  59. r.Errors = make(map[string]map[int32]KError, numTopics)
  60. for i := 0; i < numTopics; i++ {
  61. name, err := pd.getString()
  62. if err != nil {
  63. return err
  64. }
  65. numErrors, err := pd.getArrayLength()
  66. if err != nil {
  67. return err
  68. }
  69. r.Errors[name] = make(map[int32]KError, numErrors)
  70. for j := 0; j < numErrors; j++ {
  71. id, err := pd.getInt32()
  72. if err != nil {
  73. return err
  74. }
  75. tmp, err := pd.getInt16()
  76. if err != nil {
  77. return err
  78. }
  79. r.Errors[name][id] = KError(tmp)
  80. }
  81. }
  82. return nil
  83. }
  84. func (r *DeleteOffsetsResponse) key() int16 {
  85. return 47
  86. }
  87. func (r *DeleteOffsetsResponse) version() int16 {
  88. return r.Version
  89. }
  90. func (r *DeleteOffsetsResponse) headerVersion() int16 {
  91. return 0
  92. }
  93. func (r *DeleteOffsetsResponse) isValidVersion() bool {
  94. return r.Version == 0
  95. }
  96. func (r *DeleteOffsetsResponse) requiredVersion() KafkaVersion {
  97. return V2_4_0_0
  98. }
  99. func (r *DeleteOffsetsResponse) throttleTime() time.Duration {
  100. return r.ThrottleTime
  101. }