partitioner.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package sarama
import (
	"fmt"
	"hash"
	"hash/crc32"
	"hash/fnv"
	"math/rand"
	"time"
)
  9. // Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
  10. // decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
  11. // as simple default implementations.
  12. type Partitioner interface {
  13. // Partition takes a message and partition count and chooses a partition
  14. Partition(message *ProducerMessage, numPartitions int32) (int32, error)
  15. // RequiresConsistency indicates to the user of the partitioner whether the
  16. // mapping of key->partition is consistent or not. Specifically, if a
  17. // partitioner requires consistency then it must be allowed to choose from all
  18. // partitions (even ones known to be unavailable), and its choice must be
  19. // respected by the caller. The obvious example is the HashPartitioner.
  20. RequiresConsistency() bool
  21. }
  22. // DynamicConsistencyPartitioner can optionally be implemented by Partitioners
  23. // in order to allow more flexibility than is originally allowed by the
  24. // RequiresConsistency method in the Partitioner interface. This allows
  25. // partitioners to require consistency sometimes, but not all times. It's useful
  26. // for, e.g., the HashPartitioner, which does not require consistency if the
  27. // message key is nil.
  28. type DynamicConsistencyPartitioner interface {
  29. Partitioner
  30. // MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
  31. // but takes in the message being partitioned so that the partitioner can
  32. // make a per-message determination.
  33. MessageRequiresConsistency(message *ProducerMessage) bool
  34. }
  35. // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
  36. type PartitionerConstructor func(topic string) Partitioner
  37. type manualPartitioner struct{}
  38. // HashPartitionerOption lets you modify default values of the partitioner
  39. type HashPartitionerOption func(*hashPartitioner)
  40. // WithAbsFirst means that the partitioner handles absolute values
  41. // in the same way as the reference Java implementation
  42. func WithAbsFirst() HashPartitionerOption {
  43. return func(hp *hashPartitioner) {
  44. hp.referenceAbs = true
  45. }
  46. }
  47. // WithHashUnsigned means the partitioner treats the hashed value as unsigned when
  48. // partitioning. This is intended to be combined with the crc32 hash algorithm to
  49. // be compatible with librdkafka's implementation
  50. func WithHashUnsigned() HashPartitionerOption {
  51. return func(hp *hashPartitioner) {
  52. hp.hashUnsigned = true
  53. }
  54. }
  55. // WithCustomHashFunction lets you specify what hash function to use for the partitioning
  56. func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
  57. return func(hp *hashPartitioner) {
  58. hp.hasher = hasher()
  59. }
  60. }
  61. // WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
  62. func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption {
  63. return func(hp *hashPartitioner) {
  64. hp.random = randomHP
  65. }
  66. }
  67. // NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
  68. // ProducerMessage's Partition field as the partition to produce to.
  69. func NewManualPartitioner(topic string) Partitioner {
  70. return new(manualPartitioner)
  71. }
  72. func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
  73. return message.Partition, nil
  74. }
  75. func (p *manualPartitioner) RequiresConsistency() bool {
  76. return true
  77. }
  78. type randomPartitioner struct {
  79. generator *rand.Rand
  80. }
  81. // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
  82. func NewRandomPartitioner(topic string) Partitioner {
  83. p := new(randomPartitioner)
  84. p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
  85. return p
  86. }
  87. func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
  88. return int32(p.generator.Intn(int(numPartitions))), nil
  89. }
  90. func (p *randomPartitioner) RequiresConsistency() bool {
  91. return false
  92. }
  93. type roundRobinPartitioner struct {
  94. partition int32
  95. }
  96. // NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
  97. func NewRoundRobinPartitioner(topic string) Partitioner {
  98. return &roundRobinPartitioner{}
  99. }
  100. func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
  101. if p.partition >= numPartitions {
  102. p.partition = 0
  103. }
  104. ret := p.partition
  105. p.partition++
  106. return ret, nil
  107. }
  108. func (p *roundRobinPartitioner) RequiresConsistency() bool {
  109. return false
  110. }
  111. type hashPartitioner struct {
  112. random Partitioner
  113. hasher hash.Hash32
  114. referenceAbs bool
  115. hashUnsigned bool
  116. }
  117. // NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
  118. // The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
  119. // each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
  120. func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
  121. return func(topic string) Partitioner {
  122. p := new(hashPartitioner)
  123. p.random = NewRandomPartitioner(topic)
  124. p.hasher = hasher()
  125. p.referenceAbs = false
  126. p.hashUnsigned = false
  127. return p
  128. }
  129. }
  130. // NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
  131. func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
  132. return func(topic string) Partitioner {
  133. p := new(hashPartitioner)
  134. p.random = NewRandomPartitioner(topic)
  135. p.hasher = fnv.New32a()
  136. p.referenceAbs = false
  137. p.hashUnsigned = false
  138. for _, option := range options {
  139. option(p)
  140. }
  141. return p
  142. }
  143. }
  144. // NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
  145. // random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
  146. // modulus the number of partitions. This ensures that messages with the same key always end up on the
  147. // same partition.
  148. func NewHashPartitioner(topic string) Partitioner {
  149. p := new(hashPartitioner)
  150. p.random = NewRandomPartitioner(topic)
  151. p.hasher = fnv.New32a()
  152. p.referenceAbs = false
  153. p.hashUnsigned = false
  154. return p
  155. }
  156. // NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
  157. // in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
  158. // that but it had a mistake and now there are people depending on both behaviors. This will
  159. // all go away on the next major version bump.
  160. func NewReferenceHashPartitioner(topic string) Partitioner {
  161. p := new(hashPartitioner)
  162. p.random = NewRandomPartitioner(topic)
  163. p.hasher = fnv.New32a()
  164. p.referenceAbs = true
  165. p.hashUnsigned = false
  166. return p
  167. }
  168. // NewConsistentCRCHashPartitioner is like NewHashPartitioner execpt that it uses the *unsigned* crc32 hash
  169. // of the encoded bytes of the message key modulus the number of partitions. This is compatible with
  170. // librdkafka's `consistent_random` partitioner
  171. func NewConsistentCRCHashPartitioner(topic string) Partitioner {
  172. p := new(hashPartitioner)
  173. p.random = NewRandomPartitioner(topic)
  174. p.hasher = crc32.NewIEEE()
  175. p.referenceAbs = false
  176. p.hashUnsigned = true
  177. return p
  178. }
  179. func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
  180. if message.Key == nil {
  181. return p.random.Partition(message, numPartitions)
  182. }
  183. bytes, err := message.Key.Encode()
  184. if err != nil {
  185. return -1, err
  186. }
  187. p.hasher.Reset()
  188. _, err = p.hasher.Write(bytes)
  189. if err != nil {
  190. return -1, err
  191. }
  192. var partition int32
  193. // Turns out we were doing our absolute value in a subtly different way from the upstream
  194. // implementation, but now we need to maintain backwards compat for people who started using
  195. // the old version; if referenceAbs is set we are compatible with the reference java client
  196. // but not past Sarama versions
  197. if p.referenceAbs {
  198. partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
  199. } else if p.hashUnsigned {
  200. // librdkafka treats the hashed value as unsigned. If `hashUnsigned` is set we are compatible
  201. // with librdkafka's `consistent` partitioning but not past Sarama versions
  202. partition = int32(p.hasher.Sum32() % uint32(numPartitions))
  203. } else {
  204. partition = int32(p.hasher.Sum32()) % numPartitions
  205. if partition < 0 {
  206. partition = -partition
  207. }
  208. }
  209. return partition, nil
  210. }
  211. func (p *hashPartitioner) RequiresConsistency() bool {
  212. return true
  213. }
  214. func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
  215. return message.Key != nil
  216. }