request.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. )
// protocolBody is the contract a request body must satisfy to be carried
// inside a request: it can be encoded, decoded for a given version, and it
// reports the metadata that request.encode writes into the request header.
type protocolBody interface {
	encoder
	versionedDecoder
	// key is the int16 written first after the length prefix when a request
	// is encoded; allocateBody switches on the same value when decoding.
	key() int16
	// version is the int16 written immediately after the key.
	version() int16
	// headerVersion selects the header layout used by request.encode:
	// >= 1 includes the client ID, >= 2 additionally includes a tagged-field
	// count.
	headerVersion() int16
	// isValidVersion is not called in this file; presumably it reports
	// whether version() is supported by this body type — confirm against
	// the implementations.
	isValidVersion() bool
	// requiredVersion is not called in this file; presumably it is the
	// minimum KafkaVersion that understands this body — confirm against
	// the implementations.
	requiredVersion() KafkaVersion
}
// request pairs the header fields of a protocol request with its typed body.
type request struct {
	// correlationID is written to and read from the header as an int32.
	correlationID int32
	// clientID is the header string written by encode when the body's
	// headerVersion() is at least 1.
	clientID string
	// body is the payload; on decode it is chosen by allocateBody from the
	// key and version found in the header.
	body protocolBody
}
  21. func (r *request) encode(pe packetEncoder) error {
  22. pe.push(&lengthField{})
  23. pe.putInt16(r.body.key())
  24. pe.putInt16(r.body.version())
  25. pe.putInt32(r.correlationID)
  26. if r.body.headerVersion() >= 1 {
  27. err := pe.putString(r.clientID)
  28. if err != nil {
  29. return err
  30. }
  31. }
  32. if r.body.headerVersion() >= 2 {
  33. // we don't use tag headers at the moment so we just put an array length of 0
  34. pe.putUVarint(0)
  35. }
  36. err := r.body.encode(pe)
  37. if err != nil {
  38. return err
  39. }
  40. return pe.pop()
  41. }
  42. func (r *request) decode(pd packetDecoder) (err error) {
  43. key, err := pd.getInt16()
  44. if err != nil {
  45. return err
  46. }
  47. version, err := pd.getInt16()
  48. if err != nil {
  49. return err
  50. }
  51. r.correlationID, err = pd.getInt32()
  52. if err != nil {
  53. return err
  54. }
  55. r.clientID, err = pd.getString()
  56. if err != nil {
  57. return err
  58. }
  59. r.body = allocateBody(key, version)
  60. if r.body == nil {
  61. return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
  62. }
  63. if r.body.headerVersion() >= 2 {
  64. // tagged field
  65. _, err = pd.getUVarint()
  66. if err != nil {
  67. return err
  68. }
  69. }
  70. return r.body.decode(pd, version)
  71. }
  72. func decodeRequest(r io.Reader) (*request, int, error) {
  73. var (
  74. bytesRead int
  75. lengthBytes = make([]byte, 4)
  76. )
  77. if _, err := io.ReadFull(r, lengthBytes); err != nil {
  78. return nil, bytesRead, err
  79. }
  80. bytesRead += len(lengthBytes)
  81. length := int32(binary.BigEndian.Uint32(lengthBytes))
  82. if length <= 4 || length > MaxRequestSize {
  83. return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
  84. }
  85. encodedReq := make([]byte, length)
  86. if _, err := io.ReadFull(r, encodedReq); err != nil {
  87. return nil, bytesRead, err
  88. }
  89. bytesRead += len(encodedReq)
  90. req := &request{}
  91. if err := decode(encodedReq, req, nil); err != nil {
  92. return nil, bytesRead, err
  93. }
  94. return req, bytesRead, nil
  95. }
  96. func allocateBody(key, version int16) protocolBody {
  97. switch key {
  98. case 0:
  99. return &ProduceRequest{Version: version}
  100. case 1:
  101. return &FetchRequest{Version: version}
  102. case 2:
  103. return &OffsetRequest{Version: version}
  104. case 3:
  105. return &MetadataRequest{Version: version}
  106. // 4: LeaderAndIsrRequest
  107. // 5: StopReplicaRequest
  108. // 6: UpdateMetadataRequest
  109. // 7: ControlledShutdownRequest
  110. case 8:
  111. return &OffsetCommitRequest{Version: version}
  112. case 9:
  113. return &OffsetFetchRequest{Version: version}
  114. case 10:
  115. return &FindCoordinatorRequest{Version: version}
  116. case 11:
  117. return &JoinGroupRequest{Version: version}
  118. case 12:
  119. return &HeartbeatRequest{Version: version}
  120. case 13:
  121. return &LeaveGroupRequest{Version: version}
  122. case 14:
  123. return &SyncGroupRequest{Version: version}
  124. case 15:
  125. return &DescribeGroupsRequest{Version: version}
  126. case 16:
  127. return &ListGroupsRequest{Version: version}
  128. case 17:
  129. return &SaslHandshakeRequest{Version: version}
  130. case 18:
  131. return &ApiVersionsRequest{Version: version}
  132. case 19:
  133. return &CreateTopicsRequest{Version: version}
  134. case 20:
  135. return &DeleteTopicsRequest{Version: version}
  136. case 21:
  137. return &DeleteRecordsRequest{Version: version}
  138. case 22:
  139. return &InitProducerIDRequest{Version: version}
  140. // 23: OffsetForLeaderEpochRequest
  141. case 24:
  142. return &AddPartitionsToTxnRequest{Version: version}
  143. case 25:
  144. return &AddOffsetsToTxnRequest{Version: version}
  145. case 26:
  146. return &EndTxnRequest{Version: version}
  147. // 27: WriteTxnMarkersRequest
  148. case 28:
  149. return &TxnOffsetCommitRequest{Version: version}
  150. case 29:
  151. return &DescribeAclsRequest{Version: int(version)}
  152. case 30:
  153. return &CreateAclsRequest{Version: version}
  154. case 31:
  155. return &DeleteAclsRequest{Version: int(version)}
  156. case 32:
  157. return &DescribeConfigsRequest{Version: version}
  158. case 33:
  159. return &AlterConfigsRequest{Version: version}
  160. // 34: AlterReplicaLogDirsRequest
  161. case 35:
  162. return &DescribeLogDirsRequest{Version: version}
  163. case 36:
  164. return &SaslAuthenticateRequest{Version: version}
  165. case 37:
  166. return &CreatePartitionsRequest{Version: version}
  167. // 38: CreateDelegationTokenRequest
  168. // 39: RenewDelegationTokenRequest
  169. // 40: ExpireDelegationTokenRequest
  170. // 41: DescribeDelegationTokenRequest
  171. case 42:
  172. return &DeleteGroupsRequest{Version: version}
  173. // 43: ElectLeadersRequest
  174. case 44:
  175. return &IncrementalAlterConfigsRequest{Version: version}
  176. case 45:
  177. return &AlterPartitionReassignmentsRequest{Version: version}
  178. case 46:
  179. return &ListPartitionReassignmentsRequest{Version: version}
  180. case 47:
  181. return &DeleteOffsetsRequest{Version: version}
  182. case 48:
  183. return &DescribeClientQuotasRequest{Version: version}
  184. case 49:
  185. return &AlterClientQuotasRequest{Version: version}
  186. case 50:
  187. return &DescribeUserScramCredentialsRequest{Version: version}
  188. case 51:
  189. return &AlterUserScramCredentialsRequest{Version: version}
  190. // 52: VoteRequest
  191. // 53: BeginQuorumEpochRequest
  192. // 54: EndQuorumEpochRequest
  193. // 55: DescribeQuorumRequest
  194. // 56: AlterPartitionRequest
  195. // 57: UpdateFeaturesRequest
  196. // 58: EnvelopeRequest
  197. // 59: FetchSnapshotRequest
  198. // 60: DescribeClusterRequest
  199. // 61: DescribeProducersRequest
  200. // 62: BrokerRegistrationRequest
  201. // 63: BrokerHeartbeatRequest
  202. // 64: UnregisterBrokerRequest
  203. // 65: DescribeTransactionsRequest
  204. // 66: ListTransactionsRequest
  205. // 67: AllocateProducerIdsRequest
  206. // 68: ConsumerGroupHeartbeatRequest
  207. }
  208. return nil
  209. }