metadata_response.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. package sarama
  2. import "time"
// PartitionMetadata contains each partition in the topic.
//
// Apart from Version, the fields are declared in the order encode writes them
// and decode reads them. LeaderEpoch is only on the wire for Version >= 7 and
// OfflineReplicas only for Version >= 5.
type PartitionMetadata struct {
	// Version defines the protocol version to use for encode and decode.
	// Both methods overwrite it with the version they are called with.
	Version int16
	// Err contains the partition error, or 0 if there was no error.
	Err KError
	// ID contains the partition index.
	ID int32
	// Leader contains the ID of the leader broker.
	Leader int32
	// LeaderEpoch contains the leader epoch of this partition.
	// Only encoded and decoded for Version >= 7.
	LeaderEpoch int32
	// Replicas contains the set of all nodes that host this partition.
	Replicas []int32
	// Isr contains the set of nodes that are in sync with the leader for this partition.
	Isr []int32
	// OfflineReplicas contains the set of offline replicas of this partition.
	// Only encoded and decoded for Version >= 5.
	OfflineReplicas []int32
}
  22. func (p *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
  23. p.Version = version
  24. tmp, err := pd.getInt16()
  25. if err != nil {
  26. return err
  27. }
  28. p.Err = KError(tmp)
  29. if p.ID, err = pd.getInt32(); err != nil {
  30. return err
  31. }
  32. if p.Leader, err = pd.getInt32(); err != nil {
  33. return err
  34. }
  35. if p.Version >= 7 {
  36. if p.LeaderEpoch, err = pd.getInt32(); err != nil {
  37. return err
  38. }
  39. }
  40. if p.Version < 9 {
  41. p.Replicas, err = pd.getInt32Array()
  42. } else {
  43. p.Replicas, err = pd.getCompactInt32Array()
  44. }
  45. if err != nil {
  46. return err
  47. }
  48. if p.Version < 9 {
  49. p.Isr, err = pd.getInt32Array()
  50. } else {
  51. p.Isr, err = pd.getCompactInt32Array()
  52. }
  53. if err != nil {
  54. return err
  55. }
  56. if p.Version >= 5 {
  57. if p.Version < 9 {
  58. p.OfflineReplicas, err = pd.getInt32Array()
  59. } else {
  60. p.OfflineReplicas, err = pd.getCompactInt32Array()
  61. }
  62. if err != nil {
  63. return err
  64. }
  65. }
  66. if p.Version >= 9 {
  67. _, err = pd.getEmptyTaggedFieldArray()
  68. if err != nil {
  69. return err
  70. }
  71. }
  72. return nil
  73. }
  74. func (p *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
  75. p.Version = version
  76. pe.putInt16(int16(p.Err))
  77. pe.putInt32(p.ID)
  78. pe.putInt32(p.Leader)
  79. if p.Version >= 7 {
  80. pe.putInt32(p.LeaderEpoch)
  81. }
  82. if p.Version < 9 {
  83. err = pe.putInt32Array(p.Replicas)
  84. } else {
  85. err = pe.putCompactInt32Array(p.Replicas)
  86. }
  87. if err != nil {
  88. return err
  89. }
  90. if p.Version < 9 {
  91. err = pe.putInt32Array(p.Isr)
  92. } else {
  93. err = pe.putCompactInt32Array(p.Isr)
  94. }
  95. if err != nil {
  96. return err
  97. }
  98. if p.Version >= 5 {
  99. if p.Version < 9 {
  100. err = pe.putInt32Array(p.OfflineReplicas)
  101. } else {
  102. err = pe.putCompactInt32Array(p.OfflineReplicas)
  103. }
  104. if err != nil {
  105. return err
  106. }
  107. }
  108. if p.Version >= 9 {
  109. pe.putEmptyTaggedFieldArray()
  110. }
  111. return nil
  112. }
