balance_strategy.go 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141
  1. package sarama
  2. import (
  3. "container/heap"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "sort"
  8. "strings"
  9. )
  const (
  	// RangeBalanceStrategyName is the name of the range partition assignment
  	// strategy (see NewBalanceStrategyRange).
  	RangeBalanceStrategyName = "range"

  	// RoundRobinBalanceStrategyName is the name of the round-robin partition
  	// assignment strategy (see NewBalanceStrategyRoundRobin).
  	RoundRobinBalanceStrategyName = "roundrobin"

  	// StickyBalanceStrategyName is the name of the sticky partition
  	// assignment strategy (see NewBalanceStrategySticky).
  	StickyBalanceStrategyName = "sticky"

  	// defaultGeneration is the package's fallback generation ID.
  	// NOTE(review): presumably used for sticky user data that carries no
  	// generation (V0) — confirm at its call sites.
  	defaultGeneration = -1
  )
  // BalanceStrategyPlan is the outcome of a BalanceStrategy.Plan call: the
  // partitions allocated to each member, keyed as
  // memberID -> topic -> partitions.
  type BalanceStrategyPlan map[string]map[string][]int32

  // Add appends the given partitions of topic to memberID's allocation and
  // creates the member's entry on first use. Calling Add without partitions
  // is a no-op and does not create an entry.
  func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
  	if len(partitions) > 0 {
  		byTopic, found := p[memberID]
  		if !found {
  			byTopic = make(map[string][]int32, 1)
  			p[memberID] = byTopic
  		}
  		byTopic[topic] = append(byTopic[topic], partitions...)
  	}
  }
  33. // --------------------------------------------------------------------
  34. // BalanceStrategy is used to balance topics and partitions
  35. // across members of a consumer group
  36. type BalanceStrategy interface {
  37. // Name uniquely identifies the strategy.
  38. Name() string
  39. // Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
  40. // and returns a distribution plan.
  41. Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
  42. // AssignmentData returns the serialized assignment data for the specified
  43. // memberID
  44. AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
  45. }
  46. // --------------------------------------------------------------------
  47. // NewBalanceStrategyRange returns a range balance strategy,
  48. // which is the default and assigns partitions as ranges to consumer group members.
  49. // This follows the same logic as
  50. // https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
  51. //
  52. // Example with two topics T1 and T2 with six partitions each (0..5) and two members (M1, M2):
  53. //
  54. // M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
  55. // M2: {T1: [3, 4, 5], T2: [3, 4, 5]}
  56. func NewBalanceStrategyRange() BalanceStrategy {
  57. return &balanceStrategy{
  58. name: RangeBalanceStrategyName,
  59. coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
  60. partitionsPerConsumer := len(partitions) / len(memberIDs)
  61. consumersWithExtraPartition := len(partitions) % len(memberIDs)
  62. sort.Strings(memberIDs)
  63. for i, memberID := range memberIDs {
  64. min := i*partitionsPerConsumer + int(math.Min(float64(consumersWithExtraPartition), float64(i)))
  65. extra := 0
  66. if i < consumersWithExtraPartition {
  67. extra = 1
  68. }
  69. max := min + partitionsPerConsumer + extra
  70. plan.Add(memberID, topic, partitions[min:max]...)
  71. }
  72. },
  73. }
  74. }
  75. // Deprecated: use NewBalanceStrategyRange to avoid data race issue
  76. var BalanceStrategyRange = NewBalanceStrategyRange()
  77. // NewBalanceStrategySticky returns a sticky balance strategy,
  78. // which assigns partitions to members with an attempt to preserve earlier assignments
  79. // while maintain a balanced partition distribution.
  80. // Example with topic T with six partitions (0..5) and two members (M1, M2):
  81. //
  82. // M1: {T: [0, 2, 4]}
  83. // M2: {T: [1, 3, 5]}
  84. //
  85. // On reassignment with an additional consumer, you might get an assignment plan like:
  86. //
  87. // M1: {T: [0, 2]}
  88. // M2: {T: [1, 3]}
  89. // M3: {T: [4, 5]}
  90. func NewBalanceStrategySticky() BalanceStrategy {
  91. return &stickyBalanceStrategy{}
  92. }
  93. // Deprecated: use NewBalanceStrategySticky to avoid data race issue
  94. var BalanceStrategySticky = NewBalanceStrategySticky()
  95. // --------------------------------------------------------------------
  96. type balanceStrategy struct {
  97. coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
  98. name string
  99. }
  100. // Name implements BalanceStrategy.
  101. func (s *balanceStrategy) Name() string { return s.name }
  102. // Plan implements BalanceStrategy.
  103. func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  104. // Build members by topic map
  105. mbt := make(map[string][]string)
  106. for memberID, meta := range members {
  107. for _, topic := range meta.Topics {
  108. mbt[topic] = append(mbt[topic], memberID)
  109. }
  110. }
  111. // func to sort and de-duplicate a StringSlice
  112. uniq := func(ss sort.StringSlice) []string {
  113. if ss.Len() < 2 {
  114. return ss
  115. }
  116. sort.Sort(ss)
  117. var i, j int
  118. for i = 1; i < ss.Len(); i++ {
  119. if ss[i] == ss[j] {
  120. continue
  121. }
  122. j++
  123. ss.Swap(i, j)
  124. }
  125. return ss[:j+1]
  126. }
  127. // Assemble plan
  128. plan := make(BalanceStrategyPlan, len(members))
  129. for topic, memberIDs := range mbt {
  130. s.coreFn(plan, uniq(memberIDs), topic, topics[topic])
  131. }
  132. return plan, nil
  133. }
  134. // AssignmentData simple strategies do not require any shared assignment data
  135. func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
  136. return nil, nil
  137. }
  138. type stickyBalanceStrategy struct {
  139. movements partitionMovements
  140. }
  141. // Name implements BalanceStrategy.
  142. func (s *stickyBalanceStrategy) Name() string { return StickyBalanceStrategyName }
  143. // Plan implements BalanceStrategy.
  144. func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  145. // track partition movements during generation of the partition assignment plan
  146. s.movements = partitionMovements{
  147. Movements: make(map[topicPartitionAssignment]consumerPair),
  148. PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
  149. }
  150. // prepopulate the current assignment state from userdata on the consumer group members
  151. currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
  152. if err != nil {
  153. return nil, err
  154. }
  155. // determine if we're dealing with a completely fresh assignment, or if there's existing assignment state
  156. isFreshAssignment := len(currentAssignment) == 0
  157. // create a mapping of all current topic partitions and the consumers that can be assigned to them
  158. partition2AllPotentialConsumers := make(map[topicPartitionAssignment][]string)
  159. for topic, partitions := range topics {
  160. for _, partition := range partitions {
  161. partition2AllPotentialConsumers[topicPartitionAssignment{Topic: topic, Partition: partition}] = []string{}
  162. }
  163. }
  164. // create a mapping of all consumers to all potential topic partitions that can be assigned to them
  165. // also, populate the mapping of partitions to potential consumers
  166. consumer2AllPotentialPartitions := make(map[string][]topicPartitionAssignment, len(members))
  167. for memberID, meta := range members {
  168. consumer2AllPotentialPartitions[memberID] = make([]topicPartitionAssignment, 0)
  169. for _, topicSubscription := range meta.Topics {
  170. // only evaluate topic subscriptions that are present in the supplied topics map
  171. if _, found := topics[topicSubscription]; found {
  172. for _, partition := range topics[topicSubscription] {
  173. topicPartition := topicPartitionAssignment{Topic: topicSubscription, Partition: partition}
  174. consumer2AllPotentialPartitions[memberID] = append(consumer2AllPotentialPartitions[memberID], topicPartition)
  175. partition2AllPotentialConsumers[topicPartition] = append(partition2AllPotentialConsumers[topicPartition], memberID)
  176. }
  177. }
  178. }
  179. // add this consumer to currentAssignment (with an empty topic partition assignment) if it does not already exist
  180. if _, exists := currentAssignment[memberID]; !exists {
  181. currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
  182. }
  183. }
  184. // create a mapping of each partition to its current consumer, where possible
  185. currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
  186. unvisitedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
  187. for partition := range partition2AllPotentialConsumers {
  188. unvisitedPartitions[partition] = true
  189. }
  190. var unassignedPartitions []topicPartitionAssignment
  191. for memberID, partitions := range currentAssignment {
  192. var keepPartitions []topicPartitionAssignment
  193. for _, partition := range partitions {
  194. // If this partition no longer exists at all, likely due to the
  195. // topic being deleted, we remove the partition from the member.
  196. if _, exists := partition2AllPotentialConsumers[partition]; !exists {
  197. continue
  198. }
  199. delete(unvisitedPartitions, partition)
  200. currentPartitionConsumers[partition] = memberID
  201. if !strsContains(members[memberID].Topics, partition.Topic) {
  202. unassignedPartitions = append(unassignedPartitions, partition)
  203. continue
  204. }
  205. keepPartitions = append(keepPartitions, partition)
  206. }
  207. currentAssignment[memberID] = keepPartitions
  208. }
  209. for unvisited := range unvisitedPartitions {
  210. unassignedPartitions = append(unassignedPartitions, unvisited)
  211. }
  212. // sort the topic partitions in order of priority for reassignment
  213. sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, partition2AllPotentialConsumers, consumer2AllPotentialPartitions)
  214. // at this point we have preserved all valid topic partition to consumer assignments and removed
  215. // all invalid topic partitions and invalid consumers. Now we need to assign unassignedPartitions
  216. // to consumers so that the topic partition assignments are as balanced as possible.
  217. // an ascending sorted set of consumers based on how many topic partitions are already assigned to them
  218. sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
  219. s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumers)
  220. // Assemble plan
  221. plan := make(BalanceStrategyPlan, len(currentAssignment))
  222. for memberID, assignments := range currentAssignment {
  223. if len(assignments) == 0 {
  224. plan[memberID] = make(map[string][]int32)
  225. } else {
  226. for _, assignment := range assignments {
  227. plan.Add(memberID, assignment.Topic, assignment.Partition)
  228. }
  229. }
  230. }
  231. return plan, nil
  232. }
  233. // AssignmentData serializes the set of topics currently assigned to the
  234. // specified member as part of the supplied balance plan
  235. func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
  236. return encode(&StickyAssignorUserDataV1{
  237. Topics: topics,
  238. Generation: generationID,
  239. }, nil)
  240. }
  // strsContains reports whether value occurs in s.
  func strsContains(s []string, value string) bool {
  	for i := 0; i < len(s); i++ {
  		if s[i] == value {
  			return true
  		}
  	}
  	return false
  }
  249. // Balance assignments across consumers for maximum fairness and stickiness.
  250. func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedPartitions []topicPartitionAssignment, unassignedPartitions []topicPartitionAssignment, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) {
  251. initializing := len(sortedCurrentSubscriptions) == 0 || len(currentAssignment[sortedCurrentSubscriptions[0]]) == 0
  252. // assign all unassigned partitions
  253. for _, partition := range unassignedPartitions {
  254. // skip if there is no potential consumer for the partition
  255. if len(partition2AllPotentialConsumers[partition]) == 0 {
  256. continue
  257. }
  258. sortedCurrentSubscriptions = assignPartition(partition, sortedCurrentSubscriptions, currentAssignment, consumer2AllPotentialPartitions, currentPartitionConsumer)
  259. }
  260. // narrow down the reassignment scope to only those partitions that can actually be reassigned
  261. for partition := range partition2AllPotentialConsumers {
  262. if !canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
  263. sortedPartitions = removeTopicPartitionFromMemberAssignments(sortedPartitions, partition)
  264. }
  265. }
  266. // narrow down the reassignment scope to only those consumers that are subject to reassignment
  267. fixedAssignments := make(map[string][]topicPartitionAssignment)
  268. for memberID := range consumer2AllPotentialPartitions {
  269. if !canConsumerParticipateInReassignment(memberID, currentAssignment, consumer2AllPotentialPartitions, partition2AllPotentialConsumers) {
  270. fixedAssignments[memberID] = currentAssignment[memberID]
  271. delete(currentAssignment, memberID)
  272. sortedCurrentSubscriptions = sortMemberIDsByPartitionAssignments(currentAssignment)
  273. }
  274. }
  275. // create a deep copy of the current assignment so we can revert to it if we do not get a more balanced assignment later
  276. preBalanceAssignment := deepCopyAssignment(currentAssignment)
  277. preBalancePartitionConsumers := make(map[topicPartitionAssignment]string, len(currentPartitionConsumer))
  278. for k, v := range currentPartitionConsumer {
  279. preBalancePartitionConsumers[k] = v
  280. }
  281. reassignmentPerformed := s.performReassignments(sortedPartitions, currentAssignment, prevAssignment, sortedCurrentSubscriptions, consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer)
  282. // if we are not preserving existing assignments and we have made changes to the current assignment
  283. // make sure we are getting a more balanced assignment; otherwise, revert to previous assignment
  284. if !initializing && reassignmentPerformed && getBalanceScore(currentAssignment) >= getBalanceScore(preBalanceAssignment) {
  285. currentAssignment = deepCopyAssignment(preBalanceAssignment)
  286. currentPartitionConsumer = make(map[topicPartitionAssignment]string, len(preBalancePartitionConsumers))
  287. for k, v := range preBalancePartitionConsumers {
  288. currentPartitionConsumer[k] = v
  289. }
  290. }
  291. // add the fixed assignments (those that could not change) back
  292. for consumer, assignments := range fixedAssignments {
  293. currentAssignment[consumer] = assignments
  294. }
  295. }
  296. // NewBalanceStrategyRoundRobin returns a round-robin balance strategy,
  297. // which assigns partitions to members in alternating order.
  298. // For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
  299. // M0: [t0p0, t0p2, t1p1]
  300. // M1: [t0p1, t1p0, t1p2]
  301. func NewBalanceStrategyRoundRobin() BalanceStrategy {
  302. return new(roundRobinBalancer)
  303. }
  304. // Deprecated: use NewBalanceStrategyRoundRobin to avoid data race issue
  305. var BalanceStrategyRoundRobin = NewBalanceStrategyRoundRobin()
  306. type roundRobinBalancer struct{}
  307. func (b *roundRobinBalancer) Name() string {
  308. return RoundRobinBalanceStrategyName
  309. }
  310. func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
  311. if len(memberAndMetadata) == 0 || len(topics) == 0 {
  312. return nil, errors.New("members and topics are not provided")
  313. }
  314. // sort partitions
  315. var topicPartitions []topicAndPartition
  316. for topic, partitions := range topics {
  317. for _, partition := range partitions {
  318. topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
  319. }
  320. }
  321. sort.SliceStable(topicPartitions, func(i, j int) bool {
  322. pi := topicPartitions[i]
  323. pj := topicPartitions[j]
  324. return pi.comparedValue() < pj.comparedValue()
  325. })
  326. // sort members
  327. var members []memberAndTopic
  328. for memberID, meta := range memberAndMetadata {
  329. m := memberAndTopic{
  330. memberID: memberID,
  331. topics: make(map[string]struct{}),
  332. }
  333. for _, t := range meta.Topics {
  334. m.topics[t] = struct{}{}
  335. }
  336. members = append(members, m)
  337. }
  338. sort.SliceStable(members, func(i, j int) bool {
  339. mi := members[i]
  340. mj := members[j]
  341. return mi.memberID < mj.memberID
  342. })
  343. // assign partitions
  344. plan := make(BalanceStrategyPlan, len(members))
  345. i := 0
  346. n := len(members)
  347. for _, tp := range topicPartitions {
  348. m := members[i%n]
  349. for !m.hasTopic(tp.topic) {
  350. i++
  351. m = members[i%n]
  352. }
  353. plan.Add(m.memberID, tp.topic, tp.partition)
  354. i++
  355. }
  356. return plan, nil
  357. }
  358. func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
  359. return nil, nil // do nothing for now
  360. }
  // topicAndPartition names a single partition of a topic during round-robin
  // assignment.
  type topicAndPartition struct {
  	topic     string
  	partition int32
  }

  // comparedValue returns the key used to order partitions for round-robin
  // assignment: "<topic>-<partition>". The key is compared as a string, so,
  // for example, partition 10 of a topic sorts before partition 2.
  func (tp *topicAndPartition) comparedValue() string {
  	key := fmt.Sprintf("%v-%v", tp.topic, tp.partition)
  	return key
  }
  // memberAndTopic is a group member together with the set of topics it
  // subscribes to.
  type memberAndTopic struct {
  	topics   map[string]struct{}
  	memberID string
  }

  // hasTopic reports whether the member subscribes to topic.
  func (m *memberAndTopic) hasTopic(topic string) bool {
  	if _, subscribed := m.topics[topic]; subscribed {
  		return true
  	}
  	return false
  }
  376. // Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
  377. // A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
  378. // Lower balance score indicates a more balanced assignment.
  379. func getBalanceScore(assignment map[string][]topicPartitionAssignment) int {
  380. consumer2AssignmentSize := make(map[string]int, len(assignment))
  381. for memberID, partitions := range assignment {
  382. consumer2AssignmentSize[memberID] = len(partitions)
  383. }
  384. var score float64
  385. for memberID, consumerAssignmentSize := range consumer2AssignmentSize {
  386. delete(consumer2AssignmentSize, memberID)
  387. for _, otherConsumerAssignmentSize := range consumer2AssignmentSize {
  388. score += math.Abs(float64(consumerAssignmentSize - otherConsumerAssignmentSize))
  389. }
  390. }
  391. return int(score)
  392. }
  393. // Determine whether the current assignment plan is balanced.
  394. func isBalanced(currentAssignment map[string][]topicPartitionAssignment, allSubscriptions map[string][]topicPartitionAssignment) bool {
  395. sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
  396. min := len(currentAssignment[sortedCurrentSubscriptions[0]])
  397. max := len(currentAssignment[sortedCurrentSubscriptions[len(sortedCurrentSubscriptions)-1]])
  398. if min >= max-1 {
  399. // if minimum and maximum numbers of partitions assigned to consumers differ by at most one return true
  400. return true
  401. }
  402. // create a mapping from partitions to the consumer assigned to them
  403. allPartitions := make(map[topicPartitionAssignment]string)
  404. for memberID, partitions := range currentAssignment {
  405. for _, partition := range partitions {
  406. if _, exists := allPartitions[partition]; exists {
  407. Logger.Printf("Topic %s Partition %d is assigned more than one consumer", partition.Topic, partition.Partition)
  408. }
  409. allPartitions[partition] = memberID
  410. }
  411. }
  412. // for each consumer that does not have all the topic partitions it can get make sure none of the topic partitions it
  413. // could but did not get cannot be moved to it (because that would break the balance)
  414. for _, memberID := range sortedCurrentSubscriptions {
  415. consumerPartitions := currentAssignment[memberID]
  416. consumerPartitionCount := len(consumerPartitions)
  417. // skip if this consumer already has all the topic partitions it can get
  418. if consumerPartitionCount == len(allSubscriptions[memberID]) {
  419. continue
  420. }
  421. // otherwise make sure it cannot get any more
  422. potentialTopicPartitions := allSubscriptions[memberID]
  423. for _, partition := range potentialTopicPartitions {
  424. if !memberAssignmentsIncludeTopicPartition(currentAssignment[memberID], partition) {
  425. otherConsumer := allPartitions[partition]
  426. otherConsumerPartitionCount := len(currentAssignment[otherConsumer])
  427. if consumerPartitionCount < otherConsumerPartitionCount {
  428. return false
  429. }
  430. }
  431. }
  432. }
  433. return true
  434. }
  435. // Reassign all topic partitions that need reassignment until balanced.
  436. func (s *stickyBalanceStrategy) performReassignments(reassignablePartitions []topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, prevAssignment map[topicPartitionAssignment]consumerGenerationPair, sortedCurrentSubscriptions []string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, currentPartitionConsumer map[topicPartitionAssignment]string) bool {
  437. reassignmentPerformed := false
  438. modified := false
  439. // repeat reassignment until no partition can be moved to improve the balance
  440. for {
  441. modified = false
  442. // reassign all reassignable partitions (starting from the partition with least potential consumers and if needed)
  443. // until the full list is processed or a balance is achieved
  444. for _, partition := range reassignablePartitions {
  445. if isBalanced(currentAssignment, consumer2AllPotentialPartitions) {
  446. break
  447. }
  448. // the partition must have at least two consumers
  449. if len(partition2AllPotentialConsumers[partition]) <= 1 {
  450. Logger.Printf("Expected more than one potential consumer for partition %s topic %d", partition.Topic, partition.Partition)
  451. }
  452. // the partition must have a consumer
  453. consumer := currentPartitionConsumer[partition]
  454. if consumer == "" {
  455. Logger.Printf("Expected topic %s partition %d to be assigned to a consumer", partition.Topic, partition.Partition)
  456. }
  457. if _, exists := prevAssignment[partition]; exists {
  458. if len(currentAssignment[consumer]) > (len(currentAssignment[prevAssignment[partition].MemberID]) + 1) {
  459. sortedCurrentSubscriptions = s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, prevAssignment[partition].MemberID)
  460. reassignmentPerformed = true
  461. modified = true
  462. continue
  463. }
  464. }
  465. // check if a better-suited consumer exists for the partition; if so, reassign it
  466. for _, otherConsumer := range partition2AllPotentialConsumers[partition] {
  467. if len(currentAssignment[consumer]) > (len(currentAssignment[otherConsumer]) + 1) {
  468. sortedCurrentSubscriptions = s.reassignPartitionToNewConsumer(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, consumer2AllPotentialPartitions)
  469. reassignmentPerformed = true
  470. modified = true
  471. break
  472. }
  473. }
  474. }
  475. if !modified {
  476. return reassignmentPerformed
  477. }
  478. }
  479. }
  480. // Identify a new consumer for a topic partition and reassign it.
  481. func (s *stickyBalanceStrategy) reassignPartitionToNewConsumer(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []string {
  482. for _, anotherConsumer := range sortedCurrentSubscriptions {
  483. if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[anotherConsumer], partition) {
  484. return s.reassignPartition(partition, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer, anotherConsumer)
  485. }
  486. }
  487. return sortedCurrentSubscriptions
  488. }
  489. // Reassign a specific partition to a new consumer
  490. func (s *stickyBalanceStrategy) reassignPartition(partition topicPartitionAssignment, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string, newConsumer string) []string {
  491. consumer := currentPartitionConsumer[partition]
  492. // find the correct partition movement considering the stickiness requirement
  493. partitionToBeMoved := s.movements.getTheActualPartitionToBeMoved(partition, consumer, newConsumer)
  494. return s.processPartitionMovement(partitionToBeMoved, newConsumer, currentAssignment, sortedCurrentSubscriptions, currentPartitionConsumer)
  495. }
  496. // Track the movement of a topic partition after assignment
  497. func (s *stickyBalanceStrategy) processPartitionMovement(partition topicPartitionAssignment, newConsumer string, currentAssignment map[string][]topicPartitionAssignment, sortedCurrentSubscriptions []string, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
  498. oldConsumer := currentPartitionConsumer[partition]
  499. s.movements.movePartition(partition, oldConsumer, newConsumer)
  500. currentAssignment[oldConsumer] = removeTopicPartitionFromMemberAssignments(currentAssignment[oldConsumer], partition)
  501. currentAssignment[newConsumer] = append(currentAssignment[newConsumer], partition)
  502. currentPartitionConsumer[partition] = newConsumer
  503. return sortMemberIDsByPartitionAssignments(currentAssignment)
  504. }
  505. // Determine whether a specific consumer should be considered for topic partition assignment.
  506. func canConsumerParticipateInReassignment(memberID string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
  507. currentPartitions := currentAssignment[memberID]
  508. currentAssignmentSize := len(currentPartitions)
  509. maxAssignmentSize := len(consumer2AllPotentialPartitions[memberID])
  510. if currentAssignmentSize > maxAssignmentSize {
  511. Logger.Printf("The consumer %s is assigned more partitions than the maximum possible", memberID)
  512. }
  513. if currentAssignmentSize < maxAssignmentSize {
  514. // if a consumer is not assigned all its potential partitions it is subject to reassignment
  515. return true
  516. }
  517. for _, partition := range currentPartitions {
  518. if canTopicPartitionParticipateInReassignment(partition, partition2AllPotentialConsumers) {
  519. return true
  520. }
  521. }
  522. return false
  523. }
  524. // Only consider reassigning those topic partitions that have two or more potential consumers.
  525. func canTopicPartitionParticipateInReassignment(partition topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) bool {
  526. return len(partition2AllPotentialConsumers[partition]) >= 2
  527. }
  528. // The assignment should improve the overall balance of the partition assignments to consumers.
  529. func assignPartition(partition topicPartitionAssignment, sortedCurrentSubscriptions []string, currentAssignment map[string][]topicPartitionAssignment, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment, currentPartitionConsumer map[topicPartitionAssignment]string) []string {
  530. for _, memberID := range sortedCurrentSubscriptions {
  531. if memberAssignmentsIncludeTopicPartition(consumer2AllPotentialPartitions[memberID], partition) {
  532. currentAssignment[memberID] = append(currentAssignment[memberID], partition)
  533. currentPartitionConsumer[partition] = memberID
  534. break
  535. }
  536. }
  537. return sortMemberIDsByPartitionAssignments(currentAssignment)
  538. }
  539. // Deserialize topic partition assignment data to aid with creation of a sticky assignment.
  540. func deserializeTopicPartitionAssignment(userDataBytes []byte) (StickyAssignorUserData, error) {
  541. userDataV1 := &StickyAssignorUserDataV1{}
  542. if err := decode(userDataBytes, userDataV1, nil); err != nil {
  543. userDataV0 := &StickyAssignorUserDataV0{}
  544. if err := decode(userDataBytes, userDataV0, nil); err != nil {
  545. return nil, err
  546. }
  547. return userDataV0, nil
  548. }
  549. return userDataV1, nil
  550. }
  551. // filterAssignedPartitions returns a map of consumer group members to their list of previously-assigned topic partitions, limited
  552. // to those topic partitions currently reported by the Kafka cluster.
  553. func filterAssignedPartitions(currentAssignment map[string][]topicPartitionAssignment, partition2AllPotentialConsumers map[topicPartitionAssignment][]string) map[string][]topicPartitionAssignment {
  554. assignments := deepCopyAssignment(currentAssignment)
  555. for memberID, partitions := range assignments {
  556. // perform in-place filtering
  557. i := 0
  558. for _, partition := range partitions {
  559. if _, exists := partition2AllPotentialConsumers[partition]; exists {
  560. partitions[i] = partition
  561. i++
  562. }
  563. }
  564. assignments[memberID] = partitions[:i]
  565. }
  566. return assignments
  567. }
  568. func removeTopicPartitionFromMemberAssignments(assignments []topicPartitionAssignment, topic topicPartitionAssignment) []topicPartitionAssignment {
  569. for i, assignment := range assignments {
  570. if assignment == topic {
  571. return append(assignments[:i], assignments[i+1:]...)
  572. }
  573. }
  574. return assignments
  575. }
  576. func memberAssignmentsIncludeTopicPartition(assignments []topicPartitionAssignment, topic topicPartitionAssignment) bool {
  577. for _, assignment := range assignments {
  578. if assignment == topic {
  579. return true
  580. }
  581. }
  582. return false
  583. }
  584. func sortPartitions(currentAssignment map[string][]topicPartitionAssignment, partitionsWithADifferentPreviousAssignment map[topicPartitionAssignment]consumerGenerationPair, isFreshAssignment bool, partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) []topicPartitionAssignment {
  585. unassignedPartitions := make(map[topicPartitionAssignment]bool, len(partition2AllPotentialConsumers))
  586. for partition := range partition2AllPotentialConsumers {
  587. unassignedPartitions[partition] = true
  588. }
  589. sortedPartitions := make([]topicPartitionAssignment, 0)
  590. if !isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions) {
  591. // if this is a reassignment and the subscriptions are identical (all consumers can consumer from all topics)
  592. // then we just need to simply list partitions in a round robin fashion (from consumers with
  593. // most assigned partitions to those with least)
  594. assignments := filterAssignedPartitions(currentAssignment, partition2AllPotentialConsumers)
  595. // use priority-queue to evaluate consumer group members in descending-order based on
  596. // the number of topic partition assignments (i.e. consumers with most assignments first)
  597. pq := make(assignmentPriorityQueue, len(assignments))
  598. i := 0
  599. for consumerID, consumerAssignments := range assignments {
  600. pq[i] = &consumerGroupMember{
  601. id: consumerID,
  602. assignments: consumerAssignments,
  603. }
  604. i++
  605. }
  606. heap.Init(&pq)
  607. // loop until no consumer-group members remain
  608. for pq.Len() != 0 {
  609. member := pq[0]
  610. // partitions that were assigned to a different consumer last time
  611. var prevPartitionIndex int
  612. for i, partition := range member.assignments {
  613. if _, exists := partitionsWithADifferentPreviousAssignment[partition]; exists {
  614. prevPartitionIndex = i
  615. break
  616. }
  617. }
  618. if len(member.assignments) > 0 {
  619. partition := member.assignments[prevPartitionIndex]
  620. sortedPartitions = append(sortedPartitions, partition)
  621. delete(unassignedPartitions, partition)
  622. if prevPartitionIndex == 0 {
  623. member.assignments = member.assignments[1:]
  624. } else {
  625. member.assignments = append(member.assignments[:prevPartitionIndex], member.assignments[prevPartitionIndex+1:]...)
  626. }
  627. heap.Fix(&pq, 0)
  628. } else {
  629. heap.Pop(&pq)
  630. }
  631. }
  632. for partition := range unassignedPartitions {
  633. sortedPartitions = append(sortedPartitions, partition)
  634. }
  635. } else {
  636. // an ascending sorted set of topic partitions based on how many consumers can potentially use them
  637. sortedPartitions = sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers)
  638. }
  639. return sortedPartitions
  640. }
  641. func sortMemberIDsByPartitionAssignments(assignments map[string][]topicPartitionAssignment) []string {
  642. // sort the members by the number of partition assignments in ascending order
  643. sortedMemberIDs := make([]string, 0, len(assignments))
  644. for memberID := range assignments {
  645. sortedMemberIDs = append(sortedMemberIDs, memberID)
  646. }
  647. sort.SliceStable(sortedMemberIDs, func(i, j int) bool {
  648. ret := len(assignments[sortedMemberIDs[i]]) - len(assignments[sortedMemberIDs[j]])
  649. if ret == 0 {
  650. return sortedMemberIDs[i] < sortedMemberIDs[j]
  651. }
  652. return len(assignments[sortedMemberIDs[i]]) < len(assignments[sortedMemberIDs[j]])
  653. })
  654. return sortedMemberIDs
  655. }
  656. func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumers map[topicPartitionAssignment][]string) []topicPartitionAssignment {
  657. // sort the members by the number of partition assignments in descending order
  658. sortedPartionIDs := make([]topicPartitionAssignment, len(partition2AllPotentialConsumers))
  659. i := 0
  660. for partition := range partition2AllPotentialConsumers {
  661. sortedPartionIDs[i] = partition
  662. i++
  663. }
  664. sort.Slice(sortedPartionIDs, func(i, j int) bool {
  665. if len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) == len(partition2AllPotentialConsumers[sortedPartionIDs[j]]) {
  666. ret := strings.Compare(sortedPartionIDs[i].Topic, sortedPartionIDs[j].Topic)
  667. if ret == 0 {
  668. return sortedPartionIDs[i].Partition < sortedPartionIDs[j].Partition
  669. }
  670. return ret < 0
  671. }
  672. return len(partition2AllPotentialConsumers[sortedPartionIDs[i]]) < len(partition2AllPotentialConsumers[sortedPartionIDs[j]])
  673. })
  674. return sortedPartionIDs
  675. }
  676. func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
  677. m := make(map[string][]topicPartitionAssignment, len(assignment))
  678. for memberID, subscriptions := range assignment {
  679. m[memberID] = append(subscriptions[:0:0], subscriptions...)
  680. }
  681. return m
  682. }
  683. func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
  684. curMembers := make(map[string]int)
  685. for _, cur := range partition2AllPotentialConsumers {
  686. if len(curMembers) == 0 {
  687. for _, curMembersElem := range cur {
  688. curMembers[curMembersElem]++
  689. }
  690. continue
  691. }
  692. if len(curMembers) != len(cur) {
  693. return false
  694. }
  695. yMap := make(map[string]int)
  696. for _, yElem := range cur {
  697. yMap[yElem]++
  698. }
  699. for curMembersMapKey, curMembersMapVal := range curMembers {
  700. if yMap[curMembersMapKey] != curMembersMapVal {
  701. return false
  702. }
  703. }
  704. }
  705. curPartitions := make(map[topicPartitionAssignment]int)
  706. for _, cur := range consumer2AllPotentialPartitions {
  707. if len(curPartitions) == 0 {
  708. for _, curPartitionElem := range cur {
  709. curPartitions[curPartitionElem]++
  710. }
  711. continue
  712. }
  713. if len(curPartitions) != len(cur) {
  714. return false
  715. }
  716. yMap := make(map[topicPartitionAssignment]int)
  717. for _, yElem := range cur {
  718. yMap[yElem]++
  719. }
  720. for curMembersMapKey, curMembersMapVal := range curPartitions {
  721. if yMap[curMembersMapKey] != curMembersMapVal {
  722. return false
  723. }
  724. }
  725. }
  726. return true
  727. }
  728. // We need to process subscriptions' user data with each consumer's reported generation in mind
  729. // higher generations overwrite lower generations in case of a conflict
  730. // note that a conflict could exist only if user data is for different generations
  731. func prepopulateCurrentAssignments(members map[string]ConsumerGroupMemberMetadata) (map[string][]topicPartitionAssignment, map[topicPartitionAssignment]consumerGenerationPair, error) {
  732. currentAssignment := make(map[string][]topicPartitionAssignment)
  733. prevAssignment := make(map[topicPartitionAssignment]consumerGenerationPair)
  734. // for each partition we create a sorted map of its consumers by generation
  735. sortedPartitionConsumersByGeneration := make(map[topicPartitionAssignment]map[int]string)
  736. for memberID, meta := range members {
  737. consumerUserData, err := deserializeTopicPartitionAssignment(meta.UserData)
  738. if err != nil {
  739. return nil, nil, err
  740. }
  741. for _, partition := range consumerUserData.partitions() {
  742. if consumers, exists := sortedPartitionConsumersByGeneration[partition]; exists {
  743. if consumerUserData.hasGeneration() {
  744. if _, generationExists := consumers[consumerUserData.generation()]; generationExists {
  745. // same partition is assigned to two consumers during the same rebalance.
  746. // log a warning and skip this record
  747. Logger.Printf("Topic %s Partition %d is assigned to multiple consumers following sticky assignment generation %d", partition.Topic, partition.Partition, consumerUserData.generation())
  748. continue
  749. } else {
  750. consumers[consumerUserData.generation()] = memberID
  751. }
  752. } else {
  753. consumers[defaultGeneration] = memberID
  754. }
  755. } else {
  756. generation := defaultGeneration
  757. if consumerUserData.hasGeneration() {
  758. generation = consumerUserData.generation()
  759. }
  760. sortedPartitionConsumersByGeneration[partition] = map[int]string{generation: memberID}
  761. }
  762. }
  763. }
  764. // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition
  765. // current and previous consumers are the last two consumers of each partition in the above sorted map
  766. for partition, consumers := range sortedPartitionConsumersByGeneration {
  767. // sort consumers by generation in decreasing order
  768. var generations []int
  769. for generation := range consumers {
  770. generations = append(generations, generation)
  771. }
  772. sort.Sort(sort.Reverse(sort.IntSlice(generations)))
  773. consumer := consumers[generations[0]]
  774. if _, exists := currentAssignment[consumer]; !exists {
  775. currentAssignment[consumer] = []topicPartitionAssignment{partition}
  776. } else {
  777. currentAssignment[consumer] = append(currentAssignment[consumer], partition)
  778. }
  779. // check for previous assignment, if any
  780. if len(generations) > 1 {
  781. prevAssignment[partition] = consumerGenerationPair{
  782. MemberID: consumers[generations[1]],
  783. Generation: generations[1],
  784. }
  785. }
  786. }
  787. return currentAssignment, prevAssignment, nil
  788. }
  789. type consumerGenerationPair struct {
  790. MemberID string
  791. Generation int
  792. }
  793. // consumerPair represents a pair of Kafka consumer ids involved in a partition reassignment.
  794. type consumerPair struct {
  795. SrcMemberID string
  796. DstMemberID string
  797. }
  798. // partitionMovements maintains some data structures to simplify lookup of partition movements among consumers.
  799. type partitionMovements struct {
  800. PartitionMovementsByTopic map[string]map[consumerPair]map[topicPartitionAssignment]bool
  801. Movements map[topicPartitionAssignment]consumerPair
  802. }
  803. func (p *partitionMovements) removeMovementRecordOfPartition(partition topicPartitionAssignment) consumerPair {
  804. pair := p.Movements[partition]
  805. delete(p.Movements, partition)
  806. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  807. delete(partitionMovementsForThisTopic[pair], partition)
  808. if len(partitionMovementsForThisTopic[pair]) == 0 {
  809. delete(partitionMovementsForThisTopic, pair)
  810. }
  811. if len(p.PartitionMovementsByTopic[partition.Topic]) == 0 {
  812. delete(p.PartitionMovementsByTopic, partition.Topic)
  813. }
  814. return pair
  815. }
  816. func (p *partitionMovements) addPartitionMovementRecord(partition topicPartitionAssignment, pair consumerPair) {
  817. p.Movements[partition] = pair
  818. if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
  819. p.PartitionMovementsByTopic[partition.Topic] = make(map[consumerPair]map[topicPartitionAssignment]bool)
  820. }
  821. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  822. if _, exists := partitionMovementsForThisTopic[pair]; !exists {
  823. partitionMovementsForThisTopic[pair] = make(map[topicPartitionAssignment]bool)
  824. }
  825. partitionMovementsForThisTopic[pair][partition] = true
  826. }
  827. func (p *partitionMovements) movePartition(partition topicPartitionAssignment, oldConsumer, newConsumer string) {
  828. pair := consumerPair{
  829. SrcMemberID: oldConsumer,
  830. DstMemberID: newConsumer,
  831. }
  832. if _, exists := p.Movements[partition]; exists {
  833. // this partition has previously moved
  834. existingPair := p.removeMovementRecordOfPartition(partition)
  835. if existingPair.DstMemberID != oldConsumer {
  836. Logger.Printf("Existing pair DstMemberID %s was not equal to the oldConsumer ID %s", existingPair.DstMemberID, oldConsumer)
  837. }
  838. if existingPair.SrcMemberID != newConsumer {
  839. // the partition is not moving back to its previous consumer
  840. p.addPartitionMovementRecord(partition, consumerPair{
  841. SrcMemberID: existingPair.SrcMemberID,
  842. DstMemberID: newConsumer,
  843. })
  844. }
  845. } else {
  846. p.addPartitionMovementRecord(partition, pair)
  847. }
  848. }
  849. func (p *partitionMovements) getTheActualPartitionToBeMoved(partition topicPartitionAssignment, oldConsumer, newConsumer string) topicPartitionAssignment {
  850. if _, exists := p.PartitionMovementsByTopic[partition.Topic]; !exists {
  851. return partition
  852. }
  853. if _, exists := p.Movements[partition]; exists {
  854. // this partition has previously moved
  855. if oldConsumer != p.Movements[partition].DstMemberID {
  856. Logger.Printf("Partition movement DstMemberID %s was not equal to the oldConsumer ID %s", p.Movements[partition].DstMemberID, oldConsumer)
  857. }
  858. oldConsumer = p.Movements[partition].SrcMemberID
  859. }
  860. partitionMovementsForThisTopic := p.PartitionMovementsByTopic[partition.Topic]
  861. reversePair := consumerPair{
  862. SrcMemberID: newConsumer,
  863. DstMemberID: oldConsumer,
  864. }
  865. if _, exists := partitionMovementsForThisTopic[reversePair]; !exists {
  866. return partition
  867. }
  868. var reversePairPartition topicPartitionAssignment
  869. for otherPartition := range partitionMovementsForThisTopic[reversePair] {
  870. reversePairPartition = otherPartition
  871. }
  872. return reversePairPartition
  873. }
  874. func (p *partitionMovements) isLinked(src, dst string, pairs []consumerPair, currentPath []string) ([]string, bool) {
  875. if src == dst {
  876. return currentPath, false
  877. }
  878. if len(pairs) == 0 {
  879. return currentPath, false
  880. }
  881. for _, pair := range pairs {
  882. if src == pair.SrcMemberID && dst == pair.DstMemberID {
  883. currentPath = append(currentPath, src, dst)
  884. return currentPath, true
  885. }
  886. }
  887. for _, pair := range pairs {
  888. if pair.SrcMemberID != src {
  889. continue
  890. }
  891. // create a deep copy of the pairs, excluding the current pair
  892. reducedSet := make([]consumerPair, len(pairs)-1)
  893. i := 0
  894. for _, p := range pairs {
  895. if p != pair {
  896. reducedSet[i] = pair
  897. i++
  898. }
  899. }
  900. currentPath = append(currentPath, pair.SrcMemberID)
  901. return p.isLinked(pair.DstMemberID, dst, reducedSet, currentPath)
  902. }
  903. return currentPath, false
  904. }
  905. func (p *partitionMovements) in(cycle []string, cycles [][]string) bool {
  906. superCycle := make([]string, len(cycle)-1)
  907. for i := 0; i < len(cycle)-1; i++ {
  908. superCycle[i] = cycle[i]
  909. }
  910. superCycle = append(superCycle, cycle...)
  911. for _, foundCycle := range cycles {
  912. if len(foundCycle) == len(cycle) && indexOfSubList(superCycle, foundCycle) != -1 {
  913. return true
  914. }
  915. }
  916. return false
  917. }
  918. func (p *partitionMovements) hasCycles(pairs []consumerPair) bool {
  919. cycles := make([][]string, 0)
  920. for _, pair := range pairs {
  921. // create a deep copy of the pairs, excluding the current pair
  922. reducedPairs := make([]consumerPair, len(pairs)-1)
  923. i := 0
  924. for _, p := range pairs {
  925. if p != pair {
  926. reducedPairs[i] = pair
  927. i++
  928. }
  929. }
  930. if path, linked := p.isLinked(pair.DstMemberID, pair.SrcMemberID, reducedPairs, []string{pair.SrcMemberID}); linked {
  931. if !p.in(path, cycles) {
  932. cycles = append(cycles, path)
  933. Logger.Printf("A cycle of length %d was found: %v", len(path)-1, path)
  934. }
  935. }
  936. }
  937. // for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
  938. // the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
  939. // tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
  940. for _, cycle := range cycles {
  941. if len(cycle) == 3 {
  942. return true
  943. }
  944. }
  945. return false
  946. }
  947. func (p *partitionMovements) isSticky() bool {
  948. for topic, movements := range p.PartitionMovementsByTopic {
  949. movementPairs := make([]consumerPair, len(movements))
  950. i := 0
  951. for pair := range movements {
  952. movementPairs[i] = pair
  953. i++
  954. }
  955. if p.hasCycles(movementPairs) {
  956. Logger.Printf("Stickiness is violated for topic %s", topic)
  957. Logger.Printf("Partition movements for this topic occurred among the following consumer pairs: %v", movements)
  958. return false
  959. }
  960. }
  961. return true
  962. }
  963. func indexOfSubList(source []string, target []string) int {
  964. targetSize := len(target)
  965. maxCandidate := len(source) - targetSize
  966. nextCand:
  967. for candidate := 0; candidate <= maxCandidate; candidate++ {
  968. j := candidate
  969. for i := 0; i < targetSize; i++ {
  970. if target[i] != source[j] {
  971. // Element mismatch, try next cand
  972. continue nextCand
  973. }
  974. j++
  975. }
  976. // All elements of candidate matched target
  977. return candidate
  978. }
  979. return -1
  980. }
  981. type consumerGroupMember struct {
  982. id string
  983. assignments []topicPartitionAssignment
  984. }
  985. // assignmentPriorityQueue is a priority-queue of consumer group members that is sorted
  986. // in descending order (most assignments to least assignments).
  987. type assignmentPriorityQueue []*consumerGroupMember
  988. func (pq assignmentPriorityQueue) Len() int { return len(pq) }
  989. func (pq assignmentPriorityQueue) Less(i, j int) bool {
  990. // order assignment priority queue in descending order using assignment-count/member-id
  991. if len(pq[i].assignments) == len(pq[j].assignments) {
  992. return pq[i].id > pq[j].id
  993. }
  994. return len(pq[i].assignments) > len(pq[j].assignments)
  995. }
  996. func (pq assignmentPriorityQueue) Swap(i, j int) {
  997. pq[i], pq[j] = pq[j], pq[i]
  998. }
  999. func (pq *assignmentPriorityQueue) Push(x interface{}) {
  1000. member := x.(*consumerGroupMember)
  1001. *pq = append(*pq, member)
  1002. }
  1003. func (pq *assignmentPriorityQueue) Pop() interface{} {
  1004. old := *pq
  1005. n := len(old)
  1006. member := old[n-1]
  1007. *pq = old[0 : n-1]
  1008. return member
  1009. }