sarama.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. /*
  2. Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level
  3. API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level
  4. API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.
  5. To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel
  6. and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases.
  7. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be
  8. useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees
  9. depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the
  10. SyncProducer can still sometimes be lost.
  11. To consume messages, use Consumer or Consumer-Group API.
  12. For lower-level needs, the Broker and Request/Response objects permit precise control over each connection
  13. and message sent on the wire; the Client provides higher-level metadata management that is shared between
  14. the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up
  15. exactly with the protocol fields documented by Kafka at
  16. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  17. Metrics are exposed through https://github.com/rcrowley/go-metrics library in a local registry.
  18. Broker related metrics:
  19. +---------------------------------------------------------+------------+---------------------------------------------------------------+
  20. | Name | Type | Description |
  21. +---------------------------------------------------------+------------+---------------------------------------------------------------+
  22. | incoming-byte-rate | meter | Bytes/second read off all brokers |
  23. | incoming-byte-rate-for-broker-<broker-id> | meter | Bytes/second read off a given broker |
  24. | outgoing-byte-rate | meter | Bytes/second written off all brokers |
  25. | outgoing-byte-rate-for-broker-<broker-id> | meter | Bytes/second written off a given broker |
  26. | request-rate | meter | Requests/second sent to all brokers |
  27. | request-rate-for-broker-<broker-id> | meter | Requests/second sent to a given broker |
  28. | request-size | histogram | Distribution of the request size in bytes for all brokers |
  29. | request-size-for-broker-<broker-id> | histogram | Distribution of the request size in bytes for a given broker |
  30. | request-latency-in-ms | histogram | Distribution of the request latency in ms for all brokers |
  31. | request-latency-in-ms-for-broker-<broker-id> | histogram | Distribution of the request latency in ms for a given broker |
  32. | response-rate | meter | Responses/second received from all brokers |
  33. | response-rate-for-broker-<broker-id> | meter | Responses/second received from a given broker |
  34. | response-size | histogram | Distribution of the response size in bytes for all brokers |
  35. | response-size-for-broker-<broker-id> | histogram | Distribution of the response size in bytes for a given broker |
  36. | requests-in-flight | counter | The current number of in-flight requests awaiting a response |
  37. | | | for all brokers |
  38. | requests-in-flight-for-broker-<broker-id> | counter | The current number of in-flight requests awaiting a response |
  39. | | | for a given broker |
  40. | protocol-requests-rate-<api-key> | meter | Number of api requests sent to the brokers for all brokers |
  41. | | | https://kafka.apache.org/protocol.html#protocol_api_keys | |
  42. | protocol-requests-rate-<api-key>-for-broker-<broker-id> | meter | Number of packets sent to the brokers by api-key for a given |
  43. | | | broker |
  44. +---------------------------------------------------------+------------+---------------------------------------------------------------+
  45. Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
  46. Producer related metrics:
  47. +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
  48. | Name | Type | Description |
  49. +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
  50. | batch-size | histogram | Distribution of the number of bytes sent per partition per request for all topics |
  51. | batch-size-for-topic-<topic> | histogram | Distribution of the number of bytes sent per partition per request for a given topic |
  52. | record-send-rate | meter | Records/second sent to all topics |
  53. | record-send-rate-for-topic-<topic> | meter | Records/second sent to a given topic |
  54. | records-per-request | histogram | Distribution of the number of records sent per request for all topics |
  55. | records-per-request-for-topic-<topic> | histogram | Distribution of the number of records sent per request for a given topic |
  56. | compression-ratio | histogram | Distribution of the compression ratio times 100 of record batches for all topics |
  57. | compression-ratio-for-topic-<topic> | histogram | Distribution of the compression ratio times 100 of record batches for a given topic |
  58. +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
  59. Consumer related metrics:
  60. +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
  61. | Name | Type | Description |
  62. +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
  63. | consumer-batch-size | histogram | Distribution of the number of messages in a batch |
  64. | consumer-fetch-rate | meter | Fetch requests/second sent to all brokers |
  65. | consumer-fetch-rate-for-broker-<broker> | meter | Fetch requests/second sent to a given broker |
  66. | consumer-fetch-rate-for-topic-<topic> | meter | Fetch requests/second sent for a given topic |
  67. | consumer-fetch-response-size | histogram | Distribution of the fetch response size in bytes |
  68. | consumer-group-join-total-<GroupID> | counter | Total count of consumer group join attempts |
  69. | consumer-group-join-failed-<GroupID> | counter | Total count of consumer group join failures |
  70. | consumer-group-sync-total-<GroupID> | counter | Total count of consumer group sync attempts |
  71. | consumer-group-sync-failed-<GroupID> | counter | Total count of consumer group sync failures |
  72. +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
  73. */
  74. package sarama
  75. import (
  76. "io"
  77. "log"
  78. )
  79. var (
  80. // Logger is the instance of a StdLogger interface that Sarama writes connection
  81. // management events to. By default it is set to discard all log messages via io.Discard,
  82. // but you can set it to redirect wherever you want.
  83. Logger StdLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags)
  84. // PanicHandler is called for recovering from panics spawned internally to the library (and thus
  85. // not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
  86. PanicHandler func(interface{})
  87. // MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
  88. // to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
  89. // with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
  90. // to process.
  91. MaxRequestSize int32 = 100 * 1024 * 1024
  92. // MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
  93. // a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
  94. // protect the client from running out of memory. Please note that brokers do not have any natural limit on
  95. // the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
  96. // (see https://issues.apache.org/jira/browse/KAFKA-2063).
  97. MaxResponseSize int32 = 100 * 1024 * 1024
  98. )
  99. // StdLogger is used to log error messages.
  100. type StdLogger interface {
  101. Print(v ...interface{})
  102. Printf(format string, v ...interface{})
  103. Println(v ...interface{})
  104. }
  105. type debugLogger struct{}
  106. func (d *debugLogger) Print(v ...interface{}) {
  107. Logger.Print(v...)
  108. }
  109. func (d *debugLogger) Printf(format string, v ...interface{}) {
  110. Logger.Printf(format, v...)
  111. }
  112. func (d *debugLogger) Println(v ...interface{}) {
  113. Logger.Println(v...)
  114. }
  115. // DebugLogger is the instance of a StdLogger that Sarama writes more verbose
  116. // debug information to. By default it is set to redirect all debug to the
  117. // default Logger above, but you can optionally set it to another StdLogger
  118. // instance to (e.g.,) discard debug information
  119. var DebugLogger StdLogger = &debugLogger{}