// TopicMetadata contains each topic in the response.
//
// Which fields are on the wire depends on Version: IsInternal from 1,
// TopicAuthorizedOperations from 8 and Uuid from 10.
type TopicMetadata struct {
	// Version defines the protocol version to use for encode and decode.
	// Both methods overwrite it with the version they are called with.
	Version int16
	// Err contains the topic error, or 0 if there was no error.
	Err KError
	// Name contains the topic name.
	Name string
	// Uuid contains the 16 raw bytes that follow the name on the wire.
	// Only encoded and decoded for Version >= 10.
	Uuid Uuid
	// IsInternal contains a True if the topic is internal.
	// Only encoded and decoded for Version >= 1.
	IsInternal bool
	// Partitions contains each partition in the topic.
	Partitions []*PartitionMetadata
	// TopicAuthorizedOperations is only valid for Version >= 8; encode and
	// decode skip it at lower versions.
	TopicAuthorizedOperations int32
}
  128. func (t *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
  129. t.Version = version
  130. tmp, err := pd.getInt16()
  131. if err != nil {
  132. return err
  133. }
  134. t.Err = KError(tmp)
  135. if t.Version < 9 {
  136. t.Name, err = pd.getString()
  137. } else {
  138. t.Name, err = pd.getCompactString()
  139. }
  140. if err != nil {
  141. return err
  142. }
  143. if t.Version >= 10 {
  144. uuid, err := pd.getRawBytes(16)
  145. if err != nil {
  146. return err
  147. }
  148. t.Uuid = [16]byte{}
  149. for i := 0; i < 16; i++ {
  150. t.Uuid[i] = uuid[i]
  151. }
  152. }
  153. if t.Version >= 1 {
  154. t.IsInternal, err = pd.getBool()
  155. if err != nil {
  156. return err
  157. }
  158. }
  159. var n int
  160. if t.Version < 9 {
  161. n, err = pd.getArrayLength()
  162. } else {
  163. n, err = pd.getCompactArrayLength()
  164. }
  165. if err != nil {
  166. return err
  167. } else {
  168. t.Partitions = make([]*PartitionMetadata, n)
  169. for i := 0; i < n; i++ {
  170. block := &PartitionMetadata{}
  171. if err := block.decode(pd, t.Version); err != nil {
  172. return err
  173. }
  174. t.Partitions[i] = block
  175. }
  176. }
  177. if t.Version >= 8 {
  178. t.TopicAuthorizedOperations, err = pd.getInt32()
  179. if err != nil {
  180. return err
  181. }
  182. }
  183. if t.Version >= 9 {
  184. _, err = pd.getEmptyTaggedFieldArray()
  185. if err != nil {
  186. return err
  187. }
  188. }
  189. return nil
  190. }
  191. func (t *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
  192. t.Version = version
  193. pe.putInt16(int16(t.Err))
  194. if t.Version < 9 {
  195. err = pe.putString(t.Name)
  196. } else {
  197. err = pe.putCompactString(t.Name)
  198. }
  199. if err != nil {
  200. return err
  201. }
  202. if t.Version >= 10 {
  203. err = pe.putRawBytes(t.Uuid[:])
  204. if err != nil {
  205. return err
  206. }
  207. }
  208. if t.Version >= 1 {
  209. pe.putBool(t.IsInternal)
  210. }
  211. if t.Version < 9 {
  212. err = pe.putArrayLength(len(t.Partitions))
  213. if err != nil {
  214. return err
  215. }
  216. } else {
  217. pe.putCompactArrayLength(len(t.Partitions))
  218. }
  219. for _, block := range t.Partitions {
  220. if err := block.encode(pe, t.Version); err != nil {
  221. return err
  222. }
  223. }
  224. if t.Version >= 8 {
  225. pe.putInt32(t.TopicAuthorizedOperations)
  226. }
  227. if t.Version >= 9 {
  228. pe.putEmptyTaggedFieldArray()
  229. }
  230. return nil
  231. }
