// kafka.go
  1. package abango
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "github.com/IBM/sarama"
  7. e "github.com/dabory/abango-rest/etc"
  8. )
var (
	// KAFKA_CONN is the broker connection string, loaded from
	// XConfig["KafkaConnString"] by KafkaInit.
	KAFKA_CONN string
	// COMSUMER_TOPICS is the list of topics to consume, parsed from
	// XConfig["ConsumerTopics"] by KafkaInit. NOTE(review): the name keeps
	// the historical "COMSUMER" spelling (sic) — renaming would break
	// external callers of this exported variable.
	COMSUMER_TOPICS []string
	// KAFKA_TIMEOUT is the raw timeout string from XConfig["KafkaTimeout"];
	// it is only logged here — presumably parsed elsewhere, verify at callers.
	KAFKA_TIMEOUT string
)
  14. func KafkaInit() {
  15. KAFKA_CONN = XConfig["KafkaConnString"]
  16. COMSUMER_TOPICS = strings.Split(strings.Replace(XConfig["ConsumerTopics"], " ", "", -1), ",")
  17. KAFKA_TIMEOUT = XConfig["KafkaTimeout"]
  18. e.OkLog("== KAFKA_CONN is : " + KAFKA_CONN + " ==")
  19. e.OkLog("== COMSUMER_TOPICS is : " + XConfig["ConsumerTopics"] + " ==")
  20. e.OkLog("== KAFKA_TIMEOUT is : " + KAFKA_TIMEOUT + " ==")
  21. }
  22. func KafkaProducer(key string, headers []*sarama.RecordHeader, message []byte, conCurr string, topic string) (int32, int64, error) {
  23. kfcf := sarama.NewConfig()
  24. kfcf.Producer.Retry.Max = 5
  25. kfcf.Producer.RequiredAcks = sarama.WaitForAll
  26. kfcf.Producer.Return.Successes = true
  27. conHeaders := e.ConvertKafkaHeaders(headers)
  28. if conCurr == "async" {
  29. if prd, err := sarama.NewAsyncProducer([]string{KAFKA_CONN}, kfcf); err == nil {
  30. prd.Input() <- &sarama.ProducerMessage{
  31. Topic: topic,
  32. Key: sarama.StringEncoder(key),
  33. Headers: conHeaders,
  34. Value: sarama.ByteEncoder(message),
  35. }
  36. return 0, 0, nil
  37. } else {
  38. return 0, 0, e.MyErr("QEJHDRTTRRW-Kafka-NewAsyncProducer-End", err, true)
  39. }
  40. } else if conCurr == "sync" {
  41. fmt.Println("conCurr:", conCurr)
  42. if prd, err := sarama.NewSyncProducer([]string{KAFKA_CONN}, kfcf); err == nil {
  43. msg := &sarama.ProducerMessage{
  44. Topic: topic,
  45. Key: sarama.StringEncoder(key),
  46. Headers: conHeaders,
  47. Value: sarama.ByteEncoder(message),
  48. }
  49. if part, offset, err := prd.SendMessage(msg); err == nil {
  50. return part, offset, nil
  51. } else {
  52. return 0, 0, e.MyErr("QEJIOPRTRRTRRW-Kafka-Sync-SendMessage", err, true)
  53. }
  54. } else {
  55. return 0, 0, e.MyErr("QEJHGTRSDRTTRRW-Kafka-NewSyncProducer-End", err, true)
  56. }
  57. } else {
  58. return 0, 0, e.MyErr("QEJHGTRSW-Kafka-ApiMethod Not available-End", nil, true)
  59. }
  60. }
  61. func KafkaConsumer(ConsumeHandler func(msg *sarama.ConsumerMessage), topic string) {
  62. // Create a new configuration for the consumer
  63. config := sarama.NewConfig()
  64. config.Consumer.Return.Errors = true
  65. // Specify the list of brokers
  66. brokers := []string{KAFKA_CONN}
  67. // Create a new consumer
  68. consumer, err := sarama.NewConsumer(brokers, config)
  69. if err != nil {
  70. e.OkLog("Failed to create consumer of topic : " + topic + " == : " + err.Error())
  71. }
  72. defer func() {
  73. if err := consumer.Close(); err != nil {
  74. e.OkLog("Error closing consumer: of topic : " + topic + " == : " + err.Error())
  75. }
  76. }()
  77. // Consume messages from each partition asynchronously
  78. partitions, err := consumer.Partitions(topic)
  79. if err != nil {
  80. e.OkLog("Failed to get partitions: " + topic + " == : " + err.Error())
  81. }
  82. var wg sync.WaitGroup
  83. wg.Add(len(partitions))
  84. for _, partition := range partitions {
  85. go func(partition int32) {
  86. defer wg.Done()
  87. // Create a new partition consumer
  88. partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
  89. if err != nil {
  90. e.OkLog("Failed to create partition for consumer: " + topic + " partition: " + e.NumToStr(partition) + " " + err.Error())
  91. return
  92. }
  93. defer func() {
  94. if err := partitionConsumer.Close(); err != nil {
  95. e.OkLog("Error closing partition for consumer: " + topic + " partition: " + e.NumToStr(partition) + " " + err.Error())
  96. }
  97. }()
  98. // Process messages
  99. for msg := range partitionConsumer.Messages() {
  100. ConsumeHandler(msg)
  101. e.OkLog("Consuming topic: " + topic + " partition: " + e.NumToStr(partition))
  102. // log.Printf("Partition-kk %d | Offset %d | Key: %s | Value: %s", message.Partition, message.Offset, string(message.Key), string(message.Value))
  103. }
  104. }(partition)
  105. }
  106. // Wait for the consumer to finish
  107. wg.Wait()
  108. // }
  109. }