offset_manager.go

package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error

	// Commit commits the offsets. This method can be used if AutoCommit.Enable is
	// set to false.
	Commit()
}
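
// Usage sketch (illustrative only; the broker address, group name and topic
// below are placeholder values, not part of this file): wire an OffsetManager
// to an existing Client and shut things down in the documented order, i.e.
// close all PartitionOffsetManagers first, then the OffsetManager, then the
// underlying Client.
//
//	config := NewConfig()
//	config.Consumer.Offsets.AutoCommit.Enable = false // commit manually via Commit()
//
//	client, err := NewClient([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // runs last, after the OffsetManager is closed
//
//	om, err := NewOffsetManagerFromClient("my-group", client)
//	if err != nil {
//		panic(err)
//	}
//
//	pom, err := om.ManagePartition("my-topic", 0)
//	if err != nil {
//		panic(err)
//	}
//
//	pom.MarkOffset(42, "") // mark the offset of the next message to read
//	om.Commit()            // flush manually because auto-commit is disabled
//
//	_ = pom.Close() // close every POM before closing the OffsetManager
//	_ = om.Close()
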
type offsetManager struct {
	client Client
	conf   *Config
	group  string
	ticker *time.Ticker

	sessionCanceler func()

	memberID        string
	groupInstanceId *string
	generation      int32

	broker     *Broker
	brokerLock sync.RWMutex

	poms     map[string]map[int32]*partitionOffsetManager
	pomsLock sync.RWMutex

	closeOnce sync.Once
	closing   chan none
	closed    chan none
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the partition manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client, nil)
}

func newOffsetManagerFromClient(group, memberID string, generation int32, client Client, sessionCanceler func()) (*offsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	conf := client.Config()
	om := &offsetManager{
		client:          client,
		conf:            conf,
		group:           group,
		poms:            make(map[string]map[int32]*partitionOffsetManager),
		sessionCanceler: sessionCanceler,

		memberID:   memberID,
		generation: generation,

		closing: make(chan none),
		closed:  make(chan none),
	}
	if conf.Consumer.Group.InstanceId != "" {
		om.groupInstanceId = &conf.Consumer.Group.InstanceId
	}
	if conf.Consumer.Offsets.AutoCommit.Enable {
		om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
		go withRecover(om.mainLoop)
	}

	return om, nil
}

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	om.closeOnce.Do(func() {
		// exit the mainLoop
		close(om.closing)
		if om.conf.Consumer.Offsets.AutoCommit.Enable {
			<-om.closed
		}

		// mark all POMs as closed
		om.asyncClosePOMs()

		// flush one last time
		if om.conf.Consumer.Offsets.AutoCommit.Enable {
			for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
				om.flushToBroker()
				if om.releasePOMs(false) == 0 {
					break
				}
			}
		}

		om.releasePOMs(true)
		om.brokerLock.Lock()
		om.broker = nil
		om.brokerLock.Unlock()
	})
	return nil
}

func (om *offsetManager) computeBackoff(retries int) time.Duration {
	if om.conf.Metadata.Retry.BackoffFunc != nil {
		return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
	} else {
		return om.conf.Metadata.Retry.Backoff
	}
}
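
// Configuration sketch (illustrative; the durations and the backoff curve
// below are assumptions, not Sarama defaults): computeBackoff prefers a
// user-supplied Metadata.Retry.BackoffFunc over the flat Metadata.Retry.Backoff,
// so a custom retry policy can be plugged into the Config before the client is
// built. Note that fetchInitialOffset calls this with the number of retries
// still remaining, counting down from Metadata.Retry.Max.
//
//	config := NewConfig()
//	config.Metadata.Retry.Max = 5
//	config.Metadata.Retry.Backoff = 250 * time.Millisecond // flat fallback
//	config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		// back off harder as the remaining retry budget shrinks
//		return time.Duration((maxRetries-retries+1)*100) * time.Millisecond
//	}
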
func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, int32, string, error) {
	broker, err := om.coordinator()
	if err != nil {
		if retries <= 0 {
			return 0, 0, "", err
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	partitions := map[string][]int32{topic: {partition}}
	req := NewOffsetFetchRequest(om.conf.Version, om.group, partitions)
	resp, err := broker.FetchOffset(req)
	if err != nil {
		if retries <= 0 {
			return 0, 0, "", err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	block := resp.GetBlock(topic, partition)
	if block == nil {
		return 0, 0, "", ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		return block.Offset, block.LeaderEpoch, block.Metadata, nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return 0, 0, "", block.Err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return 0, 0, "", block.Err
		}
		backoff := om.computeBackoff(retries)
		select {
		case <-om.closing:
			return 0, 0, "", block.Err
		case <-time.After(backoff):
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	default:
		return 0, 0, "", block.Err
	}
}

func (om *offsetManager) coordinator() (*Broker, error) {
	om.brokerLock.RLock()
	broker := om.broker
	om.brokerLock.RUnlock()

	if broker != nil {
		return broker, nil
	}

	om.brokerLock.Lock()
	defer om.brokerLock.Unlock()

	if broker := om.broker; broker != nil {
		return broker, nil
	}

	if err := om.client.RefreshCoordinator(om.group); err != nil {
		return nil, err
	}

	broker, err := om.client.Coordinator(om.group)
	if err != nil {
		return nil, err
	}

	om.broker = broker
	return broker, nil
}

func (om *offsetManager) releaseCoordinator(b *Broker) {
	om.brokerLock.Lock()
	if om.broker == b {
		om.broker = nil
	}
	om.brokerLock.Unlock()
}

func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.Commit()
		case <-om.closing:
			return
		}
	}
}

func (om *offsetManager) Commit() {
	om.flushToBroker()
	om.releasePOMs(false)
}

func (om *offsetManager) flushToBroker() {
	req := om.constructRequest()
	if req == nil {
		return
	}

	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	om.handleResponse(broker, req, resp)
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	r := &OffsetCommitRequest{
		Version:                 1,
		ConsumerGroup:           om.group,
		ConsumerID:              om.memberID,
		ConsumerGroupGeneration: om.generation,
	}
	// Version 1 adds timestamp and group membership information, as well as the commit timestamp.
	//
	// Version 2 adds retention time. It removes the commit timestamp added in version 1.
	if om.conf.Version.IsAtLeast(V0_9_0_0) {
		r.Version = 2
	}
	// Version 3 and 4 are the same as version 2.
	if om.conf.Version.IsAtLeast(V0_11_0_0) {
		r.Version = 3
	}
	if om.conf.Version.IsAtLeast(V2_0_0_0) {
		r.Version = 4
	}
	// Version 5 removes the retention time, which is now controlled only by a broker configuration.
	//
	// Version 6 adds the leader epoch for fencing.
	if om.conf.Version.IsAtLeast(V2_1_0_0) {
		r.Version = 6
	}
	// Version 7 adds a new field called groupInstanceId to indicate member identity across restarts.
	if om.conf.Version.IsAtLeast(V2_3_0_0) {
		r.Version = 7
		r.GroupInstanceId = om.groupInstanceId
	}

	// commit timestamp was only briefly supported in V1 where we set it to
	// ReceiveTime (-1) to tell the broker to set it to the time when the commit
	// request was received
	var commitTimestamp int64
	if r.Version == 1 {
		commitTimestamp = ReceiveTime
	}

	// request controlled retention was only supported from V2-V4 (it became
	// broker-only after that) so if the user has set the config options then
	// flow those through as retention time on the commit request.
	if r.Version >= 2 && r.Version < 5 {
		// Map Sarama's default of 0 to Kafka's default of -1
		r.RetentionTime = -1
		if om.conf.Consumer.Offsets.Retention > 0 {
			r.RetentionTime = int64(om.conf.Consumer.Offsets.Retention / time.Millisecond)
		}
	}

	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.lock.Lock()
			if pom.dirty {
				r.AddBlockWithLeaderEpoch(pom.topic, pom.partition, pom.offset, pom.leaderEpoch, commitTimestamp, pom.metadata)
			}
			pom.lock.Unlock()
		}
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}

func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
				continue
			}

			var err KError
			var ok bool

			if resp.Errors[pom.topic] == nil {
				pom.handleError(ErrIncompleteResponse)
				continue
			}
			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
				pom.handleError(ErrIncompleteResponse)
				continue
			}

			switch err {
			case ErrNoError:
				block := req.blocks[pom.topic][pom.partition]
				pom.updateCommitted(block.offset, block.metadata)
			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
				// not a critical error, we just need to redispatch
				om.releaseCoordinator(broker)
			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
				// nothing we can do about this, just tell the user and carry on
				pom.handleError(err)
			case ErrOffsetsLoadInProgress:
				// nothing wrong but we didn't commit, we'll get it next time round
			case ErrFencedInstancedId:
				pom.handleError(err)
				// TODO close the whole consumer for instance fenced....
				om.tryCancelSession()
			case ErrUnknownTopicOrPartition:
				// let the user know *and* try redispatching - if topic-auto-create is
				// enabled, redispatching should trigger a metadata req and create the
				// topic; if not then re-dispatching won't help, but we've let the user
				// know and it shouldn't hurt either (see https://github.com/IBM/sarama/issues/706)
				fallthrough
			default:
				// dunno, tell the user and try redispatching
				pom.handleError(err)
				om.releaseCoordinator(broker)
			}
		}
	}
}

func (om *offsetManager) handleError(err error) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.handleError(err)
		}
	}
}

func (om *offsetManager) asyncClosePOMs() {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.AsyncClose()
		}
	}
}

// Releases/removes closed POMs once they are clean (or when forced)
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	for topic, topicManagers := range om.poms {
		for partition, pom := range topicManagers {
			pom.lock.Lock()
			releaseDue := pom.done && (force || !pom.dirty)
			pom.lock.Unlock()

			if releaseDue {
				pom.release()

				delete(om.poms[topic], partition)
				if len(om.poms[topic]) == 0 {
					delete(om.poms, topic)
				}
			}
		}
		remaining += len(om.poms[topic])
	}
	return
}

func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	if partitions, ok := om.poms[topic]; ok {
		if pom, ok := partitions[partition]; ok {
			return pom
		}
	}

	return nil
}

func (om *offsetManager) tryCancelSession() {
	if om.sessionCanceler != nil {
		om.sessionCanceler()
	}
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. cf. MarkOffset for more details.
	ResetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function, or Close before
	// a consumer object passes out of scope, as it will otherwise leak memory. You
	// must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}
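
// Usage sketch (illustrative; "my-topic", partition 0 and the processing loop
// are placeholders, `om` is an OffsetManager as in the earlier sketch, and
// `consumer` is assumed to be an existing sarama Consumer created elsewhere):
// resume a partition consumer from the managed offset and mark progress as
// messages are handled.
//
//	pom, err := om.ManagePartition("my-topic", 0)
//	if err != nil {
//		panic(err)
//	}
//	defer pom.Close()
//
//	offset, _ := pom.NextOffset() // where to resume consuming
//	pc, err := consumer.ConsumePartition("my-topic", 0, offset)
//	if err != nil {
//		panic(err)
//	}
//	defer pc.Close()
//
//	for msg := range pc.Messages() {
//		// ... process msg ...
//		pom.MarkOffset(msg.Offset+1, "") // mark the *next* offset to read
//	}
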
type partitionOffsetManager struct {
	parent      *offsetManager
	topic       string
	partition   int32
	leaderEpoch int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	done     bool

	releaseOnce sync.Once
	errors      chan *ConsumerError
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	offset, leaderEpoch, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
	if err != nil {
		return nil, err
	}

	return &partitionOffsetManager{
		parent:      om,
		topic:       topic,
		partition:   partition,
		leaderEpoch: leaderEpoch,
		errors:      make(chan *ConsumerError, om.conf.ChannelBufferSize),
		offset:      offset,
		metadata:    metadata,
	}, nil
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset <= pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}
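
// Configuration sketch (illustrative): when no offset has been committed yet,
// NextOffset falls back to Consumer.Offsets.Initial, so set that on the Config
// before creating the client; only OffsetOldest and OffsetNewest are valid.
//
//	config := NewConfig()
//	config.Consumer.Offsets.Initial = OffsetOldest // default is OffsetNewest
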
func (pom *partitionOffsetManager) AsyncClose() {
	pom.lock.Lock()
	pom.done = true
	pom.lock.Unlock()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) release() {
	pom.releaseOnce.Do(func() {
		close(pom.errors)
	})
}