mockresponses.go 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509
  1. package sarama
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. )
  7. // TestReporter has methods matching go's testing.T to avoid importing
  8. // `testing` in the main part of the library.
  9. type TestReporter interface {
  10. Error(...interface{})
  11. Errorf(string, ...interface{})
  12. Fatal(...interface{})
  13. Fatalf(string, ...interface{})
  14. Helper()
  15. }
  16. // MockResponse is a response builder interface it defines one method that
  17. // allows generating a response based on a request body. MockResponses are used
  18. // to program behavior of MockBroker in tests.
  19. type MockResponse interface {
  20. For(reqBody versionedDecoder) (res encoderWithHeader)
  21. }
  22. // MockWrapper is a mock response builder that returns a particular concrete
  23. // response regardless of the actual request passed to the `For` method.
  24. type MockWrapper struct {
  25. res encoderWithHeader
  26. }
  27. func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
  28. return mw.res
  29. }
  30. func NewMockWrapper(res encoderWithHeader) *MockWrapper {
  31. return &MockWrapper{res: res}
  32. }
  33. // MockSequence is a mock response builder that is created from a sequence of
  34. // concrete responses. Every time when a `MockBroker` calls its `For` method
  35. // the next response from the sequence is returned. When the end of the
  36. // sequence is reached the last element from the sequence is returned.
  37. type MockSequence struct {
  38. responses []MockResponse
  39. }
  40. func NewMockSequence(responses ...interface{}) *MockSequence {
  41. ms := &MockSequence{}
  42. ms.responses = make([]MockResponse, len(responses))
  43. for i, res := range responses {
  44. switch res := res.(type) {
  45. case MockResponse:
  46. ms.responses[i] = res
  47. case encoderWithHeader:
  48. ms.responses[i] = NewMockWrapper(res)
  49. default:
  50. panic(fmt.Sprintf("Unexpected response type: %T", res))
  51. }
  52. }
  53. return ms
  54. }
  55. func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
  56. res = mc.responses[0].For(reqBody)
  57. if len(mc.responses) > 1 {
  58. mc.responses = mc.responses[1:]
  59. }
  60. return res
  61. }
  62. type MockListGroupsResponse struct {
  63. groups map[string]string
  64. t TestReporter
  65. }
  66. func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
  67. return &MockListGroupsResponse{
  68. groups: make(map[string]string),
  69. t: t,
  70. }
  71. }
  72. func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  73. request := reqBody.(*ListGroupsRequest)
  74. response := &ListGroupsResponse{
  75. Version: request.Version,
  76. Groups: m.groups,
  77. }
  78. return response
  79. }
  80. func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
  81. m.groups[groupID] = protocolType
  82. return m
  83. }
  84. type MockDescribeGroupsResponse struct {
  85. groups map[string]*GroupDescription
  86. t TestReporter
  87. }
  88. func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
  89. return &MockDescribeGroupsResponse{
  90. t: t,
  91. groups: make(map[string]*GroupDescription),
  92. }
  93. }
  94. func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
  95. m.groups[groupID] = description
  96. return m
  97. }
  98. func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  99. request := reqBody.(*DescribeGroupsRequest)
  100. response := &DescribeGroupsResponse{Version: request.version()}
  101. for _, requestedGroup := range request.Groups {
  102. if group, ok := m.groups[requestedGroup]; ok {
  103. response.Groups = append(response.Groups, group)
  104. } else {
  105. // Mimic real kafka - if a group doesn't exist, return
  106. // an entry with state "Dead"
  107. response.Groups = append(response.Groups, &GroupDescription{
  108. GroupId: requestedGroup,
  109. State: "Dead",
  110. })
  111. }
  112. }
  113. return response
  114. }
  115. // MockMetadataResponse is a `MetadataResponse` builder.
  116. type MockMetadataResponse struct {
  117. controllerID int32
  118. errors map[string]KError
  119. leaders map[string]map[int32]int32
  120. brokers map[string]int32
  121. t TestReporter
  122. }
  123. func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
  124. return &MockMetadataResponse{
  125. errors: make(map[string]KError),
  126. leaders: make(map[string]map[int32]int32),
  127. brokers: make(map[string]int32),
  128. t: t,
  129. }
  130. }
  131. func (mmr *MockMetadataResponse) SetError(topic string, kerror KError) *MockMetadataResponse {
  132. mmr.errors[topic] = kerror
  133. return mmr
  134. }
  135. func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
  136. partitions := mmr.leaders[topic]
  137. if partitions == nil {
  138. partitions = make(map[int32]int32)
  139. mmr.leaders[topic] = partitions
  140. }
  141. partitions[partition] = brokerID
  142. return mmr
  143. }
  144. func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
  145. mmr.brokers[addr] = brokerID
  146. return mmr
  147. }
  148. func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
  149. mmr.controllerID = brokerID
  150. return mmr
  151. }
  152. func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
  153. metadataRequest := reqBody.(*MetadataRequest)
  154. metadataResponse := &MetadataResponse{
  155. Version: metadataRequest.version(),
  156. ControllerID: mmr.controllerID,
  157. }
  158. for addr, brokerID := range mmr.brokers {
  159. metadataResponse.AddBroker(addr, brokerID)
  160. }
  161. // Generate set of replicas
  162. var replicas []int32
  163. var offlineReplicas []int32
  164. for _, brokerID := range mmr.brokers {
  165. replicas = append(replicas, brokerID)
  166. }
  167. if len(metadataRequest.Topics) == 0 {
  168. for topic, partitions := range mmr.leaders {
  169. for partition, brokerID := range partitions {
  170. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
  171. }
  172. }
  173. for topic, err := range mmr.errors {
  174. metadataResponse.AddTopic(topic, err)
  175. }
  176. return metadataResponse
  177. }
  178. for _, topic := range metadataRequest.Topics {
  179. leaders, ok := mmr.leaders[topic]
  180. if !ok {
  181. if err, ok := mmr.errors[topic]; ok {
  182. metadataResponse.AddTopic(topic, err)
  183. } else {
  184. metadataResponse.AddTopic(topic, ErrUnknownTopicOrPartition)
  185. }
  186. continue
  187. }
  188. for partition, brokerID := range leaders {
  189. metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
  190. }
  191. }
  192. return metadataResponse
  193. }
  194. // MockOffsetResponse is an `OffsetResponse` builder.
  195. type MockOffsetResponse struct {
  196. offsets map[string]map[int32]map[int64]int64
  197. t TestReporter
  198. }
  199. func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
  200. return &MockOffsetResponse{
  201. offsets: make(map[string]map[int32]map[int64]int64),
  202. t: t,
  203. }
  204. }
  205. func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
  206. partitions := mor.offsets[topic]
  207. if partitions == nil {
  208. partitions = make(map[int32]map[int64]int64)
  209. mor.offsets[topic] = partitions
  210. }
  211. times := partitions[partition]
  212. if times == nil {
  213. times = make(map[int64]int64)
  214. partitions[partition] = times
  215. }
  216. times[time] = offset
  217. return mor
  218. }
  219. func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
  220. offsetRequest := reqBody.(*OffsetRequest)
  221. offsetResponse := &OffsetResponse{Version: offsetRequest.Version}
  222. for topic, partitions := range offsetRequest.blocks {
  223. for partition, block := range partitions {
  224. offset := mor.getOffset(topic, partition, block.timestamp)
  225. offsetResponse.AddTopicPartition(topic, partition, offset)
  226. }
  227. }
  228. return offsetResponse
  229. }
  230. func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
  231. partitions := mor.offsets[topic]
  232. if partitions == nil {
  233. mor.t.Errorf("missing topic: %s", topic)
  234. }
  235. times := partitions[partition]
  236. if times == nil {
  237. mor.t.Errorf("missing partition: %d", partition)
  238. }
  239. offset, ok := times[time]
  240. if !ok {
  241. mor.t.Errorf("missing time: %d", time)
  242. }
  243. return offset
  244. }
  245. // mockMessage is a message that used to be mocked for `FetchResponse`
  246. type mockMessage struct {
  247. key Encoder
  248. msg Encoder
  249. }
  250. func newMockMessage(key, msg Encoder) *mockMessage {
  251. return &mockMessage{
  252. key: key,
  253. msg: msg,
  254. }
  255. }
  256. // MockFetchResponse is a `FetchResponse` builder.
  257. type MockFetchResponse struct {
  258. messages map[string]map[int32]map[int64]*mockMessage
  259. messagesLock *sync.RWMutex
  260. highWaterMarks map[string]map[int32]int64
  261. t TestReporter
  262. batchSize int
  263. }
  264. func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
  265. return &MockFetchResponse{
  266. messages: make(map[string]map[int32]map[int64]*mockMessage),
  267. messagesLock: &sync.RWMutex{},
  268. highWaterMarks: make(map[string]map[int32]int64),
  269. t: t,
  270. batchSize: batchSize,
  271. }
  272. }
  273. func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
  274. return mfr.SetMessageWithKey(topic, partition, offset, nil, msg)
  275. }
  276. func (mfr *MockFetchResponse) SetMessageWithKey(topic string, partition int32, offset int64, key, msg Encoder) *MockFetchResponse {
  277. mfr.messagesLock.Lock()
  278. defer mfr.messagesLock.Unlock()
  279. partitions := mfr.messages[topic]
  280. if partitions == nil {
  281. partitions = make(map[int32]map[int64]*mockMessage)
  282. mfr.messages[topic] = partitions
  283. }
  284. messages := partitions[partition]
  285. if messages == nil {
  286. messages = make(map[int64]*mockMessage)
  287. partitions[partition] = messages
  288. }
  289. messages[offset] = newMockMessage(key, msg)
  290. return mfr
  291. }
  292. func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
  293. partitions := mfr.highWaterMarks[topic]
  294. if partitions == nil {
  295. partitions = make(map[int32]int64)
  296. mfr.highWaterMarks[topic] = partitions
  297. }
  298. partitions[partition] = offset
  299. return mfr
  300. }
  301. func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
  302. fetchRequest := reqBody.(*FetchRequest)
  303. res := &FetchResponse{
  304. Version: fetchRequest.Version,
  305. }
  306. for topic, partitions := range fetchRequest.blocks {
  307. for partition, block := range partitions {
  308. initialOffset := block.fetchOffset
  309. offset := initialOffset
  310. maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
  311. for i := 0; i < mfr.batchSize && offset < maxOffset; {
  312. msg := mfr.getMessage(topic, partition, offset)
  313. if msg != nil {
  314. res.AddMessage(topic, partition, msg.key, msg.msg, offset)
  315. i++
  316. }
  317. offset++
  318. }
  319. fb := res.GetBlock(topic, partition)
  320. if fb == nil {
  321. res.AddError(topic, partition, ErrNoError)
  322. fb = res.GetBlock(topic, partition)
  323. }
  324. fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
  325. }
  326. }
  327. return res
  328. }
  329. func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) *mockMessage {
  330. mfr.messagesLock.RLock()
  331. defer mfr.messagesLock.RUnlock()
  332. partitions := mfr.messages[topic]
  333. if partitions == nil {
  334. return nil
  335. }
  336. messages := partitions[partition]
  337. if messages == nil {
  338. return nil
  339. }
  340. return messages[offset]
  341. }
  342. func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
  343. mfr.messagesLock.RLock()
  344. defer mfr.messagesLock.RUnlock()
  345. partitions := mfr.messages[topic]
  346. if partitions == nil {
  347. return 0
  348. }
  349. messages := partitions[partition]
  350. if messages == nil {
  351. return 0
  352. }
  353. return len(messages)
  354. }
  355. func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
  356. partitions := mfr.highWaterMarks[topic]
  357. if partitions == nil {
  358. return 0
  359. }
  360. return partitions[partition]
  361. }
  362. // MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
  363. type MockConsumerMetadataResponse struct {
  364. coordinators map[string]interface{}
  365. t TestReporter
  366. }
  367. func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
  368. return &MockConsumerMetadataResponse{
  369. coordinators: make(map[string]interface{}),
  370. t: t,
  371. }
  372. }
  373. func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
  374. mr.coordinators[group] = broker
  375. return mr
  376. }
  377. func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
  378. mr.coordinators[group] = kerror
  379. return mr
  380. }
  381. func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
  382. req := reqBody.(*ConsumerMetadataRequest)
  383. group := req.ConsumerGroup
  384. res := &ConsumerMetadataResponse{Version: req.version()}
  385. v := mr.coordinators[group]
  386. switch v := v.(type) {
  387. case *MockBroker:
  388. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  389. case KError:
  390. res.Err = v
  391. }
  392. return res
  393. }
  394. // MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
  395. type MockFindCoordinatorResponse struct {
  396. groupCoordinators map[string]interface{}
  397. transCoordinators map[string]interface{}
  398. t TestReporter
  399. }
  400. func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
  401. return &MockFindCoordinatorResponse{
  402. groupCoordinators: make(map[string]interface{}),
  403. transCoordinators: make(map[string]interface{}),
  404. t: t,
  405. }
  406. }
  407. func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
  408. switch coordinatorType {
  409. case CoordinatorGroup:
  410. mr.groupCoordinators[group] = broker
  411. case CoordinatorTransaction:
  412. mr.transCoordinators[group] = broker
  413. }
  414. return mr
  415. }
  416. func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
  417. switch coordinatorType {
  418. case CoordinatorGroup:
  419. mr.groupCoordinators[group] = kerror
  420. case CoordinatorTransaction:
  421. mr.transCoordinators[group] = kerror
  422. }
  423. return mr
  424. }
  425. func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
  426. req := reqBody.(*FindCoordinatorRequest)
  427. res := &FindCoordinatorResponse{Version: req.version()}
  428. var v interface{}
  429. switch req.CoordinatorType {
  430. case CoordinatorGroup:
  431. v = mr.groupCoordinators[req.CoordinatorKey]
  432. case CoordinatorTransaction:
  433. v = mr.transCoordinators[req.CoordinatorKey]
  434. }
  435. switch v := v.(type) {
  436. case *MockBroker:
  437. res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
  438. case KError:
  439. res.Err = v
  440. }
  441. return res
  442. }
  443. // MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
  444. type MockOffsetCommitResponse struct {
  445. errors map[string]map[string]map[int32]KError
  446. t TestReporter
  447. }
  448. func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
  449. return &MockOffsetCommitResponse{t: t}
  450. }
  451. func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
  452. if mr.errors == nil {
  453. mr.errors = make(map[string]map[string]map[int32]KError)
  454. }
  455. topics := mr.errors[group]
  456. if topics == nil {
  457. topics = make(map[string]map[int32]KError)
  458. mr.errors[group] = topics
  459. }
  460. partitions := topics[topic]
  461. if partitions == nil {
  462. partitions = make(map[int32]KError)
  463. topics[topic] = partitions
  464. }
  465. partitions[partition] = kerror
  466. return mr
  467. }
  468. func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
  469. req := reqBody.(*OffsetCommitRequest)
  470. group := req.ConsumerGroup
  471. res := &OffsetCommitResponse{Version: req.version()}
  472. for topic, partitions := range req.blocks {
  473. for partition := range partitions {
  474. res.AddError(topic, partition, mr.getError(group, topic, partition))
  475. }
  476. }
  477. return res
  478. }
  479. func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
  480. topics := mr.errors[group]
  481. if topics == nil {
  482. return ErrNoError
  483. }
  484. partitions := topics[topic]
  485. if partitions == nil {
  486. return ErrNoError
  487. }
  488. kerror, ok := partitions[partition]
  489. if !ok {
  490. return ErrNoError
  491. }
  492. return kerror
  493. }
  494. // MockProduceResponse is a `ProduceResponse` builder.
  495. type MockProduceResponse struct {
  496. version int16
  497. errors map[string]map[int32]KError
  498. t TestReporter
  499. }
  500. func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
  501. return &MockProduceResponse{t: t}
  502. }
  503. func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
  504. mr.version = version
  505. return mr
  506. }
  507. func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
  508. if mr.errors == nil {
  509. mr.errors = make(map[string]map[int32]KError)
  510. }
  511. partitions := mr.errors[topic]
  512. if partitions == nil {
  513. partitions = make(map[int32]KError)
  514. mr.errors[topic] = partitions
  515. }
  516. partitions[partition] = kerror
  517. return mr
  518. }
  519. func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
  520. req := reqBody.(*ProduceRequest)
  521. res := &ProduceResponse{
  522. Version: req.version(),
  523. }
  524. if mr.version > 0 {
  525. res.Version = mr.version
  526. }
  527. for topic, partitions := range req.records {
  528. for partition := range partitions {
  529. res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
  530. }
  531. }
  532. return res
  533. }
  534. func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
  535. partitions := mr.errors[topic]
  536. if partitions == nil {
  537. return ErrNoError
  538. }
  539. kerror, ok := partitions[partition]
  540. if !ok {
  541. return ErrNoError
  542. }
  543. return kerror
  544. }
  545. // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
  546. type MockOffsetFetchResponse struct {
  547. offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
  548. error KError
  549. t TestReporter
  550. }
  551. func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
  552. return &MockOffsetFetchResponse{t: t}
  553. }
  554. func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
  555. if mr.offsets == nil {
  556. mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
  557. }
  558. topics := mr.offsets[group]
  559. if topics == nil {
  560. topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
  561. mr.offsets[group] = topics
  562. }
  563. partitions := topics[topic]
  564. if partitions == nil {
  565. partitions = make(map[int32]*OffsetFetchResponseBlock)
  566. topics[topic] = partitions
  567. }
  568. partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
  569. return mr
  570. }
  571. func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
  572. mr.error = kerror
  573. return mr
  574. }
  575. func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
  576. req := reqBody.(*OffsetFetchRequest)
  577. group := req.ConsumerGroup
  578. res := &OffsetFetchResponse{Version: req.Version}
  579. for topic, partitions := range mr.offsets[group] {
  580. for partition, block := range partitions {
  581. res.AddBlock(topic, partition, block)
  582. }
  583. }
  584. if res.Version >= 2 {
  585. res.Err = mr.error
  586. }
  587. return res
  588. }
  589. type MockCreateTopicsResponse struct {
  590. t TestReporter
  591. }
  592. func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
  593. return &MockCreateTopicsResponse{t: t}
  594. }
  595. func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  596. req := reqBody.(*CreateTopicsRequest)
  597. res := &CreateTopicsResponse{
  598. Version: req.Version,
  599. }
  600. res.TopicErrors = make(map[string]*TopicError)
  601. for topic := range req.TopicDetails {
  602. if res.Version >= 1 && strings.HasPrefix(topic, "_") {
  603. msg := "insufficient permissions to create topic with reserved prefix"
  604. res.TopicErrors[topic] = &TopicError{
  605. Err: ErrTopicAuthorizationFailed,
  606. ErrMsg: &msg,
  607. }
  608. continue
  609. }
  610. res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
  611. }
  612. return res
  613. }
  614. type MockDeleteTopicsResponse struct {
  615. t TestReporter
  616. error KError
  617. }
  618. func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
  619. return &MockDeleteTopicsResponse{t: t}
  620. }
  621. func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  622. req := reqBody.(*DeleteTopicsRequest)
  623. res := &DeleteTopicsResponse{Version: req.version()}
  624. res.TopicErrorCodes = make(map[string]KError)
  625. for _, topic := range req.Topics {
  626. res.TopicErrorCodes[topic] = mr.error
  627. }
  628. res.Version = req.Version
  629. return res
  630. }
  631. func (mr *MockDeleteTopicsResponse) SetError(kerror KError) *MockDeleteTopicsResponse {
  632. mr.error = kerror
  633. return mr
  634. }
  635. type MockCreatePartitionsResponse struct {
  636. t TestReporter
  637. }
  638. func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
  639. return &MockCreatePartitionsResponse{t: t}
  640. }
  641. func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  642. req := reqBody.(*CreatePartitionsRequest)
  643. res := &CreatePartitionsResponse{Version: req.version()}
  644. res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
  645. for topic := range req.TopicPartitions {
  646. if strings.HasPrefix(topic, "_") {
  647. msg := "insufficient permissions to create partition on topic with reserved prefix"
  648. res.TopicPartitionErrors[topic] = &TopicPartitionError{
  649. Err: ErrTopicAuthorizationFailed,
  650. ErrMsg: &msg,
  651. }
  652. continue
  653. }
  654. res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
  655. }
  656. return res
  657. }
  658. type MockAlterPartitionReassignmentsResponse struct {
  659. t TestReporter
  660. }
  661. func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
  662. return &MockAlterPartitionReassignmentsResponse{t: t}
  663. }
  664. func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  665. req := reqBody.(*AlterPartitionReassignmentsRequest)
  666. _ = req
  667. res := &AlterPartitionReassignmentsResponse{Version: req.version()}
  668. return res
  669. }
  670. type MockListPartitionReassignmentsResponse struct {
  671. t TestReporter
  672. }
  673. func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
  674. return &MockListPartitionReassignmentsResponse{t: t}
  675. }
  676. func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  677. req := reqBody.(*ListPartitionReassignmentsRequest)
  678. _ = req
  679. res := &ListPartitionReassignmentsResponse{Version: req.version()}
  680. for topic, partitions := range req.blocks {
  681. for _, partition := range partitions {
  682. res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
  683. }
  684. }
  685. return res
  686. }
  687. type MockDeleteRecordsResponse struct {
  688. t TestReporter
  689. }
  690. func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
  691. return &MockDeleteRecordsResponse{t: t}
  692. }
  693. func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  694. req := reqBody.(*DeleteRecordsRequest)
  695. res := &DeleteRecordsResponse{Version: req.version()}
  696. res.Topics = make(map[string]*DeleteRecordsResponseTopic)
  697. for topic, deleteRecordRequestTopic := range req.Topics {
  698. partitions := make(map[int32]*DeleteRecordsResponsePartition)
  699. for partition := range deleteRecordRequestTopic.PartitionOffsets {
  700. partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
  701. }
  702. res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
  703. }
  704. return res
  705. }
  706. type MockDescribeConfigsResponse struct {
  707. t TestReporter
  708. }
  709. func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
  710. return &MockDescribeConfigsResponse{t: t}
  711. }
  712. func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  713. req := reqBody.(*DescribeConfigsRequest)
  714. res := &DescribeConfigsResponse{
  715. Version: req.Version,
  716. }
  717. includeSynonyms := req.Version > 0
  718. includeSource := req.Version > 0
  719. for _, r := range req.Resources {
  720. var configEntries []*ConfigEntry
  721. switch r.Type {
  722. case BrokerResource:
  723. configEntries = append(configEntries,
  724. &ConfigEntry{
  725. Name: "min.insync.replicas",
  726. Value: "2",
  727. ReadOnly: false,
  728. Default: false,
  729. },
  730. )
  731. res.Resources = append(res.Resources, &ResourceResponse{
  732. Name: r.Name,
  733. Configs: configEntries,
  734. })
  735. case BrokerLoggerResource:
  736. configEntries = append(configEntries,
  737. &ConfigEntry{
  738. Name: "kafka.controller.KafkaController",
  739. Value: "DEBUG",
  740. ReadOnly: false,
  741. Default: false,
  742. },
  743. )
  744. res.Resources = append(res.Resources, &ResourceResponse{
  745. Name: r.Name,
  746. Configs: configEntries,
  747. })
  748. case TopicResource:
  749. maxMessageBytes := &ConfigEntry{
  750. Name: "max.message.bytes",
  751. Value: "1000000",
  752. ReadOnly: false,
  753. Default: !includeSource,
  754. Sensitive: false,
  755. }
  756. if includeSource {
  757. maxMessageBytes.Source = SourceDefault
  758. }
  759. if includeSynonyms {
  760. maxMessageBytes.Synonyms = []*ConfigSynonym{
  761. {
  762. ConfigName: "max.message.bytes",
  763. ConfigValue: "500000",
  764. },
  765. }
  766. }
  767. retentionMs := &ConfigEntry{
  768. Name: "retention.ms",
  769. Value: "5000",
  770. ReadOnly: false,
  771. Default: false,
  772. Sensitive: false,
  773. }
  774. if includeSynonyms {
  775. retentionMs.Synonyms = []*ConfigSynonym{
  776. {
  777. ConfigName: "log.retention.ms",
  778. ConfigValue: "2500",
  779. },
  780. }
  781. }
  782. password := &ConfigEntry{
  783. Name: "password",
  784. Value: "12345",
  785. ReadOnly: false,
  786. Default: false,
  787. Sensitive: true,
  788. }
  789. configEntries = append(
  790. configEntries, maxMessageBytes, retentionMs, password)
  791. res.Resources = append(res.Resources, &ResourceResponse{
  792. Name: r.Name,
  793. Configs: configEntries,
  794. })
  795. }
  796. }
  797. return res
  798. }
  799. type MockDescribeConfigsResponseWithErrorCode struct {
  800. t TestReporter
  801. }
  802. func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
  803. return &MockDescribeConfigsResponseWithErrorCode{t: t}
  804. }
  805. func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
  806. req := reqBody.(*DescribeConfigsRequest)
  807. res := &DescribeConfigsResponse{
  808. Version: req.Version,
  809. }
  810. for _, r := range req.Resources {
  811. res.Resources = append(res.Resources, &ResourceResponse{
  812. Name: r.Name,
  813. Type: r.Type,
  814. ErrorCode: 83,
  815. ErrorMsg: "",
  816. })
  817. }
  818. return res
  819. }
  820. type MockAlterConfigsResponse struct {
  821. t TestReporter
  822. }
  823. func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
  824. return &MockAlterConfigsResponse{t: t}
  825. }
  826. func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  827. req := reqBody.(*AlterConfigsRequest)
  828. res := &AlterConfigsResponse{Version: req.version()}
  829. for _, r := range req.Resources {
  830. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
  831. Name: r.Name,
  832. Type: r.Type,
  833. ErrorMsg: "",
  834. })
  835. }
  836. return res
  837. }
  838. type MockAlterConfigsResponseWithErrorCode struct {
  839. t TestReporter
  840. }
  841. func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
  842. return &MockAlterConfigsResponseWithErrorCode{t: t}
  843. }
  844. func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
  845. req := reqBody.(*AlterConfigsRequest)
  846. res := &AlterConfigsResponse{Version: req.version()}
  847. for _, r := range req.Resources {
  848. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
  849. Name: r.Name,
  850. Type: r.Type,
  851. ErrorCode: 83,
  852. ErrorMsg: "",
  853. })
  854. }
  855. return res
  856. }
  857. type MockIncrementalAlterConfigsResponse struct {
  858. t TestReporter
  859. }
  860. func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse {
  861. return &MockIncrementalAlterConfigsResponse{t: t}
  862. }
  863. func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  864. req := reqBody.(*IncrementalAlterConfigsRequest)
  865. res := &IncrementalAlterConfigsResponse{Version: req.version()}
  866. for _, r := range req.Resources {
  867. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
  868. Name: r.Name,
  869. Type: r.Type,
  870. ErrorMsg: "",
  871. })
  872. }
  873. return res
  874. }
  875. type MockIncrementalAlterConfigsResponseWithErrorCode struct {
  876. t TestReporter
  877. }
  878. func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode {
  879. return &MockIncrementalAlterConfigsResponseWithErrorCode{t: t}
  880. }
  881. func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
  882. req := reqBody.(*IncrementalAlterConfigsRequest)
  883. res := &IncrementalAlterConfigsResponse{Version: req.version()}
  884. for _, r := range req.Resources {
  885. res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
  886. Name: r.Name,
  887. Type: r.Type,
  888. ErrorCode: 83,
  889. ErrorMsg: "",
  890. })
  891. }
  892. return res
  893. }
  894. type MockCreateAclsResponse struct {
  895. t TestReporter
  896. }
  897. func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
  898. return &MockCreateAclsResponse{t: t}
  899. }
  900. func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  901. req := reqBody.(*CreateAclsRequest)
  902. res := &CreateAclsResponse{Version: req.version()}
  903. for range req.AclCreations {
  904. res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
  905. }
  906. return res
  907. }
  908. type MockCreateAclsResponseError struct {
  909. t TestReporter
  910. }
  911. func NewMockCreateAclsResponseWithError(t TestReporter) *MockCreateAclsResponseError {
  912. return &MockCreateAclsResponseError{t: t}
  913. }
  914. func (mr *MockCreateAclsResponseError) For(reqBody versionedDecoder) encoderWithHeader {
  915. req := reqBody.(*CreateAclsRequest)
  916. res := &CreateAclsResponse{Version: req.version()}
  917. for range req.AclCreations {
  918. res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrInvalidRequest})
  919. }
  920. return res
  921. }
  922. type MockListAclsResponse struct {
  923. t TestReporter
  924. }
  925. func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
  926. return &MockListAclsResponse{t: t}
  927. }
  928. func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  929. req := reqBody.(*DescribeAclsRequest)
  930. res := &DescribeAclsResponse{Version: req.version()}
  931. res.Err = ErrNoError
  932. acl := &ResourceAcls{}
  933. if req.ResourceName != nil {
  934. acl.Resource.ResourceName = *req.ResourceName
  935. }
  936. acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
  937. acl.Resource.ResourceType = req.ResourceType
  938. host := "*"
  939. if req.Host != nil {
  940. host = *req.Host
  941. }
  942. principal := "User:test"
  943. if req.Principal != nil {
  944. principal = *req.Principal
  945. }
  946. permissionType := req.PermissionType
  947. if permissionType == AclPermissionAny {
  948. permissionType = AclPermissionAllow
  949. }
  950. acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
  951. res.ResourceAcls = append(res.ResourceAcls, acl)
  952. res.Version = int16(req.Version)
  953. return res
  954. }
  955. type MockSaslAuthenticateResponse struct {
  956. t TestReporter
  957. kerror KError
  958. saslAuthBytes []byte
  959. sessionLifetimeMs int64
  960. }
  961. func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
  962. return &MockSaslAuthenticateResponse{t: t}
  963. }
  964. func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
  965. req := reqBody.(*SaslAuthenticateRequest)
  966. res := &SaslAuthenticateResponse{
  967. Version: req.version(),
  968. Err: msar.kerror,
  969. SaslAuthBytes: msar.saslAuthBytes,
  970. SessionLifetimeMs: msar.sessionLifetimeMs,
  971. }
  972. return res
  973. }
  974. func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
  975. msar.kerror = kerror
  976. return msar
  977. }
  978. func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
  979. msar.saslAuthBytes = saslAuthBytes
  980. return msar
  981. }
  982. func (msar *MockSaslAuthenticateResponse) SetSessionLifetimeMs(sessionLifetimeMs int64) *MockSaslAuthenticateResponse {
  983. msar.sessionLifetimeMs = sessionLifetimeMs
  984. return msar
  985. }
  986. type MockDeleteAclsResponse struct {
  987. t TestReporter
  988. }
  989. type MockSaslHandshakeResponse struct {
  990. enabledMechanisms []string
  991. kerror KError
  992. t TestReporter
  993. }
  994. func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
  995. return &MockSaslHandshakeResponse{t: t}
  996. }
  997. func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
  998. req := reqBody.(*SaslHandshakeRequest)
  999. res := &SaslHandshakeResponse{Version: req.version()}
  1000. res.Err = mshr.kerror
  1001. res.EnabledMechanisms = mshr.enabledMechanisms
  1002. return res
  1003. }
  1004. func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
  1005. mshr.kerror = kerror
  1006. return mshr
  1007. }
  1008. func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
  1009. mshr.enabledMechanisms = enabledMechanisms
  1010. return mshr
  1011. }
  1012. func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
  1013. return &MockDeleteAclsResponse{t: t}
  1014. }
  1015. func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1016. req := reqBody.(*DeleteAclsRequest)
  1017. res := &DeleteAclsResponse{Version: req.version()}
  1018. for range req.Filters {
  1019. response := &FilterResponse{Err: ErrNoError}
  1020. response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
  1021. res.FilterResponses = append(res.FilterResponses, response)
  1022. }
  1023. res.Version = int16(req.Version)
  1024. return res
  1025. }
  1026. type MockDeleteGroupsResponse struct {
  1027. deletedGroups []string
  1028. }
  1029. func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
  1030. return &MockDeleteGroupsResponse{}
  1031. }
  1032. func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
  1033. m.deletedGroups = groups
  1034. return m
  1035. }
  1036. func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1037. req := reqBody.(*DeleteGroupsRequest)
  1038. resp := &DeleteGroupsResponse{
  1039. Version: req.version(),
  1040. GroupErrorCodes: map[string]KError{},
  1041. }
  1042. for _, group := range m.deletedGroups {
  1043. resp.GroupErrorCodes[group] = ErrNoError
  1044. }
  1045. return resp
  1046. }
  1047. type MockDeleteOffsetResponse struct {
  1048. errorCode KError
  1049. topic string
  1050. partition int32
  1051. errorPartition KError
  1052. }
  1053. func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse {
  1054. return &MockDeleteOffsetResponse{}
  1055. }
  1056. func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse {
  1057. m.errorCode = errorCode
  1058. m.topic = topic
  1059. m.partition = partition
  1060. m.errorPartition = errorPartition
  1061. return m
  1062. }
  1063. func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1064. req := reqBody.(*DeleteOffsetsRequest)
  1065. resp := &DeleteOffsetsResponse{
  1066. Version: req.version(),
  1067. ErrorCode: m.errorCode,
  1068. Errors: map[string]map[int32]KError{
  1069. m.topic: {m.partition: m.errorPartition},
  1070. },
  1071. }
  1072. return resp
  1073. }
  1074. type MockJoinGroupResponse struct {
  1075. t TestReporter
  1076. ThrottleTime int32
  1077. Err KError
  1078. GenerationId int32
  1079. GroupProtocol string
  1080. LeaderId string
  1081. MemberId string
  1082. Members []GroupMember
  1083. }
  1084. func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
  1085. return &MockJoinGroupResponse{
  1086. t: t,
  1087. Members: make([]GroupMember, 0),
  1088. }
  1089. }
  1090. func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1091. req := reqBody.(*JoinGroupRequest)
  1092. resp := &JoinGroupResponse{
  1093. Version: req.Version,
  1094. ThrottleTime: m.ThrottleTime,
  1095. Err: m.Err,
  1096. GenerationId: m.GenerationId,
  1097. GroupProtocol: m.GroupProtocol,
  1098. LeaderId: m.LeaderId,
  1099. MemberId: m.MemberId,
  1100. Members: m.Members,
  1101. }
  1102. return resp
  1103. }
  1104. func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
  1105. m.ThrottleTime = t
  1106. return m
  1107. }
  1108. func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
  1109. m.Err = kerr
  1110. return m
  1111. }
  1112. func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
  1113. m.GenerationId = id
  1114. return m
  1115. }
  1116. func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
  1117. m.GroupProtocol = proto
  1118. return m
  1119. }
  1120. func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
  1121. m.LeaderId = id
  1122. return m
  1123. }
  1124. func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
  1125. m.MemberId = id
  1126. return m
  1127. }
  1128. func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
  1129. bin, err := encode(meta, nil)
  1130. if err != nil {
  1131. panic(fmt.Sprintf("error encoding member metadata: %v", err))
  1132. }
  1133. m.Members = append(m.Members, GroupMember{MemberId: id, Metadata: bin})
  1134. return m
  1135. }
  1136. type MockLeaveGroupResponse struct {
  1137. t TestReporter
  1138. Err KError
  1139. }
  1140. func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
  1141. return &MockLeaveGroupResponse{t: t}
  1142. }
  1143. func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1144. req := reqBody.(*LeaveGroupRequest)
  1145. resp := &LeaveGroupResponse{
  1146. Version: req.version(),
  1147. Err: m.Err,
  1148. }
  1149. return resp
  1150. }
  1151. func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
  1152. m.Err = kerr
  1153. return m
  1154. }
  1155. type MockSyncGroupResponse struct {
  1156. t TestReporter
  1157. Err KError
  1158. MemberAssignment []byte
  1159. }
  1160. func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
  1161. return &MockSyncGroupResponse{t: t}
  1162. }
  1163. func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1164. req := reqBody.(*SyncGroupRequest)
  1165. resp := &SyncGroupResponse{
  1166. Version: req.version(),
  1167. Err: m.Err,
  1168. MemberAssignment: m.MemberAssignment,
  1169. }
  1170. return resp
  1171. }
  1172. func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
  1173. m.Err = kerr
  1174. return m
  1175. }
  1176. func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
  1177. bin, err := encode(assignment, nil)
  1178. if err != nil {
  1179. panic(fmt.Sprintf("error encoding member assignment: %v", err))
  1180. }
  1181. m.MemberAssignment = bin
  1182. return m
  1183. }
  1184. type MockHeartbeatResponse struct {
  1185. t TestReporter
  1186. Err KError
  1187. }
  1188. func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
  1189. return &MockHeartbeatResponse{t: t}
  1190. }
  1191. func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1192. req := reqBody.(*HeartbeatRequest)
  1193. resp := &HeartbeatResponse{
  1194. Version: req.version(),
  1195. }
  1196. return resp
  1197. }
  1198. func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
  1199. m.Err = kerr
  1200. return m
  1201. }
  1202. type MockDescribeLogDirsResponse struct {
  1203. t TestReporter
  1204. logDirs []DescribeLogDirsResponseDirMetadata
  1205. }
  1206. func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
  1207. return &MockDescribeLogDirsResponse{t: t}
  1208. }
  1209. func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
  1210. var topics []DescribeLogDirsResponseTopic
  1211. for topic := range topicPartitions {
  1212. var partitions []DescribeLogDirsResponsePartition
  1213. for i := 0; i < topicPartitions[topic]; i++ {
  1214. partitions = append(partitions, DescribeLogDirsResponsePartition{
  1215. PartitionID: int32(i),
  1216. IsTemporary: false,
  1217. OffsetLag: int64(0),
  1218. Size: int64(1234),
  1219. })
  1220. }
  1221. topics = append(topics, DescribeLogDirsResponseTopic{
  1222. Topic: topic,
  1223. Partitions: partitions,
  1224. })
  1225. }
  1226. logDir := DescribeLogDirsResponseDirMetadata{
  1227. ErrorCode: ErrNoError,
  1228. Path: logDirPath,
  1229. Topics: topics,
  1230. }
  1231. m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
  1232. return m
  1233. }
  1234. func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1235. req := reqBody.(*DescribeLogDirsRequest)
  1236. resp := &DescribeLogDirsResponse{
  1237. Version: req.version(),
  1238. LogDirs: m.logDirs,
  1239. }
  1240. return resp
  1241. }
  1242. type MockApiVersionsResponse struct {
  1243. t TestReporter
  1244. apiKeys []ApiVersionsResponseKey
  1245. }
  1246. func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse {
  1247. return &MockApiVersionsResponse{
  1248. t: t,
  1249. apiKeys: []ApiVersionsResponseKey{
  1250. {
  1251. ApiKey: 0,
  1252. MinVersion: 5,
  1253. MaxVersion: 8,
  1254. },
  1255. {
  1256. ApiKey: 1,
  1257. MinVersion: 7,
  1258. MaxVersion: 11,
  1259. },
  1260. },
  1261. }
  1262. }
  1263. func (m *MockApiVersionsResponse) SetApiKeys(apiKeys []ApiVersionsResponseKey) *MockApiVersionsResponse {
  1264. m.apiKeys = apiKeys
  1265. return m
  1266. }
  1267. func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1268. req := reqBody.(*ApiVersionsRequest)
  1269. res := &ApiVersionsResponse{
  1270. Version: req.Version,
  1271. ApiKeys: m.apiKeys,
  1272. }
  1273. return res
  1274. }
  1275. // MockInitProducerIDResponse is an `InitPorducerIDResponse` builder.
  1276. type MockInitProducerIDResponse struct {
  1277. producerID int64
  1278. producerEpoch int16
  1279. err KError
  1280. t TestReporter
  1281. }
  1282. func NewMockInitProducerIDResponse(t TestReporter) *MockInitProducerIDResponse {
  1283. return &MockInitProducerIDResponse{
  1284. t: t,
  1285. }
  1286. }
  1287. func (m *MockInitProducerIDResponse) SetProducerID(id int) *MockInitProducerIDResponse {
  1288. m.producerID = int64(id)
  1289. return m
  1290. }
  1291. func (m *MockInitProducerIDResponse) SetProducerEpoch(epoch int) *MockInitProducerIDResponse {
  1292. m.producerEpoch = int16(epoch)
  1293. return m
  1294. }
  1295. func (m *MockInitProducerIDResponse) SetError(err KError) *MockInitProducerIDResponse {
  1296. m.err = err
  1297. return m
  1298. }
  1299. func (m *MockInitProducerIDResponse) For(reqBody versionedDecoder) encoderWithHeader {
  1300. req := reqBody.(*InitProducerIDRequest)
  1301. res := &InitProducerIDResponse{
  1302. Version: req.Version,
  1303. Err: m.err,
  1304. ProducerID: m.producerID,
  1305. ProducerEpoch: m.producerEpoch,
  1306. }
  1307. return res
  1308. }