// File: end_txn_response.go (1.1 KB)
  1. package sarama
  2. import (
  3. "time"
  4. )
  5. type EndTxnResponse struct {
  6. Version int16
  7. ThrottleTime time.Duration
  8. Err KError
  9. }
  10. func (e *EndTxnResponse) encode(pe packetEncoder) error {
  11. pe.putInt32(int32(e.ThrottleTime / time.Millisecond))
  12. pe.putInt16(int16(e.Err))
  13. return nil
  14. }
  15. func (e *EndTxnResponse) decode(pd packetDecoder, version int16) (err error) {
  16. throttleTime, err := pd.getInt32()
  17. if err != nil {
  18. return err
  19. }
  20. e.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
  21. kerr, err := pd.getInt16()
  22. if err != nil {
  23. return err
  24. }
  25. e.Err = KError(kerr)
  26. return nil
  27. }
  28. func (e *EndTxnResponse) key() int16 {
  29. return 26
  30. }
  31. func (e *EndTxnResponse) version() int16 {
  32. return e.Version
  33. }
  34. func (r *EndTxnResponse) headerVersion() int16 {
  35. return 0
  36. }
  37. func (e *EndTxnResponse) isValidVersion() bool {
  38. return e.Version >= 0 && e.Version <= 2
  39. }
  40. func (e *EndTxnResponse) requiredVersion() KafkaVersion {
  41. switch e.Version {
  42. case 2:
  43. return V2_7_0_0
  44. case 1:
  45. return V2_0_0_0
  46. default:
  47. return V0_11_0_0
  48. }
  49. }
  50. func (r *EndTxnResponse) throttleTime() time.Duration {
  51. return r.ThrottleTime
  52. }