alter_configs_response.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package sarama
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. // AlterConfigsResponse is a response type for alter config
  7. type AlterConfigsResponse struct {
  8. Version int16
  9. ThrottleTime time.Duration
  10. Resources []*AlterConfigsResourceResponse
  11. }
  12. type AlterConfigError struct {
  13. Err KError
  14. ErrMsg string
  15. }
  16. func (c *AlterConfigError) Error() string {
  17. text := c.Err.Error()
  18. if c.ErrMsg != "" {
  19. text = fmt.Sprintf("%s - %s", text, c.ErrMsg)
  20. }
  21. return text
  22. }
  23. // AlterConfigsResourceResponse is a response type for alter config resource
  24. type AlterConfigsResourceResponse struct {
  25. ErrorCode int16
  26. ErrorMsg string
  27. Type ConfigResourceType
  28. Name string
  29. }
  30. func (a *AlterConfigsResponse) encode(pe packetEncoder) error {
  31. pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
  32. if err := pe.putArrayLength(len(a.Resources)); err != nil {
  33. return err
  34. }
  35. for _, v := range a.Resources {
  36. if err := v.encode(pe); err != nil {
  37. return err
  38. }
  39. }
  40. return nil
  41. }
  42. func (a *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
  43. throttleTime, err := pd.getInt32()
  44. if err != nil {
  45. return err
  46. }
  47. a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  48. responseCount, err := pd.getArrayLength()
  49. if err != nil {
  50. return err
  51. }
  52. a.Resources = make([]*AlterConfigsResourceResponse, responseCount)
  53. for i := range a.Resources {
  54. a.Resources[i] = new(AlterConfigsResourceResponse)
  55. if err := a.Resources[i].decode(pd, version); err != nil {
  56. return err
  57. }
  58. }
  59. return nil
  60. }
  61. func (a *AlterConfigsResourceResponse) encode(pe packetEncoder) error {
  62. pe.putInt16(a.ErrorCode)
  63. err := pe.putString(a.ErrorMsg)
  64. if err != nil {
  65. return err
  66. }
  67. pe.putInt8(int8(a.Type))
  68. err = pe.putString(a.Name)
  69. if err != nil {
  70. return err
  71. }
  72. return nil
  73. }
  74. func (a *AlterConfigsResourceResponse) decode(pd packetDecoder, version int16) error {
  75. errCode, err := pd.getInt16()
  76. if err != nil {
  77. return err
  78. }
  79. a.ErrorCode = errCode
  80. e, err := pd.getString()
  81. if err != nil {
  82. return err
  83. }
  84. a.ErrorMsg = e
  85. t, err := pd.getInt8()
  86. if err != nil {
  87. return err
  88. }
  89. a.Type = ConfigResourceType(t)
  90. name, err := pd.getString()
  91. if err != nil {
  92. return err
  93. }
  94. a.Name = name
  95. return nil
  96. }
  97. func (a *AlterConfigsResponse) key() int16 {
  98. return 33
  99. }
  100. func (a *AlterConfigsResponse) version() int16 {
  101. return a.Version
  102. }
  103. func (a *AlterConfigsResponse) headerVersion() int16 {
  104. return 0
  105. }
  106. func (a *AlterConfigsResponse) isValidVersion() bool {
  107. return a.Version >= 0 && a.Version <= 1
  108. }
  109. func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
  110. switch a.Version {
  111. case 1:
  112. return V2_0_0_0
  113. case 0:
  114. return V0_11_0_0
  115. default:
  116. return V2_0_0_0
  117. }
  118. }
  119. func (r *AlterConfigsResponse) throttleTime() time.Duration {
  120. return r.ThrottleTime
  121. }