123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- package sarama
- import (
- "fmt"
- "time"
- )
// ConfigSource identifies where a configuration value came from. It is
// carried as a single signed byte.
type ConfigSource int8

// Known ConfigSource values, numbered consecutively starting at zero.
const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)

// String returns the short name of a known source ("Unknown", "Topic", ...).
// Any value outside the known range is rendered as "Source Invalid: <n>".
func (s ConfigSource) String() string {
	names := [...]string{
		SourceUnknown:              "Unknown",
		SourceTopic:                "Topic",
		SourceDynamicBroker:        "DynamicBroker",
		SourceDynamicDefaultBroker: "DynamicDefaultBroker",
		SourceStaticBroker:         "StaticBroker",
		SourceDefault:              "Default",
	}
	// Negative values and values past the last known constant have no name.
	if s < 0 || int(s) >= len(names) {
		return fmt.Sprintf("Source Invalid: %d", int(s))
	}
	return names[s]
}
- type DescribeConfigError struct {
- Err KError
- ErrMsg string
- }
- func (c *DescribeConfigError) Error() string {
- text := c.Err.Error()
- if c.ErrMsg != "" {
- text = fmt.Sprintf("%s - %s", text, c.ErrMsg)
- }
- return text
- }
- type DescribeConfigsResponse struct {
- Version int16
- ThrottleTime time.Duration
- Resources []*ResourceResponse
- }
- type ResourceResponse struct {
- ErrorCode int16
- ErrorMsg string
- Type ConfigResourceType
- Name string
- Configs []*ConfigEntry
- }
- type ConfigEntry struct {
- Name string
- Value string
- ReadOnly bool
- Default bool
- Source ConfigSource
- Sensitive bool
- Synonyms []*ConfigSynonym
- }
- type ConfigSynonym struct {
- ConfigName string
- ConfigValue string
- Source ConfigSource
- }
- func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
- pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
- if err = pe.putArrayLength(len(r.Resources)); err != nil {
- return err
- }
- for _, c := range r.Resources {
- if err = c.encode(pe, r.Version); err != nil {
- return err
- }
- }
- return nil
- }
- func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
- r.Version = version
- throttleTime, err := pd.getInt32()
- if err != nil {
- return err
- }
- r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
- n, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- r.Resources = make([]*ResourceResponse, n)
- for i := 0; i < n; i++ {
- rr := &ResourceResponse{}
- if err := rr.decode(pd, version); err != nil {
- return err
- }
- r.Resources[i] = rr
- }
- return nil
- }
- func (r *DescribeConfigsResponse) key() int16 {
- return 32
- }
- func (r *DescribeConfigsResponse) version() int16 {
- return r.Version
- }
- func (r *DescribeConfigsResponse) headerVersion() int16 {
- return 0
- }
- func (r *DescribeConfigsResponse) isValidVersion() bool {
- return r.Version >= 0 && r.Version <= 2
- }
- func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
- switch r.Version {
- case 2:
- return V2_0_0_0
- case 1:
- return V1_1_0_0
- case 0:
- return V0_11_0_0
- default:
- return V2_0_0_0
- }
- }
- func (r *DescribeConfigsResponse) throttleTime() time.Duration {
- return r.ThrottleTime
- }
- func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
- pe.putInt16(r.ErrorCode)
- if err = pe.putString(r.ErrorMsg); err != nil {
- return err
- }
- pe.putInt8(int8(r.Type))
- if err = pe.putString(r.Name); err != nil {
- return err
- }
- if err = pe.putArrayLength(len(r.Configs)); err != nil {
- return err
- }
- for _, c := range r.Configs {
- if err = c.encode(pe, version); err != nil {
- return err
- }
- }
- return nil
- }
- func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
- ec, err := pd.getInt16()
- if err != nil {
- return err
- }
- r.ErrorCode = ec
- em, err := pd.getString()
- if err != nil {
- return err
- }
- r.ErrorMsg = em
- t, err := pd.getInt8()
- if err != nil {
- return err
- }
- r.Type = ConfigResourceType(t)
- name, err := pd.getString()
- if err != nil {
- return err
- }
- r.Name = name
- n, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- r.Configs = make([]*ConfigEntry, n)
- for i := 0; i < n; i++ {
- c := &ConfigEntry{}
- if err := c.decode(pd, version); err != nil {
- return err
- }
- r.Configs[i] = c
- }
- return nil
- }
- func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
- if err = pe.putString(r.Name); err != nil {
- return err
- }
- if err = pe.putString(r.Value); err != nil {
- return err
- }
- pe.putBool(r.ReadOnly)
- if version <= 0 {
- pe.putBool(r.Default)
- pe.putBool(r.Sensitive)
- } else {
- pe.putInt8(int8(r.Source))
- pe.putBool(r.Sensitive)
- if err := pe.putArrayLength(len(r.Synonyms)); err != nil {
- return err
- }
- for _, c := range r.Synonyms {
- if err = c.encode(pe, version); err != nil {
- return err
- }
- }
- }
- return nil
- }
- // https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
- func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
- if version == 0 {
- r.Source = SourceUnknown
- }
- name, err := pd.getString()
- if err != nil {
- return err
- }
- r.Name = name
- value, err := pd.getString()
- if err != nil {
- return err
- }
- r.Value = value
- read, err := pd.getBool()
- if err != nil {
- return err
- }
- r.ReadOnly = read
- if version == 0 {
- defaultB, err := pd.getBool()
- if err != nil {
- return err
- }
- r.Default = defaultB
- if defaultB {
- r.Source = SourceDefault
- }
- } else {
- source, err := pd.getInt8()
- if err != nil {
- return err
- }
- r.Source = ConfigSource(source)
- r.Default = r.Source == SourceDefault
- }
- sensitive, err := pd.getBool()
- if err != nil {
- return err
- }
- r.Sensitive = sensitive
- if version > 0 {
- n, err := pd.getArrayLength()
- if err != nil {
- return err
- }
- r.Synonyms = make([]*ConfigSynonym, n)
- for i := 0; i < n; i++ {
- s := &ConfigSynonym{}
- if err := s.decode(pd, version); err != nil {
- return err
- }
- r.Synonyms[i] = s
- }
- }
- return nil
- }
- func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
- err = pe.putString(c.ConfigName)
- if err != nil {
- return err
- }
- err = pe.putString(c.ConfigValue)
- if err != nil {
- return err
- }
- pe.putInt8(int8(c.Source))
- return nil
- }
- func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
- name, err := pd.getString()
- if err != nil {
- return err
- }
- c.ConfigName = name
- value, err := pd.getString()
- if err != nil {
- return err
- }
- c.ConfigValue = value
- source, err := pd.getInt8()
- if err != nil {
- return err
- }
- c.Source = ConfigSource(source)
- return nil
- }
|