delete_topics_request.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package sarama
  2. import "time"
  // DeleteTopicsRequest holds the data serialized by encode and populated by
  // decode for a topic-deletion request.
  type DeleteTopicsRequest struct {
  	// Version is the request version; it is reported by version(), checked
  	// by isValidVersion() and mapped to a KafkaVersion by requiredVersion().
  	Version int16

  	// Topics is the list of topic names, written as a string array.
  	Topics []string

  	// Timeout is written to the wire as a 32-bit count of whole
  	// milliseconds; any sub-millisecond remainder is truncated on encode.
  	Timeout time.Duration
  }
  8. func (d *DeleteTopicsRequest) encode(pe packetEncoder) error {
  9. if err := pe.putStringArray(d.Topics); err != nil {
  10. return err
  11. }
  12. pe.putInt32(int32(d.Timeout / time.Millisecond))
  13. return nil
  14. }
  15. func (d *DeleteTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
  16. if d.Topics, err = pd.getStringArray(); err != nil {
  17. return err
  18. }
  19. timeout, err := pd.getInt32()
  20. if err != nil {
  21. return err
  22. }
  23. d.Timeout = time.Duration(timeout) * time.Millisecond
  24. d.Version = version
  25. return nil
  26. }
  27. func (d *DeleteTopicsRequest) key() int16 {
  28. return 20
  29. }
  30. func (d *DeleteTopicsRequest) version() int16 {
  31. return d.Version
  32. }
  33. func (d *DeleteTopicsRequest) headerVersion() int16 {
  34. return 1
  35. }
  36. func (d *DeleteTopicsRequest) isValidVersion() bool {
  37. return d.Version >= 0 && d.Version <= 3
  38. }
  39. func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion {
  40. switch d.Version {
  41. case 3:
  42. return V2_1_0_0
  43. case 2:
  44. return V2_0_0_0
  45. case 1:
  46. return V0_11_0_0
  47. case 0:
  48. return V0_10_1_0
  49. default:
  50. return V2_2_0_0
  51. }
  52. }