broker.go 51 KB


  1. package sarama
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "math/rand"
  9. "net"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "github.com/rcrowley/go-metrics"
  17. )
  18. // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
  19. type Broker struct {
  20. conf *Config
  21. rack *string
  22. id int32
  23. addr string
  24. correlationID int32
  25. conn net.Conn
  26. connErr error
  27. lock sync.Mutex
  28. opened int32
  29. responses chan *responsePromise
  30. done chan bool
  31. metricRegistry metrics.Registry
  32. incomingByteRate metrics.Meter
  33. requestRate metrics.Meter
  34. fetchRate metrics.Meter
  35. requestSize metrics.Histogram
  36. requestLatency metrics.Histogram
  37. outgoingByteRate metrics.Meter
  38. responseRate metrics.Meter
  39. responseSize metrics.Histogram
  40. requestsInFlight metrics.Counter
  41. protocolRequestsRate map[int16]metrics.Meter
  42. brokerIncomingByteRate metrics.Meter
  43. brokerRequestRate metrics.Meter
  44. brokerFetchRate metrics.Meter
  45. brokerRequestSize metrics.Histogram
  46. brokerRequestLatency metrics.Histogram
  47. brokerOutgoingByteRate metrics.Meter
  48. brokerResponseRate metrics.Meter
  49. brokerResponseSize metrics.Histogram
  50. brokerRequestsInFlight metrics.Counter
  51. brokerThrottleTime metrics.Histogram
  52. brokerProtocolRequestsRate map[int16]metrics.Meter
  53. kerberosAuthenticator GSSAPIKerberosAuth
  54. clientSessionReauthenticationTimeMs int64
  55. throttleTimer *time.Timer
  56. throttleTimerLock sync.Mutex
  57. }
  58. // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
  59. type SASLMechanism string
  60. const (
  61. // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
  62. SASLTypeOAuth = "OAUTHBEARER"
  63. // SASLTypePlaintext represents the SASL/PLAIN mechanism
  64. SASLTypePlaintext = "PLAIN"
  65. // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
  66. SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
  67. // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
  68. SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
  69. SASLTypeGSSAPI = "GSSAPI"
  70. // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
  71. // server negotiate SASL auth using opaque packets.
  72. SASLHandshakeV0 = int16(0)
  73. // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
  74. // server negotiate SASL by wrapping tokens with Kafka protocol headers.
  75. SASLHandshakeV1 = int16(1)
  76. // SASLExtKeyAuth is the reserved extension key name sent as part of the
  77. // SASL/OAUTHBEARER initial client response
  78. SASLExtKeyAuth = "auth"
  79. )
  80. // AccessToken contains an access token used to authenticate a
  81. // SASL/OAUTHBEARER client along with associated metadata.
  82. type AccessToken struct {
  83. // Token is the access token payload.
  84. Token string
  85. // Extensions is a optional map of arbitrary key-value pairs that can be
  86. // sent with the SASL/OAUTHBEARER initial client response. These values are
  87. // ignored by the SASL server if they are unexpected. This feature is only
  88. // supported by Kafka >= 2.1.0.
  89. Extensions map[string]string
  90. }
  91. // AccessTokenProvider is the interface that encapsulates how implementors
  92. // can generate access tokens for Kafka broker authentication.
  93. type AccessTokenProvider interface {
  94. // Token returns an access token. The implementation should ensure token
  95. // reuse so that multiple calls at connect time do not create multiple
  96. // tokens. The implementation should also periodically refresh the token in
  97. // order to guarantee that each call returns an unexpired token. This
  98. // method should not block indefinitely--a timeout error should be returned
  99. // after a short period of inactivity so that the broker connection logic
  100. // can log debugging information and retry.
  101. Token() (*AccessToken, error)
  102. }
  103. // SCRAMClient is a an interface to a SCRAM
  104. // client implementation.
  105. type SCRAMClient interface {
  106. // Begin prepares the client for the SCRAM exchange
  107. // with the server with a user name and a password
  108. Begin(userName, password, authzID string) error
  109. // Step steps client through the SCRAM exchange. It is
  110. // called repeatedly until it errors or `Done` returns true.
  111. Step(challenge string) (response string, err error)
  112. // Done should return true when the SCRAM conversation
  113. // is over.
  114. Done() bool
  115. }
  116. type responsePromise struct {
  117. requestTime time.Time
  118. correlationID int32
  119. headerVersion int16
  120. handler func([]byte, error)
  121. packets chan []byte
  122. errors chan error
  123. }
  124. func (p *responsePromise) handle(packets []byte, err error) {
  125. // Use callback when provided
  126. if p.handler != nil {
  127. p.handler(packets, err)
  128. return
  129. }
  130. // Otherwise fallback to using channels
  131. if err != nil {
  132. p.errors <- err
  133. return
  134. }
  135. p.packets <- packets
  136. }
  137. // NewBroker creates and returns a Broker targeting the given host:port address.
  138. // This does not attempt to actually connect, you have to call Open() for that.
  139. func NewBroker(addr string) *Broker {
  140. return &Broker{id: -1, addr: addr}
  141. }
  142. // Open tries to connect to the Broker if it is not already connected or connecting, but does not block
  143. // waiting for the connection to complete. This means that any subsequent operations on the broker will
  144. // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
  145. // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
  146. // AlreadyConnected. If conf is nil, the result of NewConfig() is used.
  147. func (b *Broker) Open(conf *Config) error {
  148. if !atomic.CompareAndSwapInt32(&b.opened, 0, 1) {
  149. return ErrAlreadyConnected
  150. }
  151. if conf == nil {
  152. conf = NewConfig()
  153. }
  154. err := conf.Validate()
  155. if err != nil {
  156. return err
  157. }
  158. usingApiVersionsRequests := conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest
  159. b.lock.Lock()
  160. if b.metricRegistry == nil {
  161. b.metricRegistry = newCleanupRegistry(conf.MetricRegistry)
  162. }
  163. go withRecover(func() {
  164. defer func() {
  165. b.lock.Unlock()
  166. // Send an ApiVersionsRequest to identify the client (KIP-511).
  167. // Ideally Sarama would use the response to control protocol versions,
  168. // but for now just fire-and-forget just to send
  169. if usingApiVersionsRequests {
  170. _, err = b.ApiVersions(&ApiVersionsRequest{
  171. Version: 3,
  172. ClientSoftwareName: defaultClientSoftwareName,
  173. ClientSoftwareVersion: version(),
  174. })
  175. if err != nil {
  176. Logger.Printf("Error while sending ApiVersionsRequest to broker %s: %s\n", b.addr, err)
  177. }
  178. }
  179. }()
  180. dialer := conf.getDialer()
  181. b.conn, b.connErr = dialer.Dial("tcp", b.addr)
  182. if b.connErr != nil {
  183. Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
  184. b.conn = nil
  185. atomic.StoreInt32(&b.opened, 0)
  186. return
  187. }
  188. if conf.Net.TLS.Enable {
  189. b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
  190. }
  191. b.conn = newBufConn(b.conn)
  192. b.conf = conf
  193. // Create or reuse the global metrics shared between brokers
  194. b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", b.metricRegistry)
  195. b.requestRate = metrics.GetOrRegisterMeter("request-rate", b.metricRegistry)
  196. b.fetchRate = metrics.GetOrRegisterMeter("consumer-fetch-rate", b.metricRegistry)
  197. b.requestSize = getOrRegisterHistogram("request-size", b.metricRegistry)
  198. b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", b.metricRegistry)
  199. b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", b.metricRegistry)
  200. b.responseRate = metrics.GetOrRegisterMeter("response-rate", b.metricRegistry)
  201. b.responseSize = getOrRegisterHistogram("response-size", b.metricRegistry)
  202. b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", b.metricRegistry)
  203. b.protocolRequestsRate = map[int16]metrics.Meter{}
  204. // Do not gather metrics for seeded broker (only used during bootstrap) because they share
  205. // the same id (-1) and are already exposed through the global metrics above
  206. if b.id >= 0 && !metrics.UseNilMetrics {
  207. b.registerMetrics()
  208. }
  209. if conf.Net.SASL.Mechanism == SASLTypeOAuth && conf.Net.SASL.Version == SASLHandshakeV0 {
  210. conf.Net.SASL.Version = SASLHandshakeV1
  211. }
  212. useSaslV0 := conf.Net.SASL.Version == SASLHandshakeV0 || conf.Net.SASL.Mechanism == SASLTypeGSSAPI
  213. if conf.Net.SASL.Enable && useSaslV0 {
  214. b.connErr = b.authenticateViaSASLv0()
  215. if b.connErr != nil {
  216. err = b.conn.Close()
  217. if err == nil {
  218. DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
  219. } else {
  220. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  221. }
  222. b.conn = nil
  223. atomic.StoreInt32(&b.opened, 0)
  224. return
  225. }
  226. }
  227. b.done = make(chan bool)
  228. b.responses = make(chan *responsePromise, b.conf.Net.MaxOpenRequests-1)
  229. go withRecover(b.responseReceiver)
  230. if conf.Net.SASL.Enable && !useSaslV0 {
  231. b.connErr = b.authenticateViaSASLv1()
  232. if b.connErr != nil {
  233. close(b.responses)
  234. <-b.done
  235. err = b.conn.Close()
  236. if err == nil {
  237. DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
  238. } else {
  239. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  240. }
  241. b.conn = nil
  242. atomic.StoreInt32(&b.opened, 0)
  243. return
  244. }
  245. }
  246. if b.id >= 0 {
  247. DebugLogger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
  248. } else {
  249. DebugLogger.Printf("Connected to broker at %s (unregistered)\n", b.addr)
  250. }
  251. })
  252. return nil
  253. }
  254. func (b *Broker) ResponseSize() int {
  255. b.lock.Lock()
  256. defer b.lock.Unlock()
  257. return len(b.responses)
  258. }
  259. // Connected returns true if the broker is connected and false otherwise. If the broker is not
  260. // connected but it had tried to connect, the error from that connection attempt is also returned.
  261. func (b *Broker) Connected() (bool, error) {
  262. b.lock.Lock()
  263. defer b.lock.Unlock()
  264. return b.conn != nil, b.connErr
  265. }
  266. // TLSConnectionState returns the client's TLS connection state. The second return value is false if this is not a tls connection or the connection has not yet been established.
  267. func (b *Broker) TLSConnectionState() (state tls.ConnectionState, ok bool) {
  268. b.lock.Lock()
  269. defer b.lock.Unlock()
  270. if b.conn == nil {
  271. return state, false
  272. }
  273. conn := b.conn
  274. if bconn, ok := b.conn.(*bufConn); ok {
  275. conn = bconn.Conn
  276. }
  277. if tc, ok := conn.(*tls.Conn); ok {
  278. return tc.ConnectionState(), true
  279. }
  280. return state, false
  281. }
  282. // Close closes the broker resources
  283. func (b *Broker) Close() error {
  284. b.lock.Lock()
  285. defer b.lock.Unlock()
  286. if b.conn == nil {
  287. return ErrNotConnected
  288. }
  289. close(b.responses)
  290. <-b.done
  291. err := b.conn.Close()
  292. b.conn = nil
  293. b.connErr = nil
  294. b.done = nil
  295. b.responses = nil
  296. b.metricRegistry.UnregisterAll()
  297. if err == nil {
  298. DebugLogger.Printf("Closed connection to broker %s\n", b.addr)
  299. } else {
  300. Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
  301. }
  302. atomic.StoreInt32(&b.opened, 0)
  303. return err
  304. }
  305. // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
  306. func (b *Broker) ID() int32 {
  307. return b.id
  308. }
  309. // Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.
  310. func (b *Broker) Addr() string {
  311. return b.addr
  312. }
  313. // Rack returns the broker's rack as retrieved from Kafka's metadata or the
  314. // empty string if it is not known. The returned value corresponds to the
  315. // broker's broker.rack configuration setting. Requires protocol version to be
  316. // at least v0.10.0.0.
  317. func (b *Broker) Rack() string {
  318. if b.rack == nil {
  319. return ""
  320. }
  321. return *b.rack
  322. }
  323. // GetMetadata send a metadata request and returns a metadata response or error
  324. func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
  325. response := new(MetadataResponse)
  326. response.Version = request.Version // Required to ensure use of the correct response header version
  327. err := b.sendAndReceive(request, response)
  328. if err != nil {
  329. return nil, err
  330. }
  331. return response, nil
  332. }
  333. // GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
  334. func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
  335. response := new(ConsumerMetadataResponse)
  336. err := b.sendAndReceive(request, response)
  337. if err != nil {
  338. return nil, err
  339. }
  340. return response, nil
  341. }
  342. // FindCoordinator sends a find coordinate request and returns a response or error
  343. func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  344. response := new(FindCoordinatorResponse)
  345. err := b.sendAndReceive(request, response)
  346. if err != nil {
  347. return nil, err
  348. }
  349. return response, nil
  350. }
  351. // GetAvailableOffsets return an offset response or error
  352. func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
  353. response := new(OffsetResponse)
  354. err := b.sendAndReceive(request, response)
  355. if err != nil {
  356. return nil, err
  357. }
  358. return response, nil
  359. }
  360. // ProduceCallback function is called once the produce response has been parsed
  361. // or could not be read.
  362. type ProduceCallback func(*ProduceResponse, error)
  363. // AsyncProduce sends a produce request and eventually call the provided callback
  364. // with a produce response or an error.
  365. //
  366. // Waiting for the response is generally not blocking on the contrary to using Produce.
  367. // If the maximum number of in flight request configured is reached then
  368. // the request will be blocked till a previous response is received.
  369. //
  370. // When configured with RequiredAcks == NoResponse, the callback will not be invoked.
  371. // If an error is returned because the request could not be sent then the callback
  372. // will not be invoked either.
  373. //
  374. // Make sure not to Close the broker in the callback as it will lead to a deadlock.
  375. func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error {
  376. b.lock.Lock()
  377. defer b.lock.Unlock()
  378. needAcks := request.RequiredAcks != NoResponse
  379. // Use a nil promise when no acks is required
  380. var promise *responsePromise
  381. if needAcks {
  382. metricRegistry := b.metricRegistry
  383. // Create ProduceResponse early to provide the header version
  384. res := new(ProduceResponse)
  385. promise = &responsePromise{
  386. headerVersion: res.headerVersion(),
  387. // Packets will be converted to a ProduceResponse in the responseReceiver goroutine
  388. handler: func(packets []byte, err error) {
  389. if err != nil {
  390. // Failed request
  391. cb(nil, err)
  392. return
  393. }
  394. if err := versionedDecode(packets, res, request.version(), metricRegistry); err != nil {
  395. // Malformed response
  396. cb(nil, err)
  397. return
  398. }
  399. // Well-formed response
  400. b.handleThrottledResponse(res)
  401. cb(res, nil)
  402. },
  403. }
  404. }
  405. return b.sendWithPromise(request, promise)
  406. }
  407. // Produce returns a produce response or error
  408. func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
  409. var (
  410. response *ProduceResponse
  411. err error
  412. )
  413. if request.RequiredAcks == NoResponse {
  414. err = b.sendAndReceive(request, nil)
  415. } else {
  416. response = new(ProduceResponse)
  417. err = b.sendAndReceive(request, response)
  418. }
  419. if err != nil {
  420. return nil, err
  421. }
  422. return response, nil
  423. }
  424. // Fetch returns a FetchResponse or error
  425. func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  426. defer func() {
  427. if b.fetchRate != nil {
  428. b.fetchRate.Mark(1)
  429. }
  430. if b.brokerFetchRate != nil {
  431. b.brokerFetchRate.Mark(1)
  432. }
  433. }()
  434. response := new(FetchResponse)
  435. err := b.sendAndReceive(request, response)
  436. if err != nil {
  437. return nil, err
  438. }
  439. return response, nil
  440. }
  441. // CommitOffset return an Offset commit response or error
  442. func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
  443. response := new(OffsetCommitResponse)
  444. err := b.sendAndReceive(request, response)
  445. if err != nil {
  446. return nil, err
  447. }
  448. return response, nil
  449. }
  450. // FetchOffset returns an offset fetch response or error
  451. func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
  452. response := new(OffsetFetchResponse)
  453. response.Version = request.Version // needed to handle the two header versions
  454. err := b.sendAndReceive(request, response)
  455. if err != nil {
  456. return nil, err
  457. }
  458. return response, nil
  459. }
  460. // JoinGroup returns a join group response or error
  461. func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
  462. response := new(JoinGroupResponse)
  463. err := b.sendAndReceive(request, response)
  464. if err != nil {
  465. return nil, err
  466. }
  467. return response, nil
  468. }
  469. // SyncGroup returns a sync group response or error
  470. func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
  471. response := new(SyncGroupResponse)
  472. err := b.sendAndReceive(request, response)
  473. if err != nil {
  474. return nil, err
  475. }
  476. return response, nil
  477. }
  478. // LeaveGroup return a leave group response or error
  479. func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  480. response := new(LeaveGroupResponse)
  481. err := b.sendAndReceive(request, response)
  482. if err != nil {
  483. return nil, err
  484. }
  485. return response, nil
  486. }
  487. // Heartbeat returns a heartbeat response or error
  488. func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
  489. response := new(HeartbeatResponse)
  490. err := b.sendAndReceive(request, response)
  491. if err != nil {
  492. return nil, err
  493. }
  494. return response, nil
  495. }
  496. // ListGroups return a list group response or error
  497. func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
  498. response := new(ListGroupsResponse)
  499. response.Version = request.Version // Required to ensure use of the correct response header version
  500. err := b.sendAndReceive(request, response)
  501. if err != nil {
  502. return nil, err
  503. }
  504. return response, nil
  505. }
  506. // DescribeGroups return describe group response or error
  507. func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
  508. response := new(DescribeGroupsResponse)
  509. err := b.sendAndReceive(request, response)
  510. if err != nil {
  511. return nil, err
  512. }
  513. return response, nil
  514. }
  515. // ApiVersions return api version response or error
  516. func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
  517. response := new(ApiVersionsResponse)
  518. err := b.sendAndReceive(request, response)
  519. if err != nil {
  520. return nil, err
  521. }
  522. return response, nil
  523. }
  524. // CreateTopics send a create topic request and returns create topic response
  525. func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  526. response := new(CreateTopicsResponse)
  527. err := b.sendAndReceive(request, response)
  528. if err != nil {
  529. return nil, err
  530. }
  531. return response, nil
  532. }
  533. // DeleteTopics sends a delete topic request and returns delete topic response
  534. func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  535. response := new(DeleteTopicsResponse)
  536. err := b.sendAndReceive(request, response)
  537. if err != nil {
  538. return nil, err
  539. }
  540. return response, nil
  541. }
  542. // CreatePartitions sends a create partition request and returns create
  543. // partitions response or error
  544. func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
  545. response := new(CreatePartitionsResponse)
  546. err := b.sendAndReceive(request, response)
  547. if err != nil {
  548. return nil, err
  549. }
  550. return response, nil
  551. }
  552. // AlterPartitionReassignments sends a alter partition reassignments request and
  553. // returns alter partition reassignments response
  554. func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
  555. response := new(AlterPartitionReassignmentsResponse)
  556. err := b.sendAndReceive(request, response)
  557. if err != nil {
  558. return nil, err
  559. }
  560. return response, nil
  561. }
  562. // ListPartitionReassignments sends a list partition reassignments request and
  563. // returns list partition reassignments response
  564. func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
  565. response := new(ListPartitionReassignmentsResponse)
  566. err := b.sendAndReceive(request, response)
  567. if err != nil {
  568. return nil, err
  569. }
  570. return response, nil
  571. }
  572. // DeleteRecords send a request to delete records and return delete record
  573. // response or error
  574. func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
  575. response := new(DeleteRecordsResponse)
  576. err := b.sendAndReceive(request, response)
  577. if err != nil {
  578. return nil, err
  579. }
  580. return response, nil
  581. }
  582. // DescribeAcls sends a describe acl request and returns a response or error
  583. func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
  584. response := new(DescribeAclsResponse)
  585. err := b.sendAndReceive(request, response)
  586. if err != nil {
  587. return nil, err
  588. }
  589. return response, nil
  590. }
  591. // CreateAcls sends a create acl request and returns a response or error
  592. func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
  593. response := new(CreateAclsResponse)
  594. err := b.sendAndReceive(request, response)
  595. if err != nil {
  596. return nil, err
  597. }
  598. errs := make([]error, 0)
  599. for _, res := range response.AclCreationResponses {
  600. if !errors.Is(res.Err, ErrNoError) {
  601. errs = append(errs, res.Err)
  602. }
  603. }
  604. if len(errs) > 0 {
  605. return response, Wrap(ErrCreateACLs, errs...)
  606. }
  607. return response, nil
  608. }
  609. // DeleteAcls sends a delete acl request and returns a response or error
  610. func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
  611. response := new(DeleteAclsResponse)
  612. err := b.sendAndReceive(request, response)
  613. if err != nil {
  614. return nil, err
  615. }
  616. return response, nil
  617. }
  618. // InitProducerID sends an init producer request and returns a response or error
  619. func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
  620. response := new(InitProducerIDResponse)
  621. response.Version = request.version()
  622. err := b.sendAndReceive(request, response)
  623. if err != nil {
  624. return nil, err
  625. }
  626. return response, nil
  627. }
  628. // AddPartitionsToTxn send a request to add partition to txn and returns
  629. // a response or error
  630. func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
  631. response := new(AddPartitionsToTxnResponse)
  632. err := b.sendAndReceive(request, response)
  633. if err != nil {
  634. return nil, err
  635. }
  636. return response, nil
  637. }
  638. // AddOffsetsToTxn sends a request to add offsets to txn and returns a response
  639. // or error
  640. func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
  641. response := new(AddOffsetsToTxnResponse)
  642. err := b.sendAndReceive(request, response)
  643. if err != nil {
  644. return nil, err
  645. }
  646. return response, nil
  647. }
  648. // EndTxn sends a request to end txn and returns a response or error
  649. func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
  650. response := new(EndTxnResponse)
  651. err := b.sendAndReceive(request, response)
  652. if err != nil {
  653. return nil, err
  654. }
  655. return response, nil
  656. }
  657. // TxnOffsetCommit sends a request to commit transaction offsets and returns
  658. // a response or error
  659. func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
  660. response := new(TxnOffsetCommitResponse)
  661. err := b.sendAndReceive(request, response)
  662. if err != nil {
  663. return nil, err
  664. }
  665. return response, nil
  666. }
  667. // DescribeConfigs sends a request to describe config and returns a response or
  668. // error
  669. func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
  670. response := new(DescribeConfigsResponse)
  671. err := b.sendAndReceive(request, response)
  672. if err != nil {
  673. return nil, err
  674. }
  675. return response, nil
  676. }
  677. // AlterConfigs sends a request to alter config and return a response or error
  678. func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
  679. response := new(AlterConfigsResponse)
  680. err := b.sendAndReceive(request, response)
  681. if err != nil {
  682. return nil, err
  683. }
  684. return response, nil
  685. }
  686. // IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
  687. func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
  688. response := new(IncrementalAlterConfigsResponse)
  689. err := b.sendAndReceive(request, response)
  690. if err != nil {
  691. return nil, err
  692. }
  693. return response, nil
  694. }
  695. // DeleteGroups sends a request to delete groups and returns a response or error
  696. func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
  697. response := new(DeleteGroupsResponse)
  698. if err := b.sendAndReceive(request, response); err != nil {
  699. return nil, err
  700. }
  701. return response, nil
  702. }
  703. // DeleteOffsets sends a request to delete group offsets and returns a response or error
  704. func (b *Broker) DeleteOffsets(request *DeleteOffsetsRequest) (*DeleteOffsetsResponse, error) {
  705. response := new(DeleteOffsetsResponse)
  706. if err := b.sendAndReceive(request, response); err != nil {
  707. return nil, err
  708. }
  709. return response, nil
  710. }
  711. // DescribeLogDirs sends a request to get the broker's log dir paths and sizes
  712. func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
  713. response := new(DescribeLogDirsResponse)
  714. err := b.sendAndReceive(request, response)
  715. if err != nil {
  716. return nil, err
  717. }
  718. return response, nil
  719. }
  720. // DescribeUserScramCredentials sends a request to get SCRAM users
  721. func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
  722. res := new(DescribeUserScramCredentialsResponse)
  723. err := b.sendAndReceive(req, res)
  724. if err != nil {
  725. return nil, err
  726. }
  727. return res, err
  728. }
  729. func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
  730. res := new(AlterUserScramCredentialsResponse)
  731. err := b.sendAndReceive(req, res)
  732. if err != nil {
  733. return nil, err
  734. }
  735. return res, nil
  736. }
  737. // DescribeClientQuotas sends a request to get the broker's quotas
  738. func (b *Broker) DescribeClientQuotas(request *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) {
  739. response := new(DescribeClientQuotasResponse)
  740. err := b.sendAndReceive(request, response)
  741. if err != nil {
  742. return nil, err
  743. }
  744. return response, nil
  745. }
  746. // AlterClientQuotas sends a request to alter the broker's quotas
  747. func (b *Broker) AlterClientQuotas(request *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
  748. response := new(AlterClientQuotasResponse)
  749. err := b.sendAndReceive(request, response)
  750. if err != nil {
  751. return nil, err
  752. }
  753. return response, nil
  754. }
  755. // readFull ensures the conn ReadDeadline has been setup before making a
  756. // call to io.ReadFull
  757. func (b *Broker) readFull(buf []byte) (n int, err error) {
  758. if err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)); err != nil {
  759. return 0, err
  760. }
  761. return io.ReadFull(b.conn, buf)
  762. }
  763. // write ensures the conn WriteDeadline has been setup before making a
  764. // call to conn.Write
  765. func (b *Broker) write(buf []byte) (n int, err error) {
  766. if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
  767. return 0, err
  768. }
  769. return b.conn.Write(buf)
  770. }
  771. // b.lock must be held by caller
  772. func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
  773. var promise *responsePromise
  774. if promiseResponse {
  775. // Packets or error will be sent to the following channels
  776. // once the response is received
  777. promise = makeResponsePromise(responseHeaderVersion)
  778. }
  779. if err := b.sendWithPromise(rb, promise); err != nil {
  780. return nil, err
  781. }
  782. return promise, nil
  783. }
  784. func makeResponsePromise(responseHeaderVersion int16) *responsePromise {
  785. promise := &responsePromise{
  786. headerVersion: responseHeaderVersion,
  787. packets: make(chan []byte),
  788. errors: make(chan error),
  789. }
  790. return promise
  791. }
  792. // b.lock must be held by caller
  793. func (b *Broker) sendWithPromise(rb protocolBody, promise *responsePromise) error {
  794. if b.conn == nil {
  795. if b.connErr != nil {
  796. return b.connErr
  797. }
  798. return ErrNotConnected
  799. }
  800. if b.clientSessionReauthenticationTimeMs > 0 && currentUnixMilli() > b.clientSessionReauthenticationTimeMs {
  801. err := b.authenticateViaSASLv1()
  802. if err != nil {
  803. return err
  804. }
  805. }
  806. return b.sendInternal(rb, promise)
  807. }
