create_topics_response.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package sarama
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. type CreateTopicsResponse struct {
  7. // Version defines the protocol version to use for encode and decode
  8. Version int16
  9. // ThrottleTime contains the duration for which the request was throttled due
  10. // to a quota violation, or zero if the request did not violate any quota.
  11. ThrottleTime time.Duration
  12. // TopicErrors contains a map of any errors for the topics we tried to create.
  13. TopicErrors map[string]*TopicError
  14. }
  15. func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
  16. if c.Version >= 2 {
  17. pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
  18. }
  19. if err := pe.putArrayLength(len(c.TopicErrors)); err != nil {
  20. return err
  21. }
  22. for topic, topicError := range c.TopicErrors {
  23. if err := pe.putString(topic); err != nil {
  24. return err
  25. }
  26. if err := topicError.encode(pe, c.Version); err != nil {
  27. return err
  28. }
  29. }
  30. return nil
  31. }
  32. func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
  33. c.Version = version
  34. if version >= 2 {
  35. throttleTime, err := pd.getInt32()
  36. if err != nil {
  37. return err
  38. }
  39. c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  40. }
  41. n, err := pd.getArrayLength()
  42. if err != nil {
  43. return err
  44. }
  45. c.TopicErrors = make(map[string]*TopicError, n)
  46. for i := 0; i < n; i++ {
  47. topic, err := pd.getString()
  48. if err != nil {
  49. return err
  50. }
  51. c.TopicErrors[topic] = new(TopicError)
  52. if err := c.TopicErrors[topic].decode(pd, version); err != nil {
  53. return err
  54. }
  55. }
  56. return nil
  57. }
  58. func (c *CreateTopicsResponse) key() int16 {
  59. return 19
  60. }
  61. func (c *CreateTopicsResponse) version() int16 {
  62. return c.Version
  63. }
  64. func (c *CreateTopicsResponse) headerVersion() int16 {
  65. return 0
  66. }
  67. func (c *CreateTopicsResponse) isValidVersion() bool {
  68. return c.Version >= 0 && c.Version <= 3
  69. }
  70. func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
  71. switch c.Version {
  72. case 3:
  73. return V2_0_0_0
  74. case 2:
  75. return V0_11_0_0
  76. case 1:
  77. return V0_10_2_0
  78. case 0:
  79. return V0_10_1_0
  80. default:
  81. return V2_8_0_0
  82. }
  83. }
  84. func (r *CreateTopicsResponse) throttleTime() time.Duration {
  85. return r.ThrottleTime
  86. }
  87. type TopicError struct {
  88. Err KError
  89. ErrMsg *string
  90. }
  91. func (t *TopicError) Error() string {
  92. text := t.Err.Error()
  93. if t.ErrMsg != nil {
  94. text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
  95. }
  96. return text
  97. }
  98. func (t *TopicError) Unwrap() error {
  99. return t.Err
  100. }
  101. func (t *TopicError) encode(pe packetEncoder, version int16) error {
  102. pe.putInt16(int16(t.Err))
  103. if version >= 1 {
  104. if err := pe.putNullableString(t.ErrMsg); err != nil {
  105. return err
  106. }
  107. }
  108. return nil
  109. }
  110. func (t *TopicError) decode(pd packetDecoder, version int16) (err error) {
  111. kErr, err := pd.getInt16()
  112. if err != nil {
  113. return err
  114. }
  115. t.Err = KError(kErr)
  116. if version >= 1 {
  117. if t.ErrMsg, err = pd.getNullableString(); err != nil {
  118. return err
  119. }
  120. }
  121. return nil
  122. }