// MetadataResponse holds a metadata response (API key 3, see key): the brokers
// in the response plus per-topic and per-partition metadata.
//
// Which fields are on the wire depends on Version: ControllerID from 1,
// ClusterID from 2, ThrottleTimeMs from 3 and ClusterAuthorizedOperations
// from 8.
type MetadataResponse struct {
	// Version defines the protocol version to use for encode and decode.
	// decode overwrites it with the version it is called with; encode reads it.
	Version int16
	// ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
	// Only encoded and decoded for Version >= 3.
	ThrottleTimeMs int32
	// Brokers contains each broker in the response.
	Brokers []*Broker
	// ClusterID contains the cluster ID that responding broker belongs to.
	// Only encoded and decoded for Version >= 2.
	ClusterID *string
	// ControllerID contains the ID of the controller broker.
	// Only encoded and decoded for Version >= 1.
	ControllerID int32
	// Topics contains each topic in the response.
	Topics []*TopicMetadata
	// ClusterAuthorizedOperations is only valid for Version >= 8; encode and
	// decode skip it at lower versions.
	ClusterAuthorizedOperations int32
}
  247. func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
  248. r.Version = version
  249. if r.Version >= 3 {
  250. if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
  251. return err
  252. }
  253. }
  254. var brokerArrayLen int
  255. if r.Version < 9 {
  256. brokerArrayLen, err = pd.getArrayLength()
  257. } else {
  258. brokerArrayLen, err = pd.getCompactArrayLength()
  259. }
  260. if err != nil {
  261. return err
  262. }
  263. r.Brokers = make([]*Broker, brokerArrayLen)
  264. for i := 0; i < brokerArrayLen; i++ {
  265. r.Brokers[i] = new(Broker)
  266. err = r.Brokers[i].decode(pd, version)
  267. if err != nil {
  268. return err
  269. }
  270. }
  271. if r.Version >= 2 {
  272. if r.Version < 9 {
  273. r.ClusterID, err = pd.getNullableString()
  274. } else {
  275. r.ClusterID, err = pd.getCompactNullableString()
  276. }
  277. if err != nil {
  278. return err
  279. }
  280. }
  281. if r.Version >= 1 {
  282. if r.ControllerID, err = pd.getInt32(); err != nil {
  283. return err
  284. }
  285. }
  286. var topicArrayLen int
  287. if version < 9 {
  288. topicArrayLen, err = pd.getArrayLength()
  289. } else {
  290. topicArrayLen, err = pd.getCompactArrayLength()
  291. }
  292. if err != nil {
  293. return err
  294. }
  295. r.Topics = make([]*TopicMetadata, topicArrayLen)
  296. for i := 0; i < topicArrayLen; i++ {
  297. r.Topics[i] = new(TopicMetadata)
  298. err = r.Topics[i].decode(pd, version)
  299. if err != nil {
  300. return err
  301. }
  302. }
  303. if r.Version >= 8 {
  304. r.ClusterAuthorizedOperations, err = pd.getInt32()
  305. if err != nil {
  306. return err
  307. }
  308. }
  309. if r.Version >= 9 {
  310. _, err := pd.getEmptyTaggedFieldArray()
  311. if err != nil {
  312. return err
  313. }
  314. }
  315. return nil
  316. }
  317. func (r *MetadataResponse) encode(pe packetEncoder) (err error) {
  318. if r.Version >= 3 {
  319. pe.putInt32(r.ThrottleTimeMs)
  320. }
  321. if r.Version < 9 {
  322. err = pe.putArrayLength(len(r.Brokers))
  323. if err != nil {
  324. return err
  325. }
  326. } else {
  327. pe.putCompactArrayLength(len(r.Brokers))
  328. }
  329. for _, broker := range r.Brokers {
  330. err = broker.encode(pe, r.Version)
  331. if err != nil {
  332. return err
  333. }
  334. }
  335. if r.Version >= 2 {
  336. if r.Version < 9 {
  337. err = pe.putNullableString(r.ClusterID)
  338. if err != nil {
  339. return err
  340. }
  341. } else {
  342. err = pe.putNullableCompactString(r.ClusterID)
  343. if err != nil {
  344. return err
  345. }
  346. }
  347. }
  348. if r.Version >= 1 {
  349. pe.putInt32(r.ControllerID)
  350. }
  351. if r.Version < 9 {
  352. err = pe.putArrayLength(len(r.Topics))
  353. } else {
  354. pe.putCompactArrayLength(len(r.Topics))
  355. }
  356. if err != nil {
  357. return err
  358. }
  359. for _, block := range r.Topics {
  360. if err := block.encode(pe, r.Version); err != nil {
  361. return err
  362. }
  363. }
  364. if r.Version >= 8 {
  365. pe.putInt32(r.ClusterAuthorizedOperations)
  366. }
  367. if r.Version >= 9 {
  368. pe.putEmptyTaggedFieldArray()
  369. }
  370. return nil
  371. }
  372. func (r *MetadataResponse) key() int16 {
  373. return 3
  374. }
