12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271 |
- package sarama
- import (
- "errors"
- "fmt"
- "math/rand"
- "strconv"
- "sync"
- "time"
- )
- // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
- // brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
- // Methods with stricter requirements will specify the minimum broker version required.
- // You MUST call Close() on a client to avoid leaks
- type ClusterAdmin interface {
- // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
- // It may take several seconds after CreateTopic returns success for all the brokers
- // to become aware that the topic has been created. During this time, listTopics
- // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
- CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
- // List the topics available in the cluster with the default options.
- ListTopics() (map[string]TopicDetail, error)
- // Describe some topics in the cluster.
- DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
- // Delete a topic. It may take several seconds after the DeleteTopic to returns success
- // and for all the brokers to become aware that the topics are gone.
- // During this time, listTopics may continue to return information about the deleted topic.
- // If delete.topic.enable is false on the brokers, deleteTopic will mark
- // the topic for deletion, but not actually delete them.
- // This operation is supported by brokers with version 0.10.1.0 or higher.
- DeleteTopic(topic string) error
- // Increase the number of partitions of the topics according to the corresponding values.
- // If partitions are increased for a topic that has a key, the partition logic or ordering of
- // the messages will be affected. It may take several seconds after this method returns
- // success for all the brokers to become aware that the partitions have been created.
- // During this time, ClusterAdmin#describeTopics may not return information about the
- // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
- CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
- // Alter the replica assignment for partitions.
- // This operation is supported by brokers with version 2.4.0.0 or higher.
- AlterPartitionReassignments(topic string, assignment [][]int32) error
- // Provides info on ongoing partitions replica reassignments.
- // This operation is supported by brokers with version 2.4.0.0 or higher.
- ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
- // Delete records whose offset is smaller than the given offset of the corresponding partition.
- // This operation is supported by brokers with version 0.11.0.0 or higher.
- DeleteRecords(topic string, partitionOffsets map[int32]int64) error
- // Get the configuration for the specified resources.
- // The returned configuration includes default values and the Default is true
- // can be used to distinguish them from user supplied values.
- // Config entries where ReadOnly is true cannot be updated.
- // The value of config entries where Sensitive is true is always nil so
- // sensitive information is not disclosed.
- // This operation is supported by brokers with version 0.11.0.0 or higher.
- DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
- // Update the configuration for the specified resources with the default options.
- // This operation is supported by brokers with version 0.11.0.0 or higher.
- // The resources with their configs (topic is the only resource type with configs
- // that can be updated currently Updates are not transactional so they may succeed
- // for some resources while fail for others. The configs for a particular resource are updated automatically.
- AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
- // IncrementalAlterConfig Incrementally Update the configuration for the specified resources with the default options.
- // This operation is supported by brokers with version 2.3.0.0 or higher.
- // Updates are not transactional so they may succeed for some resources while fail for others.
- // The configs for a particular resource are updated automatically.
- IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error
- // Creates an access control list (ACL) which is bound to a specific resource.
- // This operation is not transactional so it may succeed or fail.
- // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
- // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
- // Deprecated: Use CreateACLs instead.
- CreateACL(resource Resource, acl Acl) error
- // Creates access control lists (ACLs) which are bound to specific resources.
- // This operation is not transactional so it may succeed for some ACLs while fail for others.
- // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
- // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
- CreateACLs([]*ResourceAcls) error
- // Lists access control lists (ACLs) according to the supplied filter.
- // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
- // This operation is supported by brokers with version 0.11.0.0 or higher.
- ListAcls(filter AclFilter) ([]ResourceAcls, error)
- // Deletes access control lists (ACLs) according to the supplied filters.
- // This operation is not transactional so it may succeed for some ACLs while fail for others.
- // This operation is supported by brokers with version 0.11.0.0 or higher.
- DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
- // List the consumer groups available in the cluster.
- ListConsumerGroups() (map[string]string, error)
- // Describe the given consumer groups.
- DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
- // List the consumer group offsets available in the cluster.
- ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
- // Deletes a consumer group offset
- DeleteConsumerGroupOffset(group string, topic string, partition int32) error
- // Delete a consumer group.
- DeleteConsumerGroup(group string) error
- // Get information about the nodes in the cluster
- DescribeCluster() (brokers []*Broker, controllerID int32, err error)
- // Get information about all log directories on the given set of brokers
- DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
- // Get information about SCRAM users
- DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
- // Delete SCRAM users
- DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
- // Upsert SCRAM users
- UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
- // Get client quota configurations corresponding to the specified filter.
- // This operation is supported by brokers with version 2.6.0.0 or higher.
- DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)
- // Alters client quota configurations with the specified alterations.
- // This operation is supported by brokers with version 2.6.0.0 or higher.
- AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error
- // Controller returns the cluster controller broker. It will return a
- // locally cached value if it's available.
- Controller() (*Broker, error)
- // Remove members from the consumer group by given member identities.
- // This operation is supported by brokers with version 2.3 or higher
- // This is for static membership feature. KIP-345
- RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)
- // Close shuts down the admin and closes underlying client.
- Close() error
- }
- type clusterAdmin struct {
- client Client
- conf *Config
- }
- // NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
- func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
- client, err := NewClient(addrs, conf)
- if err != nil {
- return nil, err
- }
- admin, err := NewClusterAdminFromClient(client)
- if err != nil {
- client.Close()
- }
- return admin, err
- }
- // NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
- // Note that underlying client will also be closed on admin's Close() call.
- func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
- // make sure we can retrieve the controller
- _, err := client.Controller()
- if err != nil {
- return nil, err
- }
- ca := &clusterAdmin{
- client: client,
- conf: client.Config(),
- }
- return ca, nil
- }
- func (ca *clusterAdmin) Close() error {
- return ca.client.Close()
- }
- func (ca *clusterAdmin) Controller() (*Broker, error) {
- return ca.client.Controller()
- }
- func (ca *clusterAdmin) refreshController() (*Broker, error) {
- return ca.client.RefreshController()
- }
- // isErrNotController returns `true` if the given error type unwraps to an
- // `ErrNotController` response from Kafka
- func isErrNotController(err error) bool {
- return errors.Is(err, ErrNotController)
- }
- // retryOnError will repeatedly call the given (error-returning) func in the
- // case that its response is non-nil and retryable (as determined by the
- // provided retryable func) up to the maximum number of tries permitted by
- // the admin client configuration
- func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
- for attemptsRemaining := ca.conf.Admin.Retry.Max + 1; ; {
- err := fn()
- attemptsRemaining--
- if err == nil || attemptsRemaining <= 0 || !retryable(err) {
- return err
- }
- Logger.Printf(
- "admin/request retrying after %dms... (%d attempts remaining)\n",
- ca.conf.Admin.Retry.Backoff/time.Millisecond, attemptsRemaining)
- time.Sleep(ca.conf.Admin.Retry.Backoff)
- }
- }
- func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
- if topic == "" {
- return ErrInvalidTopic
- }
- if detail == nil {
- return errors.New("you must specify topic details")
- }
- topicDetails := make(map[string]*TopicDetail)
- topicDetails[topic] = detail
- request := &CreateTopicsRequest{
- TopicDetails: topicDetails,
- ValidateOnly: validateOnly,
- Timeout: ca.conf.Admin.Timeout,
- }
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- // Version 3 is the same as version 2 (brokers response before throttling)
- request.Version = 3
- } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
- // Version 2 is the same as version 1 (response has ThrottleTime)
- request.Version = 2
- } else if ca.conf.Version.IsAtLeast(V0_10_2_0) {
- // Version 1 adds validateOnly.
- request.Version = 1
- }
- return ca.retryOnError(isErrNotController, func() error {
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err := b.CreateTopics(request)
- if err != nil {
- return err
- }
- topicErr, ok := rsp.TopicErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
- if !errors.Is(topicErr.Err, ErrNoError) {
- if errors.Is(topicErr.Err, ErrNotController) {
- _, _ = ca.refreshController()
- }
- return topicErr
- }
- return nil
- })
- }
- func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
- var response *MetadataResponse
- err = ca.retryOnError(isErrNotController, func() error {
- controller, err := ca.Controller()
- if err != nil {
- return err
- }
- request := NewMetadataRequest(ca.conf.Version, topics)
- response, err = controller.GetMetadata(request)
- if isErrNotController(err) {
- _, _ = ca.refreshController()
- }
- return err
- })
- if err != nil {
- return nil, err
- }
- return response.Topics, nil
- }
- func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
- var response *MetadataResponse
- err = ca.retryOnError(isErrNotController, func() error {
- controller, err := ca.Controller()
- if err != nil {
- return err
- }
- request := NewMetadataRequest(ca.conf.Version, nil)
- response, err = controller.GetMetadata(request)
- if isErrNotController(err) {
- _, _ = ca.refreshController()
- }
- return err
- })
- if err != nil {
- return nil, int32(0), err
- }
- return response.Brokers, response.ControllerID, nil
- }
- func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
- brokers := ca.client.Brokers()
- for _, b := range brokers {
- if b.ID() == id {
- return b, nil
- }
- }
- return nil, fmt.Errorf("could not find broker id %d", id)
- }
- func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
- brokers := ca.client.Brokers()
- if len(brokers) > 0 {
- index := rand.Intn(len(brokers))
- return brokers[index], nil
- }
- return nil, errors.New("no available broker")
- }
- func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
- // In order to build TopicDetails we need to first get the list of all
- // topics using a MetadataRequest and then get their configs using a
- // DescribeConfigsRequest request. To avoid sending many requests to the
- // broker, we use a single DescribeConfigsRequest.
- // Send the all-topic MetadataRequest
- b, err := ca.findAnyBroker()
- if err != nil {
- return nil, err
- }
- _ = b.Open(ca.client.Config())
- metadataReq := NewMetadataRequest(ca.conf.Version, nil)
- metadataResp, err := b.GetMetadata(metadataReq)
- if err != nil {
- return nil, err
- }
- topicsDetailsMap := make(map[string]TopicDetail)
- var describeConfigsResources []*ConfigResource
- for _, topic := range metadataResp.Topics {
- topicDetails := TopicDetail{
- NumPartitions: int32(len(topic.Partitions)),
- }
- if len(topic.Partitions) > 0 {
- topicDetails.ReplicaAssignment = map[int32][]int32{}
- for _, partition := range topic.Partitions {
- topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
- }
- topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
- }
- topicsDetailsMap[topic.Name] = topicDetails
- // we populate the resources we want to describe from the MetadataResponse
- topicResource := ConfigResource{
- Type: TopicResource,
- Name: topic.Name,
- }
- describeConfigsResources = append(describeConfigsResources, &topicResource)
- }
- // Send the DescribeConfigsRequest
- describeConfigsReq := &DescribeConfigsRequest{
- Resources: describeConfigsResources,
- }
- if ca.conf.Version.IsAtLeast(V1_1_0_0) {
- describeConfigsReq.Version = 1
- }
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- describeConfigsReq.Version = 2
- }
- describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
- if err != nil {
- return nil, err
- }
- for _, resource := range describeConfigsResp.Resources {
- topicDetails := topicsDetailsMap[resource.Name]
- topicDetails.ConfigEntries = make(map[string]*string)
- for _, entry := range resource.Configs {
- entry := entry
- // only include non-default non-sensitive config
- // (don't actually think topic config will ever be sensitive)
- if entry.Default || entry.Sensitive {
- continue
- }
- topicDetails.ConfigEntries[entry.Name] = &entry.Value
- }
- topicsDetailsMap[resource.Name] = topicDetails
- }
- return topicsDetailsMap, nil
- }
- func (ca *clusterAdmin) DeleteTopic(topic string) error {
- if topic == "" {
- return ErrInvalidTopic
- }
- request := &DeleteTopicsRequest{
- Topics: []string{topic},
- Timeout: ca.conf.Admin.Timeout,
- }
- // Versions 0, 1, 2, and 3 are the same.
- if ca.conf.Version.IsAtLeast(V2_1_0_0) {
- request.Version = 3
- } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 2
- } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
- request.Version = 1
- }
- return ca.retryOnError(isErrNotController, func() error {
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err := b.DeleteTopics(request)
- if err != nil {
- return err
- }
- topicErr, ok := rsp.TopicErrorCodes[topic]
- if !ok {
- return ErrIncompleteResponse
- }
- if !errors.Is(topicErr, ErrNoError) {
- if errors.Is(topicErr, ErrNotController) {
- _, _ = ca.refreshController()
- }
- return topicErr
- }
- return nil
- })
- }
- func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
- if topic == "" {
- return ErrInvalidTopic
- }
- topicPartitions := make(map[string]*TopicPartition)
- topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
- request := &CreatePartitionsRequest{
- TopicPartitions: topicPartitions,
- Timeout: ca.conf.Admin.Timeout,
- ValidateOnly: validateOnly,
- }
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 1
- }
- return ca.retryOnError(isErrNotController, func() error {
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err := b.CreatePartitions(request)
- if err != nil {
- return err
- }
- topicErr, ok := rsp.TopicPartitionErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
- if !errors.Is(topicErr.Err, ErrNoError) {
- if errors.Is(topicErr.Err, ErrNotController) {
- _, _ = ca.refreshController()
- }
- return topicErr
- }
- return nil
- })
- }
- func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
- if topic == "" {
- return ErrInvalidTopic
- }
- request := &AlterPartitionReassignmentsRequest{
- TimeoutMs: int32(60000),
- Version: int16(0),
- }
- for i := 0; i < len(assignment); i++ {
- request.AddBlock(topic, int32(i), assignment[i])
- }
- return ca.retryOnError(isErrNotController, func() error {
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- errs := make([]error, 0)
- rsp, err := b.AlterPartitionReassignments(request)
- if err != nil {
- errs = append(errs, err)
- } else {
- if rsp.ErrorCode > 0 {
- errs = append(errs, rsp.ErrorCode)
- }
- for topic, topicErrors := range rsp.Errors {
- for partition, partitionError := range topicErrors {
- if !errors.Is(partitionError.errorCode, ErrNoError) {
- errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode))
- }
- }
- }
- }
- if len(errs) > 0 {
- return Wrap(ErrReassignPartitions, errs...)
- }
- return nil
- })
- }
- func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
- if topic == "" {
- return nil, ErrInvalidTopic
- }
- request := &ListPartitionReassignmentsRequest{
- TimeoutMs: int32(60000),
- Version: int16(0),
- }
- request.AddBlock(topic, partitions)
- var rsp *ListPartitionReassignmentsResponse
- err = ca.retryOnError(isErrNotController, func() error {
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- _ = b.Open(ca.client.Config())
- rsp, err = b.ListPartitionReassignments(request)
- if isErrNotController(err) {
- _, _ = ca.refreshController()
- }
- return err
- })
- if err == nil && rsp != nil {
- return rsp.TopicStatus, nil
- } else {
- return nil, err
- }
- }
- func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
- if topic == "" {
- return ErrInvalidTopic
- }
- errs := make([]error, 0)
- partitionPerBroker := make(map[*Broker][]int32)
- for partition := range partitionOffsets {
- broker, err := ca.client.Leader(topic, partition)
- if err != nil {
- errs = append(errs, err)
- continue
- }
- partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
- }
- for broker, partitions := range partitionPerBroker {
- topics := make(map[string]*DeleteRecordsRequestTopic)
- recordsToDelete := make(map[int32]int64)
- for _, p := range partitions {
- recordsToDelete[p] = partitionOffsets[p]
- }
- topics[topic] = &DeleteRecordsRequestTopic{
- PartitionOffsets: recordsToDelete,
- }
- request := &DeleteRecordsRequest{
- Topics: topics,
- Timeout: ca.conf.Admin.Timeout,
- }
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 1
- }
- rsp, err := broker.DeleteRecords(request)
- if err != nil {
- errs = append(errs, err)
- continue
- }
- deleteRecordsResponseTopic, ok := rsp.Topics[topic]
- if !ok {
- errs = append(errs, ErrIncompleteResponse)
- continue
- }
- for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
- if !errors.Is(deleteRecordsResponsePartition.Err, ErrNoError) {
- errs = append(errs, deleteRecordsResponsePartition.Err)
- continue
- }
- }
- }
- if len(errs) > 0 {
- return Wrap(ErrDeleteRecords, errs...)
- }
- // todo since we are dealing with couple of partitions it would be good if we return slice of errors
- // for each partition instead of one error
- return nil
- }
- // Returns a bool indicating whether the resource request needs to go to a
- // specific broker
- func dependsOnSpecificNode(resource ConfigResource) bool {
- return (resource.Type == BrokerResource && resource.Name != "") ||
- resource.Type == BrokerLoggerResource
- }
- func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
- var entries []ConfigEntry
- var resources []*ConfigResource
- resources = append(resources, &resource)
- request := &DescribeConfigsRequest{
- Resources: resources,
- }
- if ca.conf.Version.IsAtLeast(V1_1_0_0) {
- request.Version = 1
- }
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 2
- }
- var (
- b *Broker
- err error
- )
- // DescribeConfig of broker/broker logger must be sent to the broker in question
- if dependsOnSpecificNode(resource) {
- var id int64
- id, err = strconv.ParseInt(resource.Name, 10, 32)
- if err != nil {
- return nil, err
- }
- b, err = ca.findBroker(int32(id))
- } else {
- b, err = ca.findAnyBroker()
- }
- if err != nil {
- return nil, err
- }
- _ = b.Open(ca.client.Config())
- rsp, err := b.DescribeConfigs(request)
- if err != nil {
- return nil, err
- }
- for _, rspResource := range rsp.Resources {
- if rspResource.Name == resource.Name {
- if rspResource.ErrorCode != 0 {
- return nil, &DescribeConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
- }
- for _, cfgEntry := range rspResource.Configs {
- entries = append(entries, *cfgEntry)
- }
- }
- }
- return entries, nil
- }
- func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
- var resources []*AlterConfigsResource
- resources = append(resources, &AlterConfigsResource{
- Type: resourceType,
- Name: name,
- ConfigEntries: entries,
- })
- request := &AlterConfigsRequest{
- Resources: resources,
- ValidateOnly: validateOnly,
- }
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 1
- }
- var (
- b *Broker
- err error
- )
- // AlterConfig of broker/broker logger must be sent to the broker in question
- if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
- var id int64
- id, err = strconv.ParseInt(name, 10, 32)
- if err != nil {
- return err
- }
- b, err = ca.findBroker(int32(id))
- } else {
- b, err = ca.findAnyBroker()
- }
- if err != nil {
- return err
- }
- _ = b.Open(ca.client.Config())
- rsp, err := b.AlterConfigs(request)
- if err != nil {
- return err
- }
- for _, rspResource := range rsp.Resources {
- if rspResource.Name == name {
- if rspResource.ErrorCode != 0 {
- return &AlterConfigError{Err: KError(rspResource.ErrorCode), ErrMsg: rspResource.ErrorMsg}
- }
- }
- }
- return nil
- }
- func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error {
- var resources []*IncrementalAlterConfigsResource
- resources = append(resources, &IncrementalAlterConfigsResource{
- Type: resourceType,
- Name: name,
- ConfigEntries: entries,
- })
- request := &IncrementalAlterConfigsRequest{
- Resources: resources,
- ValidateOnly: validateOnly,
- }
- var (
- b *Broker
- err error
- )
- // AlterConfig of broker/broker logger must be sent to the broker in question
- if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
- var id int64
- id, err = strconv.ParseInt(name, 10, 32)
- if err != nil {
- return err
- }
- b, err = ca.findBroker(int32(id))
- } else {
- b, err = ca.findAnyBroker()
- }
- if err != nil {
- return err
- }
- _ = b.Open(ca.client.Config())
- rsp, err := b.IncrementalAlterConfigs(request)
- if err != nil {
- return err
- }
- for _, rspResource := range rsp.Resources {
- if rspResource.Name == name {
- if rspResource.ErrorMsg != "" {
- return errors.New(rspResource.ErrorMsg)
- }
- if rspResource.ErrorCode != 0 {
- return KError(rspResource.ErrorCode)
- }
- }
- }
- return nil
- }
- func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
- var acls []*AclCreation
- acls = append(acls, &AclCreation{resource, acl})
- request := &CreateAclsRequest{AclCreations: acls}
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 1
- }
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- _, err = b.CreateAcls(request)
- return err
- }
- func (ca *clusterAdmin) CreateACLs(resourceACLs []*ResourceAcls) error {
- var acls []*AclCreation
- for _, resourceACL := range resourceACLs {
- for _, acl := range resourceACL.Acls {
- acls = append(acls, &AclCreation{resourceACL.Resource, *acl})
- }
- }
- request := &CreateAclsRequest{AclCreations: acls}
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 1
- }
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- _, err = b.CreateAcls(request)
- return err
- }
- func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
- request := &DescribeAclsRequest{AclFilter: filter}
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 1
- }
- b, err := ca.Controller()
- if err != nil {
- return nil, err
- }
- rsp, err := b.DescribeAcls(request)
- if err != nil {
- return nil, err
- }
- var lAcls []ResourceAcls
- for _, rAcl := range rsp.ResourceAcls {
- lAcls = append(lAcls, *rAcl)
- }
- return lAcls, nil
- }
- func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
- var filters []*AclFilter
- filters = append(filters, &filter)
- request := &DeleteAclsRequest{Filters: filters}
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 1
- }
- b, err := ca.Controller()
- if err != nil {
- return nil, err
- }
- rsp, err := b.DeleteAcls(request)
- if err != nil {
- return nil, err
- }
- var mAcls []MatchingAcl
- for _, fr := range rsp.FilterResponses {
- for _, mACL := range fr.MatchingAcls {
- mAcls = append(mAcls, *mACL)
- }
- }
- return mAcls, nil
- }
- func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
- groupsPerBroker := make(map[*Broker][]string)
- for _, group := range groups {
- controller, err := ca.client.Coordinator(group)
- if err != nil {
- return nil, err
- }
- groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
- }
- for broker, brokerGroups := range groupsPerBroker {
- describeReq := &DescribeGroupsRequest{
- Groups: brokerGroups,
- }
- if ca.conf.Version.IsAtLeast(V2_4_0_0) {
- // Starting in version 4, the response will include group.instance.id info for members.
- describeReq.Version = 4
- } else if ca.conf.Version.IsAtLeast(V2_3_0_0) {
- // Starting in version 3, authorized operations can be requested.
- describeReq.Version = 3
- } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- // Version 2 is the same as version 0.
- describeReq.Version = 2
- } else if ca.conf.Version.IsAtLeast(V1_1_0_0) {
- // Version 1 is the same as version 0.
- describeReq.Version = 1
- }
- response, err := broker.DescribeGroups(describeReq)
- if err != nil {
- return nil, err
- }
- result = append(result, response.Groups...)
- }
- return result, nil
- }
- func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
- allGroups = make(map[string]string)
- // Query brokers in parallel, since we have to query *all* brokers
- brokers := ca.client.Brokers()
- groupMaps := make(chan map[string]string, len(brokers))
- errChan := make(chan error, len(brokers))
- wg := sync.WaitGroup{}
- for _, b := range brokers {
- wg.Add(1)
- go func(b *Broker, conf *Config) {
- defer wg.Done()
- _ = b.Open(conf) // Ensure that broker is opened
- request := &ListGroupsRequest{}
- if ca.conf.Version.IsAtLeast(V2_6_0_0) {
- // Version 4 adds the StatesFilter field (KIP-518).
- request.Version = 4
- } else if ca.conf.Version.IsAtLeast(V2_4_0_0) {
- // Version 3 is the first flexible version.
- request.Version = 3
- } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- // Version 2 is the same as version 0.
- request.Version = 2
- } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
- // Version 1 is the same as version 0.
- request.Version = 1
- }
- response, err := b.ListGroups(request)
- if err != nil {
- errChan <- err
- return
- }
- groups := make(map[string]string)
- for group, typ := range response.Groups {
- groups[group] = typ
- }
- groupMaps <- groups
- }(b, ca.conf)
- }
- wg.Wait()
- close(groupMaps)
- close(errChan)
- for groupMap := range groupMaps {
- for group, protocolType := range groupMap {
- allGroups[group] = protocolType
- }
- }
- // Intentionally return only the first error for simplicity
- err = <-errChan
- return
- }
- func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
- coordinator, err := ca.client.Coordinator(group)
- if err != nil {
- return nil, err
- }
- request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions)
- return coordinator.FetchOffset(request)
- }
- func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
- coordinator, err := ca.client.Coordinator(group)
- if err != nil {
- return err
- }
- request := &DeleteOffsetsRequest{
- Group: group,
- partitions: map[string][]int32{
- topic: {partition},
- },
- }
- resp, err := coordinator.DeleteOffsets(request)
- if err != nil {
- return err
- }
- if !errors.Is(resp.ErrorCode, ErrNoError) {
- return resp.ErrorCode
- }
- if !errors.Is(resp.Errors[topic][partition], ErrNoError) {
- return resp.Errors[topic][partition]
- }
- return nil
- }
- func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
- coordinator, err := ca.client.Coordinator(group)
- if err != nil {
- return err
- }
- request := &DeleteGroupsRequest{
- Groups: []string{group},
- }
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 1
- }
- resp, err := coordinator.DeleteGroups(request)
- if err != nil {
- return err
- }
- groupErr, ok := resp.GroupErrorCodes[group]
- if !ok {
- return ErrIncompleteResponse
- }
- if !errors.Is(groupErr, ErrNoError) {
- return groupErr
- }
- return nil
- }
- func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
- allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
- // Query brokers in parallel, since we may have to query multiple brokers
- logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
- errChan := make(chan error, len(brokerIds))
- wg := sync.WaitGroup{}
- for _, b := range brokerIds {
- broker, err := ca.findBroker(b)
- if err != nil {
- Logger.Printf("Unable to find broker with ID = %v\n", b)
- continue
- }
- wg.Add(1)
- go func(b *Broker, conf *Config) {
- defer wg.Done()
- _ = b.Open(conf) // Ensure that broker is opened
- request := &DescribeLogDirsRequest{}
- if ca.conf.Version.IsAtLeast(V2_0_0_0) {
- request.Version = 1
- }
- response, err := b.DescribeLogDirs(request)
- if err != nil {
- errChan <- err
- return
- }
- logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
- logDirs[b.ID()] = response.LogDirs
- logDirsMaps <- logDirs
- }(broker, ca.conf)
- }
- wg.Wait()
- close(logDirsMaps)
- close(errChan)
- for logDirsMap := range logDirsMaps {
- for id, logDirs := range logDirsMap {
- allLogDirs[id] = logDirs
- }
- }
- // Intentionally return only the first error for simplicity
- err = <-errChan
- return
- }
- func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
- req := &DescribeUserScramCredentialsRequest{}
- for _, u := range users {
- req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
- Name: u,
- })
- }
- b, err := ca.Controller()
- if err != nil {
- return nil, err
- }
- rsp, err := b.DescribeUserScramCredentials(req)
- if err != nil {
- return nil, err
- }
- return rsp.Results, nil
- }
- func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
- res, err := ca.AlterUserScramCredentials(upsert, nil)
- if err != nil {
- return nil, err
- }
- return res, nil
- }
- func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
- res, err := ca.AlterUserScramCredentials(nil, delete)
- if err != nil {
- return nil, err
- }
- return res, nil
- }
- func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
- req := &AlterUserScramCredentialsRequest{
- Deletions: d,
- Upsertions: u,
- }
- var rsp *AlterUserScramCredentialsResponse
- err := ca.retryOnError(isErrNotController, func() error {
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err = b.AlterUserScramCredentials(req)
- return err
- })
- if err != nil {
- return nil, err
- }
- return rsp.Results, nil
- }
- // Describe All : use an empty/nil components slice + strict = false
- // Contains components: strict = false
- // Contains only components: strict = true
- func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
- request := &DescribeClientQuotasRequest{
- Components: components,
- Strict: strict,
- }
- b, err := ca.Controller()
- if err != nil {
- return nil, err
- }
- rsp, err := b.DescribeClientQuotas(request)
- if err != nil {
- return nil, err
- }
- if rsp.ErrorMsg != nil && len(*rsp.ErrorMsg) > 0 {
- return nil, errors.New(*rsp.ErrorMsg)
- }
- if !errors.Is(rsp.ErrorCode, ErrNoError) {
- return nil, rsp.ErrorCode
- }
- return rsp.Entries, nil
- }
- func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
- entry := AlterClientQuotasEntry{
- Entity: entity,
- Ops: []ClientQuotasOp{op},
- }
- request := &AlterClientQuotasRequest{
- Entries: []AlterClientQuotasEntry{entry},
- ValidateOnly: validateOnly,
- }
- b, err := ca.Controller()
- if err != nil {
- return err
- }
- rsp, err := b.AlterClientQuotas(request)
- if err != nil {
- return err
- }
- for _, entry := range rsp.Entries {
- if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 {
- return errors.New(*entry.ErrorMsg)
- }
- if !errors.Is(entry.ErrorCode, ErrNoError) {
- return entry.ErrorCode
- }
- }
- return nil
- }
- func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
- if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
- return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
- }
- controller, err := ca.client.Coordinator(groupId)
- if err != nil {
- return nil, err
- }
- request := &LeaveGroupRequest{
- Version: 3,
- GroupId: groupId,
- }
- for _, instanceId := range groupInstanceIds {
- groupInstanceId := instanceId
- request.Members = append(request.Members, MemberIdentity{
- GroupInstanceId: &groupInstanceId,
- })
- }
- return controller.LeaveGroup(request)
- }
|