describe_log_dirs_request.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package sarama
  2. // DescribeLogDirsRequest is a describe request to get partitions' log size
  3. type DescribeLogDirsRequest struct {
  4. // Version 0 and 1 are equal
  5. // The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
  6. Version int16
  7. // If this is an empty array, all topics will be queried
  8. DescribeTopics []DescribeLogDirsRequestTopic
  9. }
  10. // DescribeLogDirsRequestTopic is a describe request about the log dir of one or more partitions within a Topic
  11. type DescribeLogDirsRequestTopic struct {
  12. Topic string
  13. PartitionIDs []int32
  14. }
  15. func (r *DescribeLogDirsRequest) encode(pe packetEncoder) error {
  16. length := len(r.DescribeTopics)
  17. if length == 0 {
  18. // In order to query all topics we must send null
  19. length = -1
  20. }
  21. if err := pe.putArrayLength(length); err != nil {
  22. return err
  23. }
  24. for _, d := range r.DescribeTopics {
  25. if err := pe.putString(d.Topic); err != nil {
  26. return err
  27. }
  28. if err := pe.putInt32Array(d.PartitionIDs); err != nil {
  29. return err
  30. }
  31. }
  32. return nil
  33. }
  34. func (r *DescribeLogDirsRequest) decode(pd packetDecoder, version int16) error {
  35. n, err := pd.getArrayLength()
  36. if err != nil {
  37. return err
  38. }
  39. if n == -1 {
  40. n = 0
  41. }
  42. topics := make([]DescribeLogDirsRequestTopic, n)
  43. for i := 0; i < n; i++ {
  44. topics[i] = DescribeLogDirsRequestTopic{}
  45. topic, err := pd.getString()
  46. if err != nil {
  47. return err
  48. }
  49. topics[i].Topic = topic
  50. pIDs, err := pd.getInt32Array()
  51. if err != nil {
  52. return err
  53. }
  54. topics[i].PartitionIDs = pIDs
  55. }
  56. r.DescribeTopics = topics
  57. return nil
  58. }
  59. func (r *DescribeLogDirsRequest) key() int16 {
  60. return 35
  61. }
  62. func (r *DescribeLogDirsRequest) version() int16 {
  63. return r.Version
  64. }
  65. func (r *DescribeLogDirsRequest) headerVersion() int16 {
  66. return 1
  67. }
  68. func (r *DescribeLogDirsRequest) isValidVersion() bool {
  69. return r.Version >= 0 && r.Version <= 1
  70. }
  71. func (r *DescribeLogDirsRequest) requiredVersion() KafkaVersion {
  72. if r.Version > 0 {
  73. return V2_0_0_0
  74. }
  75. return V1_0_0_0
  76. }