// sendInternal encodes rb into a request carrying the broker's current
// correlation ID, writes it to the connection and, when promise is non-nil,
// queues the promise on b.responses (the channel responseReceiver ranges
// over) so the response can be delivered to it.
//
// It returns ErrUnsupportedVersion when the configured Kafka version is older
// than rb.requiredVersion(), or the error produced by encoding or writing the
// request. The correlation ID is advanced only after a successful write.
//
// b.lock must be held by caller
func (b *Broker) sendInternal(rb protocolBody, promise *responsePromise) error {
	if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
		return ErrUnsupportedVersion
	}

	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
	buf, err := encode(req, b.metricRegistry)
	if err != nil {
		return err
	}

	// check and wait if throttled
	b.waitIfThrottled()

	requestTime := time.Now()
	// Will be decremented in responseReceiver (except error or request with NoResponse)
	b.addRequestInFlightMetrics(1)
	bytes, err := b.write(buf)
	// Outgoing and per-protocol metrics are recorded before the error check,
	// so a failed write is still counted with the bytes that were written.
	b.updateOutgoingCommunicationMetrics(bytes)
	b.updateProtocolMetrics(rb)
	if err != nil {
		// Undo the in-flight increment made above for this failed request.
		b.addRequestInFlightMetrics(-1)
		return err
	}
	b.correlationID++

	if promise == nil {
		// Record request latency without the response
		b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
		return nil
	}

	// Record on the promise the send time and the correlation ID that
	// responseReceiver compares against the response header.
	promise.requestTime = requestTime
	promise.correlationID = req.correlationID
	b.responses <- promise

	return nil
}
  841. func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
  842. b.lock.Lock()
  843. defer b.lock.Unlock()
  844. responseHeaderVersion := int16(-1)
  845. if res != nil {
  846. responseHeaderVersion = res.headerVersion()
  847. }
  848. promise, err := b.send(req, res != nil, responseHeaderVersion)
  849. if err != nil {
  850. return err
  851. }
  852. if promise == nil {
  853. return nil
  854. }
  855. err = handleResponsePromise(req, res, promise, b.metricRegistry)
  856. if err != nil {
  857. return err
  858. }
  859. if res != nil {
  860. b.handleThrottledResponse(res)
  861. }
  862. return nil
  863. }
  864. func handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise, metricRegistry metrics.Registry) error {
  865. select {
  866. case buf := <-promise.packets:
  867. return versionedDecode(buf, res, req.version(), metricRegistry)
  868. case err := <-promise.errors:
  869. return err
  870. }
  871. }
  872. func (b *Broker) decode(pd packetDecoder, version int16) (err error) {
  873. b.id, err = pd.getInt32()
  874. if err != nil {
  875. return err
  876. }
  877. var host string
  878. if version < 9 {
  879. host, err = pd.getString()
  880. } else {
  881. host, err = pd.getCompactString()
  882. }
  883. if err != nil {
  884. return err
  885. }
  886. port, err := pd.getInt32()
  887. if err != nil {
  888. return err
  889. }
  890. if version >= 1 && version < 9 {
  891. b.rack, err = pd.getNullableString()
  892. } else if version >= 9 {
  893. b.rack, err = pd.getCompactNullableString()
  894. }
  895. if err != nil {
  896. return err
  897. }
  898. b.addr = net.JoinHostPort(host, fmt.Sprint(port))
  899. if _, _, err := net.SplitHostPort(b.addr); err != nil {
  900. return err
  901. }
  902. if version >= 9 {
  903. _, err := pd.getEmptyTaggedFieldArray()
  904. if err != nil {
  905. return err
  906. }
  907. }
  908. return nil
  909. }
  910. func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
  911. host, portstr, err := net.SplitHostPort(b.addr)
  912. if err != nil {
  913. return err
  914. }
  915. port, err := strconv.ParseInt(portstr, 10, 32)
  916. if err != nil {
  917. return err
  918. }
  919. pe.putInt32(b.id)
  920. if version < 9 {
  921. err = pe.putString(host)
  922. } else {
  923. err = pe.putCompactString(host)
  924. }
  925. if err != nil {
  926. return err
  927. }
  928. pe.putInt32(int32(port))
  929. if version >= 1 {
  930. if version < 9 {
  931. err = pe.putNullableString(b.rack)
  932. } else {
  933. err = pe.putNullableCompactString(b.rack)
  934. }
  935. if err != nil {
  936. return err
  937. }
  938. }
  939. if version >= 9 {
  940. pe.putEmptyTaggedFieldArray()
  941. }
  942. return nil
  943. }
