admin.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271
  1. package sarama
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "strconv"
  7. "sync"
  8. "time"
  9. )
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
// Methods with stricter requirements will specify the minimum broker version required.
// You MUST call Close() on a client to avoid leaks.
type ClusterAdmin interface {
	// CreateTopic creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
	// It may take several seconds after CreateTopic returns success for all the brokers
	// to become aware that the topic has been created. During this time, ListTopics
	// may not return information about the new topic. The validateOnly option is supported from version 0.10.2.0.
	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

	// ListTopics lists the topics available in the cluster with the default options.
	ListTopics() (map[string]TopicDetail, error)

	// DescribeTopics describes some topics in the cluster.
	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)

	// DeleteTopic deletes a topic. It may take several seconds after DeleteTopic returns success
	// for all the brokers to become aware that the topic is gone.
	// During this time, ListTopics may continue to return information about the deleted topic.
	// If delete.topic.enable is false on the brokers, DeleteTopic will mark
	// the topic for deletion, but not actually delete it.
	// This operation is supported by brokers with version 0.10.1.0 or higher.
	DeleteTopic(topic string) error

	// CreatePartitions increases the number of partitions of the topic according to the given count.
	// If partitions are increased for a topic that has a key, the partition logic or ordering of
	// the messages will be affected. It may take several seconds after this method returns
	// success for all the brokers to become aware that the partitions have been created.
	// During this time, DescribeTopics may not return information about the
	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

	// AlterPartitionReassignments alters the replica assignment for partitions.
	// This operation is supported by brokers with version 2.4.0.0 or higher.
	AlterPartitionReassignments(topic string, assignment [][]int32) error

	// ListPartitionReassignments provides info on ongoing partition replica reassignments.
	// This operation is supported by brokers with version 2.4.0.0 or higher.
	ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

	// DeleteRecords deletes records whose offset is smaller than the given offset of the corresponding partition.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteRecords(topic string, partitionOffsets map[int32]int64) error

	// DescribeConfig gets the configuration for the specified resource.
	// The returned configuration includes default values; entries where Default is true
	// can be distinguished from user supplied values.
	// Config entries where ReadOnly is true cannot be updated.
	// The value of config entries where Sensitive is true is always nil so
	// sensitive information is not disclosed.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)

	// AlterConfig updates the configuration for the specified resource with the default options.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	// The resources with their configs (topic is the only resource type with configs
	// that can be updated currently). Updates are not transactional so they may succeed
	// for some resources while fail for others. The configs for a particular resource are updated automatically.
	// NOTE(review): "automatically" may be a typo for "atomically" — confirm against the Kafka Admin API documentation.
	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

	// IncrementalAlterConfig incrementally updates the configuration for the specified resource with the default options.
	// This operation is supported by brokers with version 2.3.0.0 or higher.
	// Updates are not transactional so they may succeed for some resources while fail for others.
	// The configs for a particular resource are updated automatically.
	// NOTE(review): "automatically" may be a typo for "atomically" — confirm against the Kafka Admin API documentation.
	IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error

	// CreateACL creates an access control list (ACL) which is bound to a specific resource.
	// This operation is not transactional so it may succeed or fail.
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
	//
	// Deprecated: Use CreateACLs instead.
	CreateACL(resource Resource, acl Acl) error

	// CreateACLs creates access control lists (ACLs) which are bound to specific resources.
	// This operation is not transactional so it may succeed for some ACLs while fail for others.
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
	CreateACLs([]*ResourceAcls) error

	// ListAcls lists access control lists (ACLs) according to the supplied filter.
	// It may take some time for changes made by CreateACLs or DeleteACL to be reflected in the output of ListAcls.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	ListAcls(filter AclFilter) ([]ResourceAcls, error)

	// DeleteACL deletes access control lists (ACLs) according to the supplied filter.
	// This operation is not transactional so it may succeed for some ACLs while fail for others.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

	// ListConsumerGroups lists the consumer groups available in the cluster.
	ListConsumerGroups() (map[string]string, error)

	// DescribeConsumerGroups describes the given consumer groups.
	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)

	// ListConsumerGroupOffsets lists the consumer group offsets available in the cluster.
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

	// DeleteConsumerGroupOffset deletes a consumer group offset.
	DeleteConsumerGroupOffset(group string, topic string, partition int32) error

	// DeleteConsumerGroup deletes a consumer group.
	DeleteConsumerGroup(group string) error

	// DescribeCluster gets information about the nodes in the cluster.
	DescribeCluster() (brokers []*Broker, controllerID int32, err error)

	// DescribeLogDirs gets information about all log directories on the given set of brokers.
	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

	// DescribeUserScramCredentials gets information about SCRAM users.
	DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)

	// DeleteUserScramCredentials deletes SCRAM users.
	DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)

	// UpsertUserScramCredentials upserts SCRAM users.
	UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)

	// DescribeClientQuotas gets client quota configurations corresponding to the specified filter.
	// This operation is supported by brokers with version 2.6.0.0 or higher.
	DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)

	// AlterClientQuotas alters client quota configurations with the specified alterations.
	// This operation is supported by brokers with version 2.6.0.0 or higher.
	AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available.
	Controller() (*Broker, error)

	// RemoveMemberFromConsumerGroup removes members from the consumer group by the given member identities.
	// This operation is supported by brokers with version 2.3 or higher.
	// This is for the static membership feature (KIP-345).
	RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)

	// Close shuts down the admin and closes underlying client.
	Close() error
}
// clusterAdmin is the receiver type for the administrative methods in this
// file. It issues requests through a Client and reads its settings from that
// client's Config.
type clusterAdmin struct {
	// client supplies brokers, the controller and partition leaders; it is
	// closed by Close.
	client Client
	// conf is populated from client.Config() in NewClusterAdminFromClient and
	// is consulted for the Kafka version, admin timeout and retry settings.
	conf *Config
}
  125. // NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
  126. func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
  127. client, err := NewClient(addrs, conf)
  128. if err != nil {
  129. return nil, err
  130. }
  131. admin, err := NewClusterAdminFromClient(client)
  132. if err != nil {
  133. client.Close()
  134. }
  135. return admin, err
  136. }
  137. // NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
  138. // Note that underlying client will also be closed on admin's Close() call.
  139. func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
  140. // make sure we can retrieve the controller
  141. _, err := client.Controller()
  142. if err != nil {
  143. return nil, err
  144. }
  145. ca := &clusterAdmin{
  146. client: client,
  147. conf: client.Config(),
  148. }
  149. return ca, nil
  150. }
