utils.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. package sarama
  2. import (
  3. "bufio"
  4. "fmt"
  5. "net"
  6. "regexp"
  7. )
  8. type none struct{}
  9. // make []int32 sortable so we can sort partition numbers
  10. type int32Slice []int32
  11. func (slice int32Slice) Len() int {
  12. return len(slice)
  13. }
  14. func (slice int32Slice) Less(i, j int) bool {
  15. return slice[i] < slice[j]
  16. }
  17. func (slice int32Slice) Swap(i, j int) {
  18. slice[i], slice[j] = slice[j], slice[i]
  19. }
  20. func dupInt32Slice(input []int32) []int32 {
  21. ret := make([]int32, 0, len(input))
  22. ret = append(ret, input...)
  23. return ret
  24. }
  25. func withRecover(fn func()) {
  26. defer func() {
  27. handler := PanicHandler
  28. if handler != nil {
  29. if err := recover(); err != nil {
  30. handler(err)
  31. }
  32. }
  33. }()
  34. fn()
  35. }
  36. func safeAsyncClose(b *Broker) {
  37. tmp := b // local var prevents clobbering in goroutine
  38. go withRecover(func() {
  39. if connected, _ := tmp.Connected(); connected {
  40. if err := tmp.Close(); err != nil {
  41. Logger.Println("Error closing broker", tmp.ID(), ":", err)
  42. }
  43. }
  44. })
  45. }
  46. // Encoder is a simple interface for any type that can be encoded as an array of bytes
  47. // in order to be sent as the key or value of a Kafka message. Length() is provided as an
  48. // optimization, and must return the same as len() on the result of Encode().
  49. type Encoder interface {
  50. Encode() ([]byte, error)
  51. Length() int
  52. }
  53. // make strings and byte slices encodable for convenience so they can be used as keys
  54. // and/or values in kafka messages
  55. // StringEncoder implements the Encoder interface for Go strings so that they can be used
  56. // as the Key or Value in a ProducerMessage.
  57. type StringEncoder string
  58. func (s StringEncoder) Encode() ([]byte, error) {
  59. return []byte(s), nil
  60. }
  61. func (s StringEncoder) Length() int {
  62. return len(s)
  63. }
  64. // ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
  65. // as the Key or Value in a ProducerMessage.
  66. type ByteEncoder []byte
  67. func (b ByteEncoder) Encode() ([]byte, error) {
  68. return b, nil
  69. }
  70. func (b ByteEncoder) Length() int {
  71. return len(b)
  72. }
  73. // bufConn wraps a net.Conn with a buffer for reads to reduce the number of
  74. // reads that trigger syscalls.
  75. type bufConn struct {
  76. net.Conn
  77. buf *bufio.Reader
  78. }
  79. func newBufConn(conn net.Conn) *bufConn {
  80. return &bufConn{
  81. Conn: conn,
  82. buf: bufio.NewReader(conn),
  83. }
  84. }
  85. func (bc *bufConn) Read(b []byte) (n int, err error) {
  86. return bc.buf.Read(b)
  87. }
  88. // KafkaVersion instances represent versions of the upstream Kafka broker.
  89. type KafkaVersion struct {
  90. // it's a struct rather than just typing the array directly to make it opaque and stop people
  91. // generating their own arbitrary versions
  92. version [4]uint
  93. }
  94. func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
  95. return KafkaVersion{
  96. version: [4]uint{major, minor, veryMinor, patch},
  97. }
  98. }
  99. // IsAtLeast return true if and only if the version it is called on is
  100. // greater than or equal to the version passed in:
  101. //
  102. // V1.IsAtLeast(V2) // false
  103. // V2.IsAtLeast(V1) // true
  104. func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
  105. for i := range v.version {
  106. if v.version[i] > other.version[i] {
  107. return true
  108. } else if v.version[i] < other.version[i] {
  109. return false
  110. }
  111. }
  112. return true
  113. }
  114. // Effective constants defining the supported kafka versions.
  115. var (
  116. V0_8_2_0 = newKafkaVersion(0, 8, 2, 0)
  117. V0_8_2_1 = newKafkaVersion(0, 8, 2, 1)
  118. V0_8_2_2 = newKafkaVersion(0, 8, 2, 2)
  119. V0_9_0_0 = newKafkaVersion(0, 9, 0, 0)
  120. V0_9_0_1 = newKafkaVersion(0, 9, 0, 1)
  121. V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
  122. V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
  123. V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
  124. V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
  125. V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
  126. V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
  127. V0_10_2_2 = newKafkaVersion(0, 10, 2, 2)
  128. V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
  129. V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
  130. V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
  131. V1_0_0_0 = newKafkaVersion(1, 0, 0, 0)
  132. V1_0_1_0 = newKafkaVersion(1, 0, 1, 0)
  133. V1_0_2_0 = newKafkaVersion(1, 0, 2, 0)
  134. V1_1_0_0 = newKafkaVersion(1, 1, 0, 0)
  135. V1_1_1_0 = newKafkaVersion(1, 1, 1, 0)
  136. V2_0_0_0 = newKafkaVersion(2, 0, 0, 0)
  137. V2_0_1_0 = newKafkaVersion(2, 0, 1, 0)
  138. V2_1_0_0 = newKafkaVersion(2, 1, 0, 0)
  139. V2_1_1_0 = newKafkaVersion(2, 1, 1, 0)
  140. V2_2_0_0 = newKafkaVersion(2, 2, 0, 0)
  141. V2_2_1_0 = newKafkaVersion(2, 2, 1, 0)
  142. V2_2_2_0 = newKafkaVersion(2, 2, 2, 0)
  143. V2_3_0_0 = newKafkaVersion(2, 3, 0, 0)
  144. V2_3_1_0 = newKafkaVersion(2, 3, 1, 0)
  145. V2_4_0_0 = newKafkaVersion(2, 4, 0, 0)
  146. V2_4_1_0 = newKafkaVersion(2, 4, 1, 0)
  147. V2_5_0_0 = newKafkaVersion(2, 5, 0, 0)
  148. V2_5_1_0 = newKafkaVersion(2, 5, 1, 0)
  149. V2_6_0_0 = newKafkaVersion(2, 6, 0, 0)
  150. V2_6_1_0 = newKafkaVersion(2, 6, 1, 0)
  151. V2_6_2_0 = newKafkaVersion(2, 6, 2, 0)
  152. V2_6_3_0 = newKafkaVersion(2, 6, 3, 0)
  153. V2_7_0_0 = newKafkaVersion(2, 7, 0, 0)
  154. V2_7_1_0 = newKafkaVersion(2, 7, 1, 0)
  155. V2_7_2_0 = newKafkaVersion(2, 7, 2, 0)
  156. V2_8_0_0 = newKafkaVersion(2, 8, 0, 0)
  157. V2_8_1_0 = newKafkaVersion(2, 8, 1, 0)
  158. V2_8_2_0 = newKafkaVersion(2, 8, 2, 0)
  159. V3_0_0_0 = newKafkaVersion(3, 0, 0, 0)
  160. V3_0_1_0 = newKafkaVersion(3, 0, 1, 0)
  161. V3_0_2_0 = newKafkaVersion(3, 0, 2, 0)
  162. V3_1_0_0 = newKafkaVersion(3, 1, 0, 0)
  163. V3_1_1_0 = newKafkaVersion(3, 1, 1, 0)
  164. V3_1_2_0 = newKafkaVersion(3, 1, 2, 0)
  165. V3_2_0_0 = newKafkaVersion(3, 2, 0, 0)
  166. V3_2_1_0 = newKafkaVersion(3, 2, 1, 0)
  167. V3_2_2_0 = newKafkaVersion(3, 2, 2, 0)
  168. V3_2_3_0 = newKafkaVersion(3, 2, 3, 0)
  169. V3_3_0_0 = newKafkaVersion(3, 3, 0, 0)
  170. V3_3_1_0 = newKafkaVersion(3, 3, 1, 0)
  171. V3_3_2_0 = newKafkaVersion(3, 3, 2, 0)
  172. V3_4_0_0 = newKafkaVersion(3, 4, 0, 0)
  173. V3_4_1_0 = newKafkaVersion(3, 4, 1, 0)
  174. V3_5_0_0 = newKafkaVersion(3, 5, 0, 0)
  175. V3_5_1_0 = newKafkaVersion(3, 5, 1, 0)
  176. V3_6_0_0 = newKafkaVersion(3, 6, 0, 0)
  177. SupportedVersions = []KafkaVersion{
  178. V0_8_2_0,
  179. V0_8_2_1,
  180. V0_8_2_2,
  181. V0_9_0_0,
  182. V0_9_0_1,
  183. V0_10_0_0,
  184. V0_10_0_1,
  185. V0_10_1_0,
  186. V0_10_1_1,
  187. V0_10_2_0,
  188. V0_10_2_1,
  189. V0_10_2_2,
  190. V0_11_0_0,
  191. V0_11_0_1,
  192. V0_11_0_2,
  193. V1_0_0_0,
  194. V1_0_1_0,
  195. V1_0_2_0,
  196. V1_1_0_0,
  197. V1_1_1_0,
  198. V2_0_0_0,
  199. V2_0_1_0,
  200. V2_1_0_0,
  201. V2_1_1_0,
  202. V2_2_0_0,
  203. V2_2_1_0,
  204. V2_2_2_0,
  205. V2_3_0_0,
  206. V2_3_1_0,
  207. V2_4_0_0,
  208. V2_4_1_0,
  209. V2_5_0_0,
  210. V2_5_1_0,
  211. V2_6_0_0,
  212. V2_6_1_0,
  213. V2_6_2_0,
  214. V2_7_0_0,
  215. V2_7_1_0,
  216. V2_8_0_0,
  217. V2_8_1_0,
  218. V2_8_2_0,
  219. V3_0_0_0,
  220. V3_0_1_0,
  221. V3_0_2_0,
  222. V3_1_0_0,
  223. V3_1_1_0,
  224. V3_1_2_0,
  225. V3_2_0_0,
  226. V3_2_1_0,
  227. V3_2_2_0,
  228. V3_2_3_0,
  229. V3_3_0_0,
  230. V3_3_1_0,
  231. V3_3_2_0,
  232. V3_4_0_0,
  233. V3_4_1_0,
  234. V3_5_0_0,
  235. V3_5_1_0,
  236. V3_6_0_0,
  237. }
  238. MinVersion = V0_8_2_0
  239. MaxVersion = V3_6_0_0
  240. DefaultVersion = V2_1_0_0
  241. // reduced set of protocol versions to matrix test
  242. fvtRangeVersions = []KafkaVersion{
  243. V0_8_2_2,
  244. V0_10_2_2,
  245. V1_0_2_0,
  246. V1_1_1_0,
  247. V2_0_1_0,
  248. V2_2_2_0,
  249. V2_4_1_0,
  250. V2_6_2_0,
  251. V2_8_2_0,
  252. V3_1_2_0,
  253. V3_3_2_0,
  254. V3_6_0_0,
  255. }
  256. )
  257. var (
  258. // This regex validates that a string complies with the pre kafka 1.0.0 format for version strings, for example 0.11.0.3
  259. validPreKafka1Version = regexp.MustCompile(`^0\.\d+\.\d+\.\d+$`)
  260. // This regex validates that a string complies with the post Kafka 1.0.0 format, for example 1.0.0
  261. validPostKafka1Version = regexp.MustCompile(`^\d+\.\d+\.\d+$`)
  262. )
  263. // ParseKafkaVersion parses and returns kafka version or error from a string
  264. func ParseKafkaVersion(s string) (KafkaVersion, error) {
  265. if len(s) < 5 {
  266. return DefaultVersion, fmt.Errorf("invalid version `%s`", s)
  267. }
  268. var major, minor, veryMinor, patch uint
  269. var err error
  270. if s[0] == '0' {
  271. err = scanKafkaVersion(s, validPreKafka1Version, "0.%d.%d.%d", [3]*uint{&minor, &veryMinor, &patch})
  272. } else {
  273. err = scanKafkaVersion(s, validPostKafka1Version, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
  274. }
  275. if err != nil {
  276. return DefaultVersion, err
  277. }
  278. return newKafkaVersion(major, minor, veryMinor, patch), nil
  279. }
  280. func scanKafkaVersion(s string, pattern *regexp.Regexp, format string, v [3]*uint) error {
  281. if !pattern.MatchString(s) {
  282. return fmt.Errorf("invalid version `%s`", s)
  283. }
  284. _, err := fmt.Sscanf(s, format, v[0], v[1], v[2])
  285. return err
  286. }
  287. func (v KafkaVersion) String() string {
  288. if v.version[0] == 0 {
  289. return fmt.Sprintf("0.%d.%d.%d", v.version[1], v.version[2], v.version[3])
  290. }
  291. return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
  292. }