// responseReceiver consumes promises from b.responses in order. For each one
// it reads a response header and body from the connection and hands the body
// (or an error) to the promise via handle.
//
// After the first read failure, header decode failure or correlation ID
// mismatch the receiver is "dead": it stops reading from the connection and
// fails every remaining promise with that same error. Once b.responses has
// been closed and drained, b.done is closed.
func (b *Broker) responseReceiver() {
	// dead holds the first fatal error; once set, nothing more is read.
	var dead error

	for response := range b.responses {
		if dead != nil {
			// This was previously incremented in send() and
			// we are not calling updateIncomingCommunicationMetrics()
			b.addRequestInFlightMetrics(-1)
			response.handle(nil, dead)
			continue
		}

		// The header size depends on the response header version of this promise.
		headerLength := getHeaderLength(response.headerVersion)
		header := make([]byte, headerLength)

		bytesReadHeader, err := b.readFull(header)
		// Latency is measured from the request's send time to the end of the header read.
		requestLatency := time.Since(response.requestTime)
		if err != nil {
			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
			dead = err
			response.handle(nil, err)
			continue
		}

		decodedHeader := responseHeader{}
		err = versionedDecode(header, &decodedHeader, response.headerVersion, b.metricRegistry)
		if err != nil {
			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
			dead = err
			response.handle(nil, err)
			continue
		}
		if decodedHeader.correlationID != response.correlationID {
			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
			// TODO if decoded ID < cur ID, discard until we catch up
			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
			response.handle(nil, dead)
			continue
		}

		// NOTE(review): presumably decodedHeader.length excludes the 4-byte
		// length prefix that the header read already consumed, hence the +4
		// when computing the bytes still to read — confirm against responseHeader.
		buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
		bytesReadBody, err := b.readFull(buf)
		// Incoming metrics cover header and body bytes, even if the body read failed.
		b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
		if err != nil {
			dead = err
			response.handle(nil, err)
			continue
		}

		response.handle(buf, nil)
	}
	close(b.done)
}
  992. func getHeaderLength(headerVersion int16) int8 {
  993. if headerVersion < 1 {
  994. return 8
  995. } else {
  996. // header contains additional tagged field length (0), we don't support actual tags yet.
  997. return 9
  998. }
  999. }
  1000. func (b *Broker) authenticateViaSASLv0() error {
  1001. switch b.conf.Net.SASL.Mechanism {
  1002. case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
  1003. return b.sendAndReceiveSASLSCRAMv0()
  1004. case SASLTypeGSSAPI:
  1005. return b.sendAndReceiveKerberos()
  1006. default:
  1007. return b.sendAndReceiveSASLPlainAuthV0()
  1008. }
  1009. }
  1010. func (b *Broker) authenticateViaSASLv1() error {
  1011. metricRegistry := b.metricRegistry
  1012. if b.conf.Net.SASL.Handshake {
  1013. handshakeRequest := &SaslHandshakeRequest{Mechanism: string(b.conf.Net.SASL.Mechanism), Version: b.conf.Net.SASL.Version}
  1014. handshakeResponse := new(SaslHandshakeResponse)
  1015. prom := makeResponsePromise(handshakeResponse.version())
  1016. handshakeErr := b.sendInternal(handshakeRequest, prom)
  1017. if handshakeErr != nil {
  1018. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  1019. return handshakeErr
  1020. }
  1021. handshakeErr = handleResponsePromise(handshakeRequest, handshakeResponse, prom, metricRegistry)
  1022. if handshakeErr != nil {
  1023. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  1024. return handshakeErr
  1025. }
  1026. if !errors.Is(handshakeResponse.Err, ErrNoError) {
  1027. return handshakeResponse.Err
  1028. }
  1029. }
  1030. authSendReceiver := func(authBytes []byte) (*SaslAuthenticateResponse, error) {
  1031. authenticateRequest := b.createSaslAuthenticateRequest(authBytes)
  1032. authenticateResponse := new(SaslAuthenticateResponse)
  1033. prom := makeResponsePromise(authenticateResponse.version())
  1034. authErr := b.sendInternal(authenticateRequest, prom)
  1035. if authErr != nil {
  1036. Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
  1037. return nil, authErr
  1038. }
  1039. authErr = handleResponsePromise(authenticateRequest, authenticateResponse, prom, metricRegistry)
  1040. if authErr != nil {
  1041. Logger.Printf("Error while performing SASL Auth %s\n", b.addr)
  1042. return nil, authErr
  1043. }
  1044. if !errors.Is(authenticateResponse.Err, ErrNoError) {
  1045. var err error = authenticateResponse.Err
  1046. if authenticateResponse.ErrorMessage != nil {
  1047. err = Wrap(authenticateResponse.Err, errors.New(*authenticateResponse.ErrorMessage))
  1048. }
  1049. return nil, err
  1050. }
  1051. b.computeSaslSessionLifetime(authenticateResponse)
  1052. return authenticateResponse, nil
  1053. }
  1054. switch b.conf.Net.SASL.Mechanism {
  1055. case SASLTypeOAuth:
  1056. provider := b.conf.Net.SASL.TokenProvider
  1057. return b.sendAndReceiveSASLOAuth(authSendReceiver, provider)
  1058. case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
  1059. return b.sendAndReceiveSASLSCRAMv1(authSendReceiver, b.conf.Net.SASL.SCRAMClientGeneratorFunc())
  1060. default:
  1061. return b.sendAndReceiveSASLPlainAuthV1(authSendReceiver)
  1062. }
  1063. }
  1064. func (b *Broker) sendAndReceiveKerberos() error {
  1065. b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
  1066. if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
  1067. b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
  1068. }
  1069. return b.kerberosAuthenticator.Authorize(b)
  1070. }
  1071. func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
  1072. rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
  1073. req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
  1074. buf, err := encode(req, b.metricRegistry)
  1075. if err != nil {
  1076. return err
  1077. }
  1078. requestTime := time.Now()
  1079. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  1080. b.addRequestInFlightMetrics(1)
  1081. bytes, err := b.write(buf)
  1082. b.updateOutgoingCommunicationMetrics(bytes)
  1083. if err != nil {
  1084. b.addRequestInFlightMetrics(-1)
  1085. Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
  1086. return err
  1087. }
  1088. b.correlationID++
  1089. header := make([]byte, 8) // response header
  1090. _, err = b.readFull(header)
  1091. if err != nil {
  1092. b.addRequestInFlightMetrics(-1)
  1093. Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
  1094. return err
  1095. }
  1096. length := binary.BigEndian.Uint32(header[:4])
  1097. payload := make([]byte, length-4)
  1098. n, err := b.readFull(payload)
  1099. if err != nil {
  1100. b.addRequestInFlightMetrics(-1)
  1101. Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
  1102. return err
  1103. }
  1104. b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
  1105. res := &SaslHandshakeResponse{}
  1106. err = versionedDecode(payload, res, 0, b.metricRegistry)
  1107. if err != nil {
  1108. Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
  1109. return err
  1110. }
  1111. if !errors.Is(res.Err, ErrNoError) {
  1112. Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
  1113. return res.Err
  1114. }
  1115. DebugLogger.Print("Completed pre-auth SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
  1116. return nil
  1117. }
  1118. //
  1119. // In SASL Plain, Kafka expects the auth header to be in the following format
  1120. // Message format (from https://tools.ietf.org/html/rfc4616):
  1121. //
  1122. // message = [authzid] UTF8NUL authcid UTF8NUL passwd
  1123. // authcid = 1*SAFE ; MUST accept up to 255 octets
  1124. // authzid = 1*SAFE ; MUST accept up to 255 octets
  1125. // passwd = 1*SAFE ; MUST accept up to 255 octets
  1126. // UTF8NUL = %x00 ; UTF-8 encoded NUL character
  1127. //
  1128. // SAFE = UTF1 / UTF2 / UTF3 / UTF4
  1129. // ;; any UTF-8 encoded Unicode character except NUL
  1130. //
  1131. //
  1132. // Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
  1133. // sendAndReceiveSASLPlainAuthV0 flows the v0 sasl auth NOT wrapped in the kafka protocol
  1134. //
  1135. // With SASL v0 handshake and auth then:
  1136. // When credentials are valid, Kafka returns a 4 byte array of null characters.
  1137. // When credentials are invalid, Kafka closes the connection.
  1138. func (b *Broker) sendAndReceiveSASLPlainAuthV0() error {
  1139. // default to V0 to allow for backward compatibility when SASL is enabled
  1140. // but not the handshake
  1141. if b.conf.Net.SASL.Handshake {
  1142. handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
  1143. if handshakeErr != nil {
  1144. Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
  1145. return handshakeErr
  1146. }
  1147. }
  1148. length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
  1149. authBytes := make([]byte, length+4) // 4 byte length header + auth data
  1150. binary.BigEndian.PutUint32(authBytes, uint32(length))
  1151. copy(authBytes[4:], b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password)
  1152. requestTime := time.Now()
  1153. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  1154. b.addRequestInFlightMetrics(1)
  1155. bytesWritten, err := b.write(authBytes)
  1156. b.updateOutgoingCommunicationMetrics(bytesWritten)
  1157. if err != nil {
  1158. b.addRequestInFlightMetrics(-1)
  1159. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  1160. return err
  1161. }
  1162. header := make([]byte, 4)
  1163. n, err := b.readFull(header)
  1164. b.updateIncomingCommunicationMetrics(n, time.Since(requestTime))
  1165. // If the credentials are valid, we would get a 4 byte response filled with null characters.
  1166. // Otherwise, the broker closes the connection and we get an EOF
  1167. if err != nil {
  1168. Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  1169. return err
  1170. }
  1171. DebugLogger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
  1172. return nil
  1173. }
  1174. // Kafka 1.x.x onward added a SaslAuthenticate request/response message which
  1175. // wraps the SASL flow in the Kafka protocol, which allows for returning
  1176. // meaningful errors on authentication failure.
  1177. func (b *Broker) sendAndReceiveSASLPlainAuthV1(authSendReceiver func(authBytes []byte) (*SaslAuthenticateResponse, error)) error {
  1178. authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
  1179. _, err := authSendReceiver(authBytes)
  1180. if err != nil {
  1181. return err
  1182. }
  1183. return err
  1184. }
  1185. func currentUnixMilli() int64 {
  1186. return time.Now().UnixNano() / int64(time.Millisecond)
  1187. }
  1188. // sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
  1189. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
  1190. func (b *Broker) sendAndReceiveSASLOAuth(authSendReceiver func(authBytes []byte) (*SaslAuthenticateResponse, error), provider AccessTokenProvider) error {
  1191. token, err := provider.Token()
  1192. if err != nil {
  1193. return err
  1194. }
  1195. message, err := buildClientFirstMessage(token)
  1196. if err != nil {
  1197. return err
  1198. }
  1199. res, err := authSendReceiver(message)
  1200. if err != nil {
  1201. return err
  1202. }
  1203. isChallenge := len(res.SaslAuthBytes) > 0
  1204. if isChallenge {
  1205. // Abort the token exchange. The broker returns the failure code.
  1206. _, err = authSendReceiver([]byte(`\x01`))
  1207. }
  1208. return err
  1209. }
  1210. func (b *Broker) sendAndReceiveSASLSCRAMv0() error {
  1211. if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV0); err != nil {
  1212. return err
  1213. }
  1214. scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
  1215. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  1216. return fmt.Errorf("failed to start SCRAM exchange with the server: %w", err)
  1217. }
  1218. msg, err := scramClient.Step("")
  1219. if err != nil {
  1220. return fmt.Errorf("failed to advance the SCRAM exchange: %w", err)
  1221. }
  1222. for !scramClient.Done() {
  1223. requestTime := time.Now()
  1224. // Will be decremented in updateIncomingCommunicationMetrics (except error)
  1225. b.addRequestInFlightMetrics(1)
  1226. length := len(msg)
  1227. authBytes := make([]byte, length+4) // 4 byte length header + auth data
  1228. binary.BigEndian.PutUint32(authBytes, uint32(length))
  1229. copy(authBytes[4:], msg)
  1230. _, err := b.write(authBytes)
  1231. b.updateOutgoingCommunicationMetrics(length + 4)
  1232. if err != nil {
  1233. b.addRequestInFlightMetrics(-1)
  1234. Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
  1235. return err
  1236. }
  1237. b.correlationID++
  1238. header := make([]byte, 4)
  1239. _, err = b.readFull(header)
  1240. if err != nil {
  1241. b.addRequestInFlightMetrics(-1)
  1242. Logger.Printf("Failed to read response header while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  1243. return err
  1244. }
  1245. payload := make([]byte, int32(binary.BigEndian.Uint32(header)))
  1246. n, err := b.readFull(payload)
  1247. if err != nil {
  1248. b.addRequestInFlightMetrics(-1)
  1249. Logger.Printf("Failed to read response payload while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
  1250. return err
  1251. }
  1252. b.updateIncomingCommunicationMetrics(n+4, time.Since(requestTime))
  1253. msg, err = scramClient.Step(string(payload))
  1254. if err != nil {
  1255. Logger.Println("SASL authentication failed", err)
  1256. return err
  1257. }
  1258. }
  1259. DebugLogger.Println("SASL authentication succeeded")
  1260. return nil
  1261. }
  1262. func (b *Broker) sendAndReceiveSASLSCRAMv1(authSendReceiver func(authBytes []byte) (*SaslAuthenticateResponse, error), scramClient SCRAMClient) error {
  1263. if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
  1264. return fmt.Errorf("failed to start SCRAM exchange with the server: %w", err)
  1265. }
  1266. msg, err := scramClient.Step("")
  1267. if err != nil {
  1268. return fmt.Errorf("failed to advance the SCRAM exchange: %w", err)
  1269. }
  1270. for !scramClient.Done() {
  1271. res, err := authSendReceiver([]byte(msg))
  1272. if err != nil {
  1273. return err
  1274. }
  1275. msg, err = scramClient.Step(string(res.SaslAuthBytes))
  1276. if err != nil {
  1277. Logger.Println("SASL authentication failed", err)
  1278. return err
  1279. }
  1280. }
  1281. DebugLogger.Println("SASL authentication succeeded")
  1282. return nil
  1283. }
  1284. func (b *Broker) createSaslAuthenticateRequest(msg []byte) *SaslAuthenticateRequest {
  1285. authenticateRequest := SaslAuthenticateRequest{SaslAuthBytes: msg}
  1286. if b.conf.Version.IsAtLeast(V2_2_0_0) {
  1287. authenticateRequest.Version = 1
  1288. }
  1289. return &authenticateRequest
  1290. }
  1291. // Build SASL/OAUTHBEARER initial client response as described by RFC-7628
  1292. // https://tools.ietf.org/html/rfc7628
  1293. func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
  1294. var ext string
  1295. if token.Extensions != nil && len(token.Extensions) > 0 {
  1296. if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
  1297. return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
  1298. }
  1299. ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
  1300. }
  1301. resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
  1302. return resp, nil
  1303. }
  1304. // mapToString returns a list of key-value pairs ordered by key.
  1305. // keyValSep separates the key from the value. elemSep separates each pair.
  1306. func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
  1307. buf := make([]string, 0, len(extensions))
  1308. for k, v := range extensions {
  1309. buf = append(buf, k+keyValSep+v)
  1310. }
  1311. sort.Strings(buf)
  1312. return strings.Join(buf, elemSep)
  1313. }
  1314. func (b *Broker) computeSaslSessionLifetime(res *SaslAuthenticateResponse) {
  1315. if res.SessionLifetimeMs > 0 {
  1316. // Follows the Java Kafka implementation from SaslClientAuthenticator.ReauthInfo#setAuthenticationEndAndSessionReauthenticationTimes
  1317. // pick a random percentage between 85% and 95% for session re-authentication
  1318. positiveSessionLifetimeMs := res.SessionLifetimeMs
  1319. authenticationEndMs := currentUnixMilli()
  1320. pctWindowFactorToTakeNetworkLatencyAndClockDriftIntoAccount := 0.85
  1321. pctWindowJitterToAvoidReauthenticationStormAcrossManyChannelsSimultaneously := 0.10
  1322. pctToUse := pctWindowFactorToTakeNetworkLatencyAndClockDriftIntoAccount + rand.Float64()*pctWindowJitterToAvoidReauthenticationStormAcrossManyChannelsSimultaneously
  1323. sessionLifetimeMsToUse := int64(float64(positiveSessionLifetimeMs) * pctToUse)
  1324. DebugLogger.Printf("Session expiration in %d ms and session re-authentication on or after %d ms", positiveSessionLifetimeMs, sessionLifetimeMsToUse)
  1325. b.clientSessionReauthenticationTimeMs = authenticationEndMs + sessionLifetimeMsToUse
  1326. } else {
  1327. b.clientSessionReauthenticationTimeMs = 0
  1328. }
  1329. }
  1330. func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
  1331. b.updateRequestLatencyAndInFlightMetrics(requestLatency)
  1332. b.responseRate.Mark(1)
  1333. if b.brokerResponseRate != nil {
  1334. b.brokerResponseRate.Mark(1)
  1335. }
  1336. responseSize := int64(bytes)
  1337. b.incomingByteRate.Mark(responseSize)
  1338. if b.brokerIncomingByteRate != nil {
  1339. b.brokerIncomingByteRate.Mark(responseSize)
  1340. }
  1341. b.responseSize.Update(responseSize)
  1342. if b.brokerResponseSize != nil {
  1343. b.brokerResponseSize.Update(responseSize)
  1344. }
  1345. }
  1346. func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
  1347. requestLatencyInMs := int64(requestLatency / time.Millisecond)
  1348. b.requestLatency.Update(requestLatencyInMs)
  1349. if b.brokerRequestLatency != nil {
  1350. b.brokerRequestLatency.Update(requestLatencyInMs)
  1351. }
  1352. b.addRequestInFlightMetrics(-1)
  1353. }
  1354. func (b *Broker) addRequestInFlightMetrics(i int64) {
  1355. b.requestsInFlight.Inc(i)
  1356. if b.brokerRequestsInFlight != nil {
  1357. b.brokerRequestsInFlight.Inc(i)
  1358. }
  1359. }
  1360. func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
  1361. b.requestRate.Mark(1)
  1362. if b.brokerRequestRate != nil {
  1363. b.brokerRequestRate.Mark(1)
  1364. }
  1365. requestSize := int64(bytes)
  1366. b.outgoingByteRate.Mark(requestSize)
  1367. if b.brokerOutgoingByteRate != nil {
  1368. b.brokerOutgoingByteRate.Mark(requestSize)
  1369. }
  1370. b.requestSize.Update(requestSize)
  1371. if b.brokerRequestSize != nil {
  1372. b.brokerRequestSize.Update(requestSize)
  1373. }
  1374. }
  1375. func (b *Broker) updateProtocolMetrics(rb protocolBody) {
  1376. protocolRequestsRate := b.protocolRequestsRate[rb.key()]
  1377. if protocolRequestsRate == nil {
  1378. protocolRequestsRate = metrics.GetOrRegisterMeter(fmt.Sprintf("protocol-requests-rate-%d", rb.key()), b.metricRegistry)
  1379. b.protocolRequestsRate[rb.key()] = protocolRequestsRate
  1380. }
  1381. protocolRequestsRate.Mark(1)
  1382. if b.brokerProtocolRequestsRate != nil {
  1383. brokerProtocolRequestsRate := b.brokerProtocolRequestsRate[rb.key()]
  1384. if brokerProtocolRequestsRate == nil {
  1385. brokerProtocolRequestsRate = b.registerMeter(fmt.Sprintf("protocol-requests-rate-%d", rb.key()))
  1386. b.brokerProtocolRequestsRate[rb.key()] = brokerProtocolRequestsRate
  1387. }
  1388. brokerProtocolRequestsRate.Mark(1)
  1389. }
  1390. }