// Close closes the underlying client and returns the error reported by that
// call, if any.
func (ca *clusterAdmin) Close() error {
	return ca.client.Close()
}
// Controller returns the controller broker as reported by the underlying
// client's Controller method.
func (ca *clusterAdmin) Controller() (*Broker, error) {
	return ca.client.Controller()
}
// refreshController delegates to the underlying client's RefreshController.
// Callers in this file invoke it after a broker answers with
// ErrNotController, before the request is retried.
func (ca *clusterAdmin) refreshController() (*Broker, error) {
	return ca.client.RefreshController()
}
// isErrNotController returns `true` if the given error unwraps (per
// errors.Is) to an `ErrNotController` response from Kafka. It is used as the
// retryable predicate passed to retryOnError.
func isErrNotController(err error) bool {
	return errors.Is(err, ErrNotController)
}
  165. // retryOnError will repeatedly call the given (error-returning) func in the
  166. // case that its response is non-nil and retryable (as determined by the
  167. // provided retryable func) up to the maximum number of tries permitted by
  168. // the admin client configuration
  169. func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
  170. for attemptsRemaining := ca.conf.Admin.Retry.Max + 1; ; {
  171. err := fn()
  172. attemptsRemaining--
  173. if err == nil || attemptsRemaining <= 0 || !retryable(err) {
  174. return err
  175. }
  176. Logger.Printf(
  177. "admin/request retrying after %dms... (%d attempts remaining)\n",
  178. ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining)
  179. time.Sleep(ca.conf.Admin.Retry.Backoff)
  180. }
  181. }
  182. func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
  183. if topic == "" {
  184. return ErrInvalidTopic
  185. }
  186. if detail == nil {
  187. return errors.New("you must specify topic details")
  188. }
  189. topicDetails := make(map[string]*TopicDetail)
  190. topicDetails[topic] = detail
  191. request := &CreateTopicsRequest{
  192. TopicDetails: topicDetails,
  193. ValidateOnly: validateOnly,
  194. Timeout: ca.conf.Admin.Timeout,
  195. }
  196. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  197. // Version 3 is the same as version 2 (brokers response before throttling)
  198. request.Version = 3
  199. } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  200. // Version 2 is the same as version 1 (response has ThrottleTime)
  201. request.Version = 2
  202. } else if ca.conf.Version.IsAtLeast(V0_10_2_0) {
  203. // Version 1 adds validateOnly.
  204. request.Version = 1
  205. }
  206. return ca.retryOnError(isErrNotController, func() error {
  207. b, err := ca.Controller()
  208. if err != nil {
  209. return err
  210. }
  211. rsp, err := b.CreateTopics(request)
  212. if err != nil {
  213. return err
  214. }
  215. topicErr, ok := rsp.TopicErrors[topic]
  216. if !ok {
  217. return ErrIncompleteResponse
  218. }
  219. if !errors.Is(topicErr.Err, ErrNoError) {
  220. if errors.Is(topicErr.Err, ErrNotController) {
  221. _, _ = ca.refreshController()
  222. }
  223. return topicErr
  224. }
  225. return nil
  226. })
  227. }
  228. func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
  229. var response *MetadataResponse
  230. err = ca.retryOnError(isErrNotController, func() error {
  231. controller, err := ca.Controller()
  232. if err != nil {
  233. return err
  234. }
  235. request := NewMetadataRequest(ca.conf.Version, topics)
  236. response, err = controller.GetMetadata(request)
  237. if isErrNotController(err) {
  238. _, _ = ca.refreshController()
  239. }
  240. return err
  241. })
  242. if err != nil {
  243. return nil, err
  244. }
  245. return response.Topics, nil
  246. }
  247. func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
  248. var response *MetadataResponse
  249. err = ca.retryOnError(isErrNotController, func() error {
  250. controller, err := ca.Controller()
  251. if err != nil {
  252. return err
  253. }
  254. request := NewMetadataRequest(ca.conf.Version, nil)
  255. response, err = controller.GetMetadata(request)
  256. if isErrNotController(err) {
  257. _, _ = ca.refreshController()
  258. }
  259. return err
  260. })
  261. if err != nil {
  262. return nil, int32(0), err
  263. }
  264. return response.Brokers, response.ControllerID, nil
  265. }
  266. func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
  267. brokers := ca.client.Brokers()
  268. for _, b := range brokers {
  269. if b.ID() == id {
  270. return b, nil
  271. }
  272. }
  273. return nil, fmt.Errorf("could not find broker id %d", id)
  274. }
  275. func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
  276. brokers := ca.client.Brokers()
  277. if len(brokers) > 0 {
  278. index := rand.Intn(len(brokers))
  279. return brokers[index], nil
  280. }
  281. return nil, errors.New("no available broker")
  282. }
  283. func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
  284. // In order to build TopicDetails we need to first get the list of all
  285. // topics using a MetadataRequest and then get their configs using a
  286. // DescribeConfigsRequest request. To avoid sending many requests to the
  287. // broker, we use a single DescribeConfigsRequest.
  288. // Send the all-topic MetadataRequest
  289. b, err := ca.findAnyBroker()
  290. if err != nil {
  291. return nil, err
  292. }
  293. _ = b.Open(ca.client.Config())
  294. metadataReq := NewMetadataRequest(ca.conf.Version, nil)
  295. metadataResp, err := b.GetMetadata(metadataReq)
  296. if err != nil {
  297. return nil, err
  298. }
  299. topicsDetailsMap := make(map[string]TopicDetail)
  300. var describeConfigsResources []*ConfigResource
  301. for _, topic := range metadataResp.Topics {
  302. topicDetails := TopicDetail{
  303. NumPartitions: int32(len(topic.Partitions)),
  304. }
  305. if len(topic.Partitions) > 0 {
  306. topicDetails.ReplicaAssignment = map[int32][]int32{}
  307. for _, partition := range topic.Partitions {
  308. topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
  309. }
  310. topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
  311. }
  312. topicsDetailsMap[topic.Name] = topicDetails
  313. // we populate the resources we want to describe from the MetadataResponse
  314. topicResource := ConfigResource{
  315. Type: TopicResource,
  316. Name: topic.Name,
  317. }
  318. describeConfigsResources = append(describeConfigsResources, &topicResource)
  319. }
  320. // Send the DescribeConfigsRequest
  321. describeConfigsReq := &DescribeConfigsRequest{
  322. Resources: describeConfigsResources,
  323. }
  324. if ca.conf.Version.IsAtLeast(V1_1_0_0) {
  325. describeConfigsReq.Version = 1
  326. }
  327. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  328. describeConfigsReq.Version = 2
  329. }
  330. describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
  331. if err != nil {
  332. return nil, err
  333. }
  334. for _, resource := range describeConfigsResp.Resources {
  335. topicDetails := topicsDetailsMap[resource.Name]
  336. topicDetails.ConfigEntries = make(map[string]*string)
  337. for _, entry := range resource.Configs {
  338. entry := entry
  339. // only include non-default non-sensitive config
  340. // (don't actually think topic config will ever be sensitive)
  341. if entry.Default || entry.Sensitive {
  342. continue
  343. }
  344. topicDetails.ConfigEntries[entry.Name] = &entry.Value
  345. }
  346. topicsDetailsMap[resource.Name] = topicDetails
  347. }
  348. return topicsDetailsMap, nil
  349. }
  350. func (ca *clusterAdmin) DeleteTopic(topic string) error {
  351. if topic == "" {
  352. return ErrInvalidTopic
  353. }
  354. request := &DeleteTopicsRequest{
  355. Topics: []string{topic},
  356. Timeout: ca.conf.Admin.Timeout,
  357. }
  358. // Versions 0, 1, 2, and 3 are the same.
  359. if ca.conf.Version.IsAtLeast(V2_1_0_0) {
  360. request.Version = 3
  361. } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  362. request.Version = 2
  363. } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  364. request.Version = 1
  365. }
  366. return ca.retryOnError(isErrNotController, func() error {
  367. b, err := ca.Controller()
  368. if err != nil {
  369. return err
  370. }
  371. rsp, err := b.DeleteTopics(request)
  372. if err != nil {
  373. return err
  374. }
  375. topicErr, ok := rsp.TopicErrorCodes[topic]
  376. if !ok {
  377. return ErrIncompleteResponse
  378. }
  379. if !errors.Is(topicErr, ErrNoError) {
  380. if errors.Is(topicErr, ErrNotController) {
  381. _, _ = ca.refreshController()
  382. }
  383. return topicErr
  384. }
  385. return nil
  386. })
  387. }
  388. func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
  389. if topic == "" {
  390. return ErrInvalidTopic
  391. }
  392. topicPartitions := make(map[string]*TopicPartition)
  393. topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
  394. request := &CreatePartitionsRequest{
  395. TopicPartitions: topicPartitions,
  396. Timeout: ca.conf.Admin.Timeout,
  397. ValidateOnly: validateOnly,
  398. }
  399. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  400. request.Version = 1
  401. }
  402. return ca.retryOnError(isErrNotController, func() error {
  403. b, err := ca.Controller()
  404. if err != nil {
  405. return err
  406. }
  407. rsp, err := b.CreatePartitions(request)
  408. if err != nil {
  409. return err
  410. }
  411. topicErr, ok := rsp.TopicPartitionErrors[topic]
  412. if !ok {
  413. return ErrIncompleteResponse
  414. }
  415. if !errors.Is(topicErr.Err, ErrNoError) {
  416. if errors.Is(topicErr.Err, ErrNotController) {
  417. _, _ = ca.refreshController()
  418. }
  419. return topicErr
  420. }
  421. return nil
  422. })
  423. }
  424. func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
  425. if topic == "" {
  426. return ErrInvalidTopic
  427. }
  428. request := &AlterPartitionReassignmentsRequest{
  429. TimeoutMs: int32(60000),
  430. Version: int16(0),
  431. }
  432. for i := 0; i < len(assignment); i++ {
  433. request.AddBlock(topic, int32(i), assignment[i])
  434. }
  435. return ca.retryOnError(isErrNotController, func() error {
  436. b, err := ca.Controller()
  437. if err != nil {
  438. return err
  439. }
  440. errs := make([]error, 0)
  441. rsp, err := b.AlterPartitionReassignments(request)
  442. if err != nil {
  443. errs = append(errs, err)
  444. } else {
  445. if rsp.ErrorCode > 0 {
  446. errs = append(errs, rsp.ErrorCode)
  447. }
  448. for topic, topicErrors := range rsp.Errors {
  449. for partition, partitionError := range topicErrors {
  450. if !errors.Is(partitionError.errorCode, ErrNoError) {
  451. errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode))
  452. }
  453. }
  454. }
  455. }
  456. if len(errs) > 0 {
  457. return Wrap(ErrReassignPartitions, errs...)
  458. }
  459. return nil
  460. })
  461. }
  462. func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
  463. if topic == "" {
  464. return nil, ErrInvalidTopic
  465. }
  466. request := &ListPartitionReassignmentsRequest{
  467. TimeoutMs: int32(60000),
  468. Version: int16(0),
  469. }
  470. request.AddBlock(topic, partitions)
  471. var rsp *ListPartitionReassignmentsResponse
  472. err = ca.retryOnError(isErrNotController, func() error {
  473. b, err := ca.Controller()
  474. if err != nil {
  475. return err
  476. }
  477. _ = b.Open(ca.client.Config())
  478. rsp, err = b.ListPartitionReassignments(request)
  479. if isErrNotController(err) {
  480. _, _ = ca.refreshController()
  481. }
  482. return err
  483. })
  484. if err == nil && rsp != nil {
  485. return rsp.TopicStatus, nil
  486. } else {
  487. return nil, err
  488. }
  489. }
  490. func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
  491. if topic == "" {
  492. return ErrInvalidTopic
  493. }
  494. errs := make([]error, 0)
  495. partitionPerBroker := make(map[*Broker][]int32)
  496. for partition := range partitionOffsets {
  497. broker, err := ca.client.Leader(topic, partition)
  498. if err != nil {
  499. errs = append(errs, err)
  500. continue
  501. }
  502. partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
  503. }
  504. for broker, partitions := range partitionPerBroker {
  505. topics := make(map[string]*DeleteRecordsRequestTopic)
  506. recordsToDelete := make(map[int32]int64)
  507. for _, p := range partitions {
  508. recordsToDelete[p] = partitionOffsets[p]
  509. }
  510. topics[topic] = &DeleteRecordsRequestTopic{
  511. PartitionOffsets: recordsToDelete,
  512. }
  513. request := &DeleteRecordsRequest{
  514. Topics: topics,
  515. Timeout: ca.conf.Admin.Timeout,
  516. }
  517. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  518. request.Version = 1
  519. }
  520. rsp, err := broker.DeleteRecords(request)
  521. if err != nil {
  522. errs = append(errs, err)
  523. continue
  524. }
  525. deleteRecordsResponseTopic, ok := rsp.Topics[topic]
  526. if !ok {
  527. errs = append(errs, ErrIncompleteResponse)
  528. continue
  529. }
  530. for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
  531. if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) {
  532. errs = append(errs, deleteRecordsResponsePartition.Err)
  533. continue
  534. }
  535. }
  536. }
  537. if len(errs) > 0 {
  538. return Wrap(ErrDeleteRecords, errs...)
  539. }
  540. // todo since we are dealing with couple of partitions it would be good if we return slice of errors
  541. // for each partition instead of one error
  542. return nil
  543. }
  544. // Returns a bool indicating whether the resource request needs to go to a
  545. // specific broker
  546. func dependsOnSpecificNode(resource ConfigResource) bool {
  547. return (resource.Type == BrokerResource && resource.Name != "") ||
  548. resource.Type == BrokerLoggerResource
  549. }
  550. func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
  551. var entries []ConfigEntry
  552. var resources []*ConfigResource
  553. resources = append(resources, &resource)
  554. request := &DescribeConfigsRequest{
  555. Resources: resources,
  556. }
  557. if ca.conf.Version.IsAtLeast(V1_1_0_0) {
  558. request.Version = 1
  559. }
  560. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  561. request.Version = 2
  562. }
  563. var (
  564. b *Broker
  565. err error
  566. )
  567. // DescribeConfig of broker/broker logger must be sent to the broker in question
  568. if dependsOnSpecificNode(resource) {
  569. var id int64
  570. id, err = strconv.ParseInt(resource.Name, 10, 32)
  571. if err != nil {
  572. return nil, err
  573. }
  574. b, err = ca.findBroker(int32(id))
  575. } else {
  576. b, err = ca.findAnyBroker()
  577. }
  578. if err != nil {
  579. return nil, err
  580. }
  581. _ = b.Open(ca.client.Config())
  582. rsp, err := b.DescribeConfigs(request)
  583. if err != nil {
  584. return nil, err
  585. }
  586. for _, rspResource := range rsp.Resources {
  587. if rspResource.Name == resource.Name {
  588. if rspResource.ErrorCode != 0 {
  589. return nil, &DescribeConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
  590. }
  591. for _, cfgEntry := range rspResource.Configs {
  592. entries = append(entries, *cfgEntry)
  593. }
  594. }
  595. }
  596. return entries, nil
  597. }
  598. func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
  599. var resources []*AlterConfigsResource
  600. resources = append(resources, &AlterConfigsResource{
  601. Type: resourceType,
  602. Name: name,
  603. ConfigEntries: entries,
  604. })
  605. request := &AlterConfigsRequest{
  606. Resources: resources,
  607. ValidateOnly: validateOnly,
  608. }
  609. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  610. request.Version = 1
  611. }
  612. var (
  613. b *Broker
  614. err error
  615. )
  616. // AlterConfig of broker/broker logger must be sent to the broker in question
  617. if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
  618. var id int64
  619. id, err = strconv.ParseInt(name, 10, 32)
  620. if err != nil {
  621. return err
  622. }
  623. b, err = ca.findBroker(int32(id))
  624. } else {
  625. b, err = ca.findAnyBroker()
  626. }
  627. if err != nil {
  628. return err
  629. }
  630. _ = b.Open(ca.client.Config())
  631. rsp, err := b.AlterConfigs(request)
  632. if err != nil {
  633. return err
  634. }
  635. for _, rspResource := range rsp.Resources {
  636. if rspResource.Name == name {
  637. if rspResource.ErrorCode != 0 {
  638. return &AlterConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
  639. }
  640. }
  641. }
  642. return nil
  643. }
  644. func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error {
  645. var resources []*IncrementalAlterConfigsResource
  646. resources = append(resources, &IncrementalAlterConfigsResource{
  647. Type: resourceType,
  648. Name: name,
  649. ConfigEntries: entries,
  650. })
  651. request := &IncrementalAlterConfigsRequest{
  652. Resources: resources,
  653. ValidateOnly: validateOnly,
  654. }
  655. var (
  656. b *Broker
  657. err error
  658. )
  659. // AlterConfig of broker/broker logger must be sent to the broker in question
  660. if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
  661. var id int64
  662. id, err = strconv.ParseInt(name, 10, 32)
  663. if err != nil {
  664. return err
  665. }
  666. b, err = ca.findBroker(int32(id))
  667. } else {
  668. b, err = ca.findAnyBroker()
  669. }
  670. if err != nil {
  671. return err
  672. }
  673. _ = b.Open(ca.client.Config())
  674. rsp, err := b.IncrementalAlterConfigs(request)
  675. if err != nil {
  676. return err
  677. }
  678. for _, rspResource := range rsp.Resources {
  679. if rspResource.Name == name {
  680. if rspResource.ErrorMsg != "" {
  681. return errors.New(rspResource.ErrorMsg)
  682. }
  683. if rspResource.ErrorCode != 0 {
  684. return KError(rspResource.ErrorCode)
  685. }
  686. }
  687. }
  688. return nil
  689. }
  690. func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
  691. var acls []*AclCreation
  692. acls = append(acls, &AclCreation{resource, acl})
  693. request := &CreateAclsRequest{AclCreations: acls}
  694. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  695. request.Version = 1
  696. }
  697. b, err := ca.Controller()
  698. if err != nil {
  699. return err
  700. }
  701. _, err = b.CreateAcls(request)
  702. return err
  703. }
  704. func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error {
  705. var acls []*AclCreation
  706. for _, resourceACL := range resourceACLs {
  707. for _, acl := range resourceACL.Acls {
  708. acls = append(acls, &AclCreation{resourceACL.Resource, *acl})
  709. }
  710. }
  711. request := &CreateAclsRequest{AclCreations: acls}
  712. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  713. request.Version = 1
  714. }
  715. b, err := ca.Controller()
  716. if err != nil {
  717. return err
  718. }
  719. _, err = b.CreateAcls(request)
  720. return err
  721. }
  722. func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
  723. request := &DescribeAclsRequest{AclFilter: filter}
  724. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  725. request.Version = 1
  726. }
  727. b, err := ca.Controller()
  728. if err != nil {
  729. return nil, err
  730. }
  731. rsp, err := b.DescribeAcls(request)
  732. if err != nil {
  733. return nil, err
  734. }
  735. var lAcls []ResourceAcls
  736. for _, rAcl := range rsp.ResourceAcls {
  737. lAcls = append(lAcls, *rAcl)
  738. }
  739. return lAcls, nil
  740. }
  741. func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
  742. var filters []*AclFilter
  743. filters = append(filters, &filter)
  744. request := &DeleteAclsRequest{Filters: filters}
  745. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  746. request.Version = 1
  747. }
  748. b, err := ca.Controller()
  749. if err != nil {
  750. return nil, err
  751. }
  752. rsp, err := b.DeleteAcls(request)
  753. if err != nil {
  754. return nil, err
  755. }
  756. var mAcls []MatchingAcl
  757. for _, fr := range rsp.FilterResponses {
  758. for _, mACL := range fr.MatchingAcls {
  759. mAcls = append(mAcls, *mACL)
  760. }
  761. }
  762. return mAcls, nil
  763. }
  764. func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
  765. groupsPerBroker := make(map[*Broker][]string)
  766. for _, group := range groups {
  767. controller, err := ca.client.Coordinator(group)
  768. if err != nil {
  769. return nil, err
  770. }
  771. groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
  772. }
  773. for broker, brokerGroups := range groupsPerBroker {
  774. describeReq := &DescribeGroupsRequest{
  775. Groups: brokerGroups,
  776. }
  777. if ca.conf.Version.IsAtLeast(V2_4_0_0) {
  778. // Starting in version 4, the response will include group.instance.id info for members.
  779. describeReq.Version = 4
  780. } else if ca.conf.Version.IsAtLeast(V2_3_0_0) {
  781. // Starting in version 3, authorized operations can be requested.
  782. describeReq.Version = 3
  783. } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  784. // Version 2 is the same as version 0.
  785. describeReq.Version = 2
  786. } else if ca.conf.Version.IsAtLeast(V1_1_0_0) {
  787. // Version 1 is the same as version 0.
  788. describeReq.Version = 1
  789. }
  790. response, err := broker.DescribeGroups(describeReq)
  791. if err != nil {
  792. return nil, err
  793. }
  794. result = append(result, response.Groups...)
  795. }
  796. return result, nil
  797. }
  798. func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
  799. allGroups = make(map[string]string)
  800. // Query brokers in parallel, since we have to query *all* brokers
  801. brokers := ca.client.Brokers()
  802. groupMaps := make(chan map[string]string, len(brokers))
  803. errChan := make(chan error, len(brokers))
  804. wg := sync.WaitGroup{}
  805. for _, b := range brokers {
  806. wg.Add(1)
  807. go func(b *Broker, conf *Config) {
  808. defer wg.Done()
  809. _ = b.Open(conf) // Ensure that broker is opened
  810. request := &ListGroupsRequest{}
  811. if ca.conf.Version.IsAtLeast(V2_6_0_0) {
  812. // Version 4 adds the StatesFilter field (KIP-518).
  813. request.Version = 4
  814. } else if ca.conf.Version.IsAtLeast(V2_4_0_0) {
  815. // Version 3 is the first flexible version.
  816. request.Version = 3
  817. } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  818. // Version 2 is the same as version 0.
  819. request.Version = 2
  820. } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
  821. // Version 1 is the same as version 0.
  822. request.Version = 1
  823. }
  824. response, err := b.ListGroups(request)
  825. if err != nil {
  826. errChan <- err
  827. return
  828. }
  829. groups := make(map[string]string)
  830. for group, typ := range response.Groups {
  831. groups[group] = typ
  832. }
  833. groupMaps <- groups
  834. }(b, ca.conf)
  835. }
  836. wg.Wait()
  837. close(groupMaps)
  838. close(errChan)
  839. for groupMap := range groupMaps {
  840. for group, protocolType := range groupMap {
  841. allGroups[group] = protocolType
  842. }
  843. }
  844. // Intentionally return only the first error for simplicity
  845. err = <-errChan
  846. return
  847. }
  848. func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
  849. coordinator, err := ca.client.Coordinator(group)
  850. if err != nil {
  851. return nil, err
  852. }
  853. request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)
  854. return coordinator.FetchOffset(request)
  855. }
  856. func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
  857. coordinator, err := ca.client.Coordinator(group)
  858. if err != nil {
  859. return err
  860. }
  861. request := &DeleteOffsetsRequest{
  862. Group: group,
  863. partitions: map[string][]int32{
  864. topic: {partition},
  865. },
  866. }
  867. resp, err := coordinator.DeleteOffsets(request)
  868. if err != nil {
  869. return err
  870. }
  871. if !errors.Is(resp.ErrorCode, ErrNoError) {
  872. return resp.ErrorCode
  873. }
  874. if !errors.Is(resp.Errors[topic][partition], ErrNoError) {
  875. return resp.Errors[topic][partition]
  876. }
  877. return nil
  878. }
  879. func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
  880. coordinator, err := ca.client.Coordinator(group)
  881. if err != nil {
  882. return err
  883. }
  884. request := &DeleteGroupsRequest{
  885. Groups: []string{group},
  886. }
  887. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  888. request.Version = 1
  889. }
  890. resp, err := coordinator.DeleteGroups(request)
  891. if err != nil {
  892. return err
  893. }
  894. groupErr, ok := resp.GroupErrorCodes[group]
  895. if !ok {
  896. return ErrIncompleteResponse
  897. }
  898. if !errors.Is(groupErr, ErrNoError) {
  899. return groupErr
  900. }
  901. return nil
  902. }
  903. func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
  904. allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
  905. // Query brokers in parallel, since we may have to query multiple brokers
  906. logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
  907. errChan := make(chan error, len(brokerIds))
  908. wg := sync.WaitGroup{}
  909. for _, b := range brokerIds {
  910. broker, err := ca.findBroker(b)
  911. if err != nil {
  912. Logger.Printf("Unable to find broker with ID = %v\n", b)
  913. continue
  914. }
  915. wg.Add(1)
  916. go func(b *Broker, conf *Config) {
  917. defer wg.Done()
  918. _ = b.Open(conf) // Ensure that broker is opened
  919. request := &DescribeLogDirsRequest{}
  920. if ca.conf.Version.IsAtLeast(V2_0_0_0) {
  921. request.Version = 1
  922. }
  923. response, err := b.DescribeLogDirs(request)
  924. if err != nil {
  925. errChan <- err
  926. return
  927. }
  928. logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
  929. logDirs[b.ID()] = response.LogDirs
  930. logDirsMaps <- logDirs
  931. }(broker, ca.conf)
  932. }
  933. wg.Wait()
  934. close(logDirsMaps)
  935. close(errChan)
  936. for logDirsMap := range logDirsMaps {
  937. for id, logDirs := range logDirsMap {
  938. allLogDirs[id] = logDirs
  939. }
  940. }
  941. // Intentionally return only the first error for simplicity
  942. err = <-errChan
  943. return
  944. }
  945. func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
  946. req := &DescribeUserScramCredentialsRequest{}
  947. for _, u := range users {
  948. req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
  949. Name: u,
  950. })
  951. }
  952. b, err := ca.Controller()
  953. if err != nil {
  954. return nil, err
  955. }
  956. rsp, err := b.DescribeUserScramCredentials(req)
  957. if err != nil {
  958. return nil, err
  959. }
  960. return rsp.Results, nil
  961. }
  962. func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
  963. res, err := ca.AlterUserScramCredentials(upsert, nil)
  964. if err != nil {
  965. return nil, err
  966. }
  967. return res, nil
  968. }
  969. func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
  970. res, err := ca.AlterUserScramCredentials(nil, delete)
  971. if err != nil {
  972. return nil, err
  973. }
  974. return res, nil
  975. }
  976. func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
  977. req := &AlterUserScramCredentialsRequest{
  978. Deletions: d,
  979. Upsertions: u,
  980. }
  981. var rsp *AlterUserScramCredentialsResponse
  982. err := ca.retryOnError(isErrNotController, func() error {
  983. b, err := ca.Controller()
  984. if err != nil {
  985. return err
  986. }
  987. rsp, err = b.AlterUserScramCredentials(req)
  988. return err
  989. })
  990. if err != nil {
  991. return nil, err
  992. }
  993. return rsp.Results, nil
  994. }
  995. // Describe All : use an empty/nil components slice + strict = false
  996. // Contains components: strict = false
  997. // Contains only components: strict = true
  998. func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
  999. request := &DescribeClientQuotasRequest{
  1000. Components: components,
  1001. Strict: strict,
  1002. }
  1003. b, err := ca.Controller()
  1004. if err != nil {
  1005. return nil, err
  1006. }
  1007. rsp, err := b.DescribeClientQuotas(request)
  1008. if err != nil {
  1009. return nil, err
  1010. }
  1011. if rsp.ErrorMsg != nil && len(*rsp.ErrorMsg) > 0 {
  1012. return nil, errors.New(*rsp.ErrorMsg)
  1013. }
  1014. if !errors.Is(rsp.ErrorCode, ErrNoError) {
  1015. return nil, rsp.ErrorCode
  1016. }
  1017. return rsp.Entries, nil
  1018. }
  1019. func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
  1020. entry := AlterClientQuotasEntry{
  1021. Entity: entity,
  1022. Ops: []ClientQuotasOp{op},
  1023. }
  1024. request := &AlterClientQuotasRequest{
  1025. Entries: []AlterClientQuotasEntry{entry},
  1026. ValidateOnly: validateOnly,
  1027. }
  1028. b, err := ca.Controller()
  1029. if err != nil {
  1030. return err
  1031. }
  1032. rsp, err := b.AlterClientQuotas(request)
  1033. if err != nil {
  1034. return err
  1035. }
  1036. for _, entry := range rsp.Entries {
  1037. if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 {
  1038. return errors.New(*entry.ErrorMsg)
  1039. }
  1040. if !errors.Is(entry.ErrorCode, ErrNoError) {
  1041. return entry.ErrorCode
  1042. }
  1043. }
  1044. return nil
  1045. }
  1046. func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
  1047. if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
  1048. return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
  1049. }
  1050. controller, err := ca.client.Coordinator(groupId)
  1051. if err != nil {
  1052. return nil, err
  1053. }
  1054. request := &LeaveGroupRequest{
  1055. Version: 3,
  1056. GroupId: groupId,
  1057. }
  1058. for _, instanceId := range groupInstanceIds {
  1059. groupInstanceId := instanceId
  1060. request.Members = append(request.Members, MemberIdentity{
  1061. GroupInstanceId: &groupInstanceId,
  1062. })
  1063. }
  1064. return controller.LeaveGroup(request)
  1065. }