metadata_request.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package sarama
  2. import "encoding/base64"
  3. type Uuid [16]byte
  4. func (u Uuid) String() string {
  5. return base64.URLEncoding.WithPadding(base64.NoPadding).EncodeToString(u[:])
  6. }
  7. var NullUUID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
  8. type MetadataRequest struct {
  9. // Version defines the protocol version to use for encode and decode
  10. Version int16
  11. // Topics contains the topics to fetch metadata for.
  12. Topics []string
  13. // AllowAutoTopicCreation contains a If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so.
  14. AllowAutoTopicCreation bool
  15. IncludeClusterAuthorizedOperations bool // version 8 and up
  16. IncludeTopicAuthorizedOperations bool // version 8 and up
  17. }
  18. func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest {
  19. m := &MetadataRequest{Topics: topics}
  20. if version.IsAtLeast(V2_8_0_0) {
  21. m.Version = 10
  22. } else if version.IsAtLeast(V2_4_0_0) {
  23. m.Version = 9
  24. } else if version.IsAtLeast(V2_4_0_0) {
  25. m.Version = 8
  26. } else if version.IsAtLeast(V2_1_0_0) {
  27. m.Version = 7
  28. } else if version.IsAtLeast(V2_0_0_0) {
  29. m.Version = 6
  30. } else if version.IsAtLeast(V1_0_0_0) {
  31. m.Version = 5
  32. } else if version.IsAtLeast(V0_11_0_0) {
  33. m.Version = 4
  34. } else if version.IsAtLeast(V0_10_1_0) {
  35. m.Version = 2
  36. } else if version.IsAtLeast(V0_10_0_0) {
  37. m.Version = 1
  38. }
  39. return m
  40. }
  41. func (r *MetadataRequest) encode(pe packetEncoder) (err error) {
  42. if r.Version < 0 || r.Version > 10 {
  43. return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
  44. }
  45. if r.Version == 0 || len(r.Topics) > 0 {
  46. if r.Version < 9 {
  47. err := pe.putArrayLength(len(r.Topics))
  48. if err != nil {
  49. return err
  50. }
  51. for i := range r.Topics {
  52. err = pe.putString(r.Topics[i])
  53. if err != nil {
  54. return err
  55. }
  56. }
  57. } else if r.Version == 9 {
  58. pe.putCompactArrayLength(len(r.Topics))
  59. for _, topicName := range r.Topics {
  60. if err := pe.putCompactString(topicName); err != nil {
  61. return err
  62. }
  63. pe.putEmptyTaggedFieldArray()
  64. }
  65. } else { // r.Version = 10
  66. pe.putCompactArrayLength(len(r.Topics))
  67. for _, topicName := range r.Topics {
  68. if err := pe.putRawBytes(NullUUID); err != nil {
  69. return err
  70. }
  71. // Avoid implicit memory aliasing in for loop
  72. tn := topicName
  73. if err := pe.putNullableCompactString(&tn); err != nil {
  74. return err
  75. }
  76. pe.putEmptyTaggedFieldArray()
  77. }
  78. }
  79. } else {
  80. if r.Version < 9 {
  81. pe.putInt32(-1)
  82. } else {
  83. pe.putCompactArrayLength(-1)
  84. }
  85. }
  86. if r.Version > 3 {
  87. pe.putBool(r.AllowAutoTopicCreation)
  88. }
  89. if r.Version > 7 {
  90. pe.putBool(r.IncludeClusterAuthorizedOperations)
  91. pe.putBool(r.IncludeTopicAuthorizedOperations)
  92. }
  93. if r.Version > 8 {
  94. pe.putEmptyTaggedFieldArray()
  95. }
  96. return nil
  97. }
  98. func (r *MetadataRequest) decode(pd packetDecoder, version int16) (err error) {
  99. r.Version = version
  100. if r.Version < 9 {
  101. size, err := pd.getInt32()
  102. if err != nil {
  103. return err
  104. }
  105. if size > 0 {
  106. r.Topics = make([]string, size)
  107. for i := range r.Topics {
  108. topic, err := pd.getString()
  109. if err != nil {
  110. return err
  111. }
  112. r.Topics[i] = topic
  113. }
  114. }
  115. } else if r.Version == 9 {
  116. size, err := pd.getCompactArrayLength()
  117. if err != nil {
  118. return err
  119. }
  120. if size > 0 {
  121. r.Topics = make([]string, size)
  122. }
  123. for i := range r.Topics {
  124. topic, err := pd.getCompactString()
  125. if err != nil {
  126. return err
  127. }
  128. r.Topics[i] = topic
  129. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  130. return err
  131. }
  132. }
  133. } else { // version 10+
  134. size, err := pd.getCompactArrayLength()
  135. if err != nil {
  136. return err
  137. }
  138. if size > 0 {
  139. r.Topics = make([]string, size)
  140. }
  141. for i := range r.Topics {
  142. if _, err = pd.getRawBytes(16); err != nil { // skip UUID
  143. return err
  144. }
  145. topic, err := pd.getCompactNullableString()
  146. if err != nil {
  147. return err
  148. }
  149. if topic != nil {
  150. r.Topics[i] = *topic
  151. }
  152. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  153. return err
  154. }
  155. }
  156. }
  157. if r.Version >= 4 {
  158. if r.AllowAutoTopicCreation, err = pd.getBool(); err != nil {
  159. return err
  160. }
  161. }
  162. if r.Version > 7 {
  163. includeClusterAuthz, err := pd.getBool()
  164. if err != nil {
  165. return err
  166. }
  167. r.IncludeClusterAuthorizedOperations = includeClusterAuthz
  168. includeTopicAuthz, err := pd.getBool()
  169. if err != nil {
  170. return err
  171. }
  172. r.IncludeTopicAuthorizedOperations = includeTopicAuthz
  173. }
  174. if r.Version > 8 {
  175. if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
  176. return err
  177. }
  178. }
  179. return nil
  180. }
  181. func (r *MetadataRequest) key() int16 {
  182. return 3
  183. }
// version returns the protocol version stored in r.Version.
func (r *MetadataRequest) version() int16 {
	return r.Version
}
  187. func (r *MetadataRequest) headerVersion() int16 {
  188. if r.Version >= 9 {
  189. return 2
  190. }
  191. return 1
  192. }
  193. func (r *MetadataRequest) isValidVersion() bool {
  194. return r.Version >= 0 && r.Version <= 10
  195. }
  196. func (r *MetadataRequest) requiredVersion() KafkaVersion {
  197. switch r.Version {
  198. case 10:
  199. return V2_8_0_0
  200. case 9:
  201. return V2_4_0_0
  202. case 8:
  203. return V2_3_0_0
  204. case 7:
  205. return V2_1_0_0
  206. case 6:
  207. return V2_0_0_0
  208. case 5:
  209. return V1_0_0_0
  210. case 3, 4:
  211. return V0_11_0_0
  212. case 2:
  213. return V0_10_1_0
  214. case 1:
  215. return V0_10_0_0
  216. case 0:
  217. return V0_8_2_0
  218. default:
  219. return V2_8_0_0
  220. }
  221. }