// throttleSupport is implemented by responses that can report a throttle
// time. handleThrottledResponse type-asserts responses against it and ignores
// those that do not implement it.
type throttleSupport interface {
	// throttleTime returns the throttle duration carried by the response;
	// a zero duration is treated as "not throttled" by handleThrottledResponse.
	throttleTime() time.Duration
}
  1394. func (b *Broker) handleThrottledResponse(resp protocolBody) {
  1395. throttledResponse, ok := resp.(throttleSupport)
  1396. if !ok {
  1397. return
  1398. }
  1399. throttleTime := throttledResponse.throttleTime()
  1400. if throttleTime == time.Duration(0) {
  1401. return
  1402. }
  1403. DebugLogger.Printf(
  1404. "broker/%d %T throttled %v\n", b.ID(), resp, throttleTime)
  1405. b.setThrottle(throttleTime)
  1406. b.updateThrottleMetric(throttleTime)
  1407. }
  1408. func (b *Broker) setThrottle(throttleTime time.Duration) {
  1409. b.throttleTimerLock.Lock()
  1410. defer b.throttleTimerLock.Unlock()
  1411. if b.throttleTimer != nil {
  1412. // if there is an existing timer stop/clear it
  1413. if !b.throttleTimer.Stop() {
  1414. <-b.throttleTimer.C
  1415. }
  1416. }
  1417. b.throttleTimer = time.NewTimer(throttleTime)
  1418. }
  1419. func (b *Broker) waitIfThrottled() {
  1420. b.throttleTimerLock.Lock()
  1421. defer b.throttleTimerLock.Unlock()
  1422. if b.throttleTimer != nil {
  1423. DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID())
  1424. <-b.throttleTimer.C
  1425. b.throttleTimer = nil
  1426. }
  1427. }
  1428. func (b *Broker) updateThrottleMetric(throttleTime time.Duration) {
  1429. if b.brokerThrottleTime != nil {
  1430. throttleTimeInMs := int64(throttleTime / time.Millisecond)
  1431. b.brokerThrottleTime.Update(throttleTimeInMs)
  1432. }
  1433. }
  1434. func (b *Broker) registerMetrics() {
  1435. b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
  1436. b.brokerRequestRate = b.registerMeter("request-rate")
  1437. b.brokerFetchRate = b.registerMeter("consumer-fetch-rate")
  1438. b.brokerRequestSize = b.registerHistogram("request-size")
  1439. b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
  1440. b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
  1441. b.brokerResponseRate = b.registerMeter("response-rate")
  1442. b.brokerResponseSize = b.registerHistogram("response-size")
  1443. b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
  1444. b.brokerThrottleTime = b.registerHistogram("throttle-time-in-ms")
  1445. b.brokerProtocolRequestsRate = map[int16]metrics.Meter{}
  1446. }
  1447. func (b *Broker) registerMeter(name string) metrics.Meter {
  1448. nameForBroker := getMetricNameForBroker(name, b)
  1449. return metrics.GetOrRegisterMeter(nameForBroker, b.metricRegistry)
  1450. }
  1451. func (b *Broker) registerHistogram(name string) metrics.Histogram {
  1452. nameForBroker := getMetricNameForBroker(name, b)
  1453. return getOrRegisterHistogram(nameForBroker, b.metricRegistry)
  1454. }
  1455. func (b *Broker) registerCounter(name string) metrics.Counter {
  1456. nameForBroker := getMetricNameForBroker(name, b)
  1457. return metrics.GetOrRegisterCounter(nameForBroker, b.metricRegistry)
  1458. }
  1459. func validServerNameTLS(addr string, cfg *tls.Config) *tls.Config {
  1460. if cfg == nil {
  1461. cfg = &tls.Config{
  1462. MinVersion: tls.VersionTLS12,
  1463. }
  1464. }
  1465. if cfg.ServerName != "" {
  1466. return cfg
  1467. }
  1468. c := cfg.Clone()
  1469. sn, _, err := net.SplitHostPort(addr)
  1470. if err != nil {
  1471. Logger.Println(fmt.Errorf("failed to get ServerName from addr %w", err))
  1472. }
  1473. c.ServerName = sn
  1474. return c
  1475. }