create_topics_request.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package sarama
  2. import (
  3. "time"
  4. )
// CreateTopicsRequest describes a request to create one or more topics.
type CreateTopicsRequest struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// TopicDetails contains the topics to create, keyed by topic name.
	TopicDetails map[string]*TopicDetail
	// Timeout contains how long to wait before timing out the request.
	// It is written to the wire as a whole number of milliseconds.
	Timeout time.Duration
	// ValidateOnly if true, check that the topics can be created as specified,
	// but don't create anything. It is only encoded when Version >= 1.
	ValidateOnly bool
}
  16. func (c *CreateTopicsRequest) encode(pe packetEncoder) error {
  17. if err := pe.putArrayLength(len(c.TopicDetails)); err != nil {
  18. return err
  19. }
  20. for topic, detail := range c.TopicDetails {
  21. if err := pe.putString(topic); err != nil {
  22. return err
  23. }
  24. if err := detail.encode(pe); err != nil {
  25. return err
  26. }
  27. }
  28. pe.putInt32(int32(c.Timeout / time.Millisecond))
  29. if c.Version >= 1 {
  30. pe.putBool(c.ValidateOnly)
  31. }
  32. return nil
  33. }
  34. func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
  35. n, err := pd.getArrayLength()
  36. if err != nil {
  37. return err
  38. }
  39. c.TopicDetails = make(map[string]*TopicDetail, n)
  40. for i := 0; i < n; i++ {
  41. topic, err := pd.getString()
  42. if err != nil {
  43. return err
  44. }
  45. c.TopicDetails[topic] = new(TopicDetail)
  46. if err = c.TopicDetails[topic].decode(pd, version); err != nil {
  47. return err
  48. }
  49. }
  50. timeout, err := pd.getInt32()
  51. if err != nil {
  52. return err
  53. }
  54. c.Timeout = time.Duration(timeout) * time.Millisecond
  55. if version >= 1 {
  56. c.ValidateOnly, err = pd.getBool()
  57. if err != nil {
  58. return err
  59. }
  60. c.Version = version
  61. }
  62. return nil
  63. }
  64. func (c *CreateTopicsRequest) key() int16 {
  65. return 19
  66. }
// version returns the protocol version stored in c.Version.
func (c *CreateTopicsRequest) version() int16 {
	return c.Version
}
  70. func (r *CreateTopicsRequest) headerVersion() int16 {
  71. return 1
  72. }
  73. func (c *CreateTopicsRequest) isValidVersion() bool {
  74. return c.Version >= 0 && c.Version <= 3
  75. }
  76. func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
  77. switch c.Version {
  78. case 3:
  79. return V2_0_0_0
  80. case 2:
  81. return V0_11_0_0
  82. case 1:
  83. return V0_10_2_0
  84. case 0:
  85. return V0_10_1_0
  86. default:
  87. return V2_8_0_0
  88. }
  89. }
// TopicDetail describes the settings of a single topic in a
// CreateTopicsRequest.
type TopicDetail struct {
	// NumPartitions contains the number of partitions to create in the topic, or
	// -1 if we are either specifying a manual partition assignment or using the
	// default partitions.
	NumPartitions int32
	// ReplicationFactor contains the number of replicas to create for each
	// partition in the topic, or -1 if we are either specifying a manual
	// partition assignment or using the default replication factor.
	ReplicationFactor int16
	// ReplicaAssignment contains the manual partition assignment, keyed by
	// partition, or the empty array if we are using automatic assignment.
	// decode leaves it nil when the payload carries no assignments.
	ReplicaAssignment map[int32][]int32
	// ConfigEntries contains the custom topic configurations to set, keyed by
	// configuration name. A nil value is encoded as a null string.
	// decode leaves it nil when the payload carries no entries.
	ConfigEntries map[string]*string
}
  105. func (t *TopicDetail) encode(pe packetEncoder) error {
  106. pe.putInt32(t.NumPartitions)
  107. pe.putInt16(t.ReplicationFactor)
  108. if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil {
  109. return err
  110. }
  111. for partition, assignment := range t.ReplicaAssignment {
  112. pe.putInt32(partition)
  113. if err := pe.putInt32Array(assignment); err != nil {
  114. return err
  115. }
  116. }
  117. if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil {
  118. return err
  119. }
  120. for configKey, configValue := range t.ConfigEntries {
  121. if err := pe.putString(configKey); err != nil {
  122. return err
  123. }
  124. if err := pe.putNullableString(configValue); err != nil {
  125. return err
  126. }
  127. }
  128. return nil
  129. }
  130. func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) {
  131. if t.NumPartitions, err = pd.getInt32(); err != nil {
  132. return err
  133. }
  134. if t.ReplicationFactor, err = pd.getInt16(); err != nil {
  135. return err
  136. }
  137. n, err := pd.getArrayLength()
  138. if err != nil {
  139. return err
  140. }
  141. if n > 0 {
  142. t.ReplicaAssignment = make(map[int32][]int32, n)
  143. for i := 0; i < n; i++ {
  144. replica, err := pd.getInt32()
  145. if err != nil {
  146. return err
  147. }
  148. if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil {
  149. return err
  150. }
  151. }
  152. }
  153. n, err = pd.getArrayLength()
  154. if err != nil {
  155. return err
  156. }
  157. if n > 0 {
  158. t.ConfigEntries = make(map[string]*string, n)
  159. for i := 0; i < n; i++ {
  160. configKey, err := pd.getString()
  161. if err != nil {
  162. return err
  163. }
  164. if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
  165. return err
  166. }
  167. }
  168. }
  169. return nil
  170. }