// version returns r.Version, the protocol version that encode uses and that
// decode last stored.
func (r *MetadataResponse) version() int16 {
	return r.Version
}
  378. func (r *MetadataResponse) headerVersion() int16 {
  379. if r.Version < 9 {
  380. return 0
  381. } else {
  382. return 1
  383. }
  384. }
  385. func (r *MetadataResponse) isValidVersion() bool {
  386. return r.Version >= 0 && r.Version <= 7
  387. }
  388. func (r *MetadataResponse) requiredVersion() KafkaVersion {
  389. switch r.Version {
  390. case 10:
  391. return V2_8_0_0
  392. case 9:
  393. return V2_4_0_0
  394. case 8:
  395. return V2_3_0_0
  396. case 7:
  397. return V2_1_0_0
  398. case 6:
  399. return V2_0_0_0
  400. case 5:
  401. return V1_0_0_0
  402. case 3, 4:
  403. return V0_11_0_0
  404. case 2:
  405. return V0_10_1_0
  406. case 1:
  407. return V0_10_0_0
  408. case 0:
  409. return V0_8_2_0
  410. default:
  411. return V2_8_0_0
  412. }
  413. }
  414. func (r *MetadataResponse) throttleTime() time.Duration {
  415. return time.Duration(r.ThrottleTimeMs) * time.Millisecond
  416. }
  417. // testing API
  418. func (r *MetadataResponse) AddBroker(addr string, id int32) {
  419. r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr})
  420. }
  421. func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
  422. var tmatch *TopicMetadata
  423. for _, tm := range r.Topics {
  424. if tm.Name == topic {
  425. tmatch = tm
  426. goto foundTopic
  427. }
  428. }
  429. tmatch = new(TopicMetadata)
  430. tmatch.Name = topic
  431. r.Topics = append(r.Topics, tmatch)
  432. foundTopic:
  433. tmatch.Err = err
  434. return tmatch
  435. }
  436. func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
  437. tmatch := r.AddTopic(topic, ErrNoError)
  438. var pmatch *PartitionMetadata
  439. for _, pm := range tmatch.Partitions {
  440. if pm.ID == partition {
  441. pmatch = pm
  442. goto foundPartition
  443. }
  444. }
  445. pmatch = new(PartitionMetadata)
  446. pmatch.ID = partition
  447. tmatch.Partitions = append(tmatch.Partitions, pmatch)
  448. foundPartition:
  449. pmatch.Leader = brokerID
  450. pmatch.Replicas = replicas
  451. if pmatch.Replicas == nil {
  452. pmatch.Replicas = []int32{}
  453. }
  454. pmatch.Isr = isr
  455. if pmatch.Isr == nil {
  456. pmatch.Isr = []int32{}
  457. }
  458. pmatch.OfflineReplicas = offline
  459. if pmatch.OfflineReplicas == nil {
  460. pmatch.OfflineReplicas = []int32{}
  461. }
  462. pmatch.Err = err
  463. }