consumer_metadata_response.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package sarama
  2. import (
  3. "net"
  4. "strconv"
  5. )
// ConsumerMetadataResponse holds the response to a consumer group metadata
// request. Its decode and encode methods delegate to FindCoordinatorResponse.
type ConsumerMetadataResponse struct {
	// Version is the response version; it is forwarded to the
	// FindCoordinatorResponse on encode and reported by version().
	Version int16
	// Err is the error code carried by the response.
	Err KError
	// Coordinator is the coordinating broker. decode leaves it nil when the
	// underlying FindCoordinatorResponse carries no coordinator.
	Coordinator *Broker
	// CoordinatorID mirrors Coordinator.ID(); decode fills it in only when
	// Coordinator is non-nil.
	//
	// Deprecated: use Coordinator.ID().
	CoordinatorID int32
	// CoordinatorHost is the host part of Coordinator.Addr(); decode fills it
	// in only when Coordinator is non-nil.
	//
	// Deprecated: use Coordinator.Addr().
	CoordinatorHost string
	// CoordinatorPort is the port part of Coordinator.Addr(); decode fills it
	// in only when Coordinator is non-nil.
	//
	// Deprecated: use Coordinator.Addr().
	CoordinatorPort int32
}
  15. func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
  16. tmp := new(FindCoordinatorResponse)
  17. if err := tmp.decode(pd, version); err != nil {
  18. return err
  19. }
  20. r.Err = tmp.Err
  21. r.Coordinator = tmp.Coordinator
  22. if tmp.Coordinator == nil {
  23. return nil
  24. }
  25. // this can all go away in 2.0, but we have to fill in deprecated fields to maintain
  26. // backwards compatibility
  27. host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
  28. if err != nil {
  29. return err
  30. }
  31. port, err := strconv.ParseInt(portstr, 10, 32)
  32. if err != nil {
  33. return err
  34. }
  35. r.CoordinatorID = r.Coordinator.ID()
  36. r.CoordinatorHost = host
  37. r.CoordinatorPort = int32(port)
  38. return nil
  39. }
  40. func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
  41. if r.Coordinator == nil {
  42. r.Coordinator = new(Broker)
  43. r.Coordinator.id = r.CoordinatorID
  44. r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
  45. }
  46. tmp := &FindCoordinatorResponse{
  47. Version: r.Version,
  48. Err: r.Err,
  49. Coordinator: r.Coordinator,
  50. }
  51. if err := tmp.encode(pe); err != nil {
  52. return err
  53. }
  54. return nil
  55. }
  56. func (r *ConsumerMetadataResponse) key() int16 {
  57. return 10
  58. }
  59. func (r *ConsumerMetadataResponse) version() int16 {
  60. return r.Version
  61. }
  62. func (r *ConsumerMetadataResponse) headerVersion() int16 {
  63. return 0
  64. }
  65. func (r *ConsumerMetadataResponse) isValidVersion() bool {
  66. return r.Version >= 0 && r.Version <= 2
  67. }
  68. func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
  69. switch r.Version {
  70. case 2:
  71. return V2_0_0_0
  72. case 1:
  73. return V0_11_0_0
  74. default:
  75. return V0_8_2_0
  76. }
  77. }