package sarama import ( "time" ) type CreateTopicsRequest struct { // Version defines the protocol version to use for encode and decode Version int16 // TopicDetails contains the topics to create. TopicDetails map[string]*TopicDetail // Timeout contains how long to wait before timing out the request. Timeout time.Duration // ValidateOnly if true, check that the topics can be created as specified, // but don't create anything. ValidateOnly bool } func (c *CreateTopicsRequest) encode(pe packetEncoder) error { if err := pe.putArrayLength(len(c.TopicDetails)); err != nil { return err } for topic, detail := range c.TopicDetails { if err := pe.putString(topic); err != nil { return err } if err := detail.encode(pe); err != nil { return err } } pe.putInt32(int32(c.Timeout / time.Millisecond)) if c.Version >= 1 { pe.putBool(c.ValidateOnly) } return nil } func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) { n, err := pd.getArrayLength() if err != nil { return err } c.TopicDetails = make(map[string]*TopicDetail, n) for i := 0; i < n; i++ { topic, err := pd.getString() if err != nil { return err } c.TopicDetails[topic] = new(TopicDetail) if err = c.TopicDetails[topic].decode(pd, version); err != nil { return err } } timeout, err := pd.getInt32() if err != nil { return err } c.Timeout = time.Duration(timeout) * time.Millisecond if version >= 1 { c.ValidateOnly, err = pd.getBool() if err != nil { return err } c.Version = version } return nil } func (c *CreateTopicsRequest) key() int16 { return 19 } func (c *CreateTopicsRequest) version() int16 { return c.Version } func (r *CreateTopicsRequest) headerVersion() int16 { return 1 } func (c *CreateTopicsRequest) isValidVersion() bool { return c.Version >= 0 && c.Version <= 3 } func (c *CreateTopicsRequest) requiredVersion() KafkaVersion { switch c.Version { case 3: return V2_0_0_0 case 2: return V0_11_0_0 case 1: return V0_10_2_0 case 0: return V0_10_1_0 default: return V2_8_0_0 } } type TopicDetail struct { // NumPartitions contains the number of partitions to create in the topic, or // -1 if we are either specifying a manual partition assignment or using the // default partitions. NumPartitions int32 // ReplicationFactor contains the number of replicas to create for each // partition in the topic, or -1 if we are either specifying a manual // partition assignment or using the default replication factor. ReplicationFactor int16 // ReplicaAssignment contains the manual partition assignment, or the empty // array if we are using automatic assignment. ReplicaAssignment map[int32][]int32 // ConfigEntries contains the custom topic configurations to set. ConfigEntries map[string]*string } func (t *TopicDetail) encode(pe packetEncoder) error { pe.putInt32(t.NumPartitions) pe.putInt16(t.ReplicationFactor) if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil { return err } for partition, assignment := range t.ReplicaAssignment { pe.putInt32(partition) if err := pe.putInt32Array(assignment); err != nil { return err } } if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil { return err } for configKey, configValue := range t.ConfigEntries { if err := pe.putString(configKey); err != nil { return err } if err := pe.putNullableString(configValue); err != nil { return err } } return nil } func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) { if t.NumPartitions, err = pd.getInt32(); err != nil { return err } if t.ReplicationFactor, err = pd.getInt16(); err != nil { return err } n, err := pd.getArrayLength() if err != nil { return err } if n > 0 { t.ReplicaAssignment = make(map[int32][]int32, n) for i := 0; i < n; i++ { replica, err := pd.getInt32() if err != nil { return err } if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil { return err } } } n, err = pd.getArrayLength() if err != nil { return err } if n > 0 { t.ConfigEntries = make(map[string]*string, n) for i := 0; i < n; i++ { configKey, err := pd.getString() if err != nil { return err } if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil { return err } } } return nil }