gssapi_kerberos.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "strings"
  9. "time"
  10. "github.com/jcmturner/gofork/encoding/asn1"
  11. "github.com/jcmturner/gokrb5/v8/asn1tools"
  12. "github.com/jcmturner/gokrb5/v8/gssapi"
  13. "github.com/jcmturner/gokrb5/v8/iana/chksumtype"
  14. "github.com/jcmturner/gokrb5/v8/iana/keyusage"
  15. "github.com/jcmturner/gokrb5/v8/messages"
  16. "github.com/jcmturner/gokrb5/v8/types"
  17. )
  18. const (
  19. TOK_ID_KRB_AP_REQ = 256
  20. GSS_API_GENERIC_TAG = 0x60
  21. KRB5_USER_AUTH = 1
  22. KRB5_KEYTAB_AUTH = 2
  23. KRB5_CCACHE_AUTH = 3
  24. GSS_API_INITIAL = 1
  25. GSS_API_VERIFY = 2
  26. GSS_API_FINISH = 3
  27. )
  28. type GSSAPIConfig struct {
  29. AuthType int
  30. KeyTabPath string
  31. CCachePath string
  32. KerberosConfigPath string
  33. ServiceName string
  34. Username string
  35. Password string
  36. Realm string
  37. DisablePAFXFAST bool
  38. BuildSpn BuildSpnFunc
  39. }
  40. type GSSAPIKerberosAuth struct {
  41. Config *GSSAPIConfig
  42. ticket messages.Ticket
  43. encKey types.EncryptionKey
  44. NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
  45. step int
  46. }
  47. type KerberosClient interface {
  48. Login() error
  49. GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
  50. Domain() string
  51. CName() types.PrincipalName
  52. Destroy()
  53. }
  54. type BuildSpnFunc func(serviceName, host string) string
  55. // writePackage appends length in big endian before the payload, and sends it to kafka
  56. func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
  57. length := uint64(len(payload))
  58. size := length + 4 // 4 byte length header + payload
  59. if size > math.MaxInt32 {
  60. return 0, errors.New("payload too large, will overflow int32")
  61. }
  62. finalPackage := make([]byte, size)
  63. copy(finalPackage[4:], payload)
  64. binary.BigEndian.PutUint32(finalPackage, uint32(length))
  65. bytes, err := broker.conn.Write(finalPackage)
  66. if err != nil {
  67. return bytes, err
  68. }
  69. return bytes, nil
  70. }
  71. // readPackage reads payload length (4 bytes) and then reads the payload into []byte
  72. func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
  73. bytesRead := 0
  74. lengthInBytes := make([]byte, 4)
  75. bytes, err := io.ReadFull(broker.conn, lengthInBytes)
  76. if err != nil {
  77. return nil, bytesRead, err
  78. }
  79. bytesRead += bytes
  80. payloadLength := binary.BigEndian.Uint32(lengthInBytes)
  81. payloadBytes := make([]byte, payloadLength) // buffer for read..
  82. bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
  83. if err != nil {
  84. return payloadBytes, bytesRead, err
  85. }
  86. bytesRead += bytes
  87. return payloadBytes, bytesRead, nil
  88. }
  89. func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
  90. a := make([]byte, 24)
  91. flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
  92. binary.LittleEndian.PutUint32(a[:4], 16)
  93. for _, i := range flags {
  94. f := binary.LittleEndian.Uint32(a[20:24])
  95. f |= uint32(i)
  96. binary.LittleEndian.PutUint32(a[20:24], f)
  97. }
  98. return a
  99. }
  100. /*
  101. *
  102. * Construct Kerberos AP_REQ package, conforming to RFC-4120
  103. * https://tools.ietf.org/html/rfc4120#page-84
  104. *
  105. */
  106. func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
  107. domain string, cname types.PrincipalName,
  108. ticket messages.Ticket,
  109. sessionKey types.EncryptionKey) ([]byte, error) {
  110. auth, err := types.NewAuthenticator(domain, cname)
  111. if err != nil {
  112. return nil, err
  113. }
  114. auth.Cksum = types.Checksum{
  115. CksumType: chksumtype.GSSAPI,
  116. Checksum: krbAuth.newAuthenticatorChecksum(),
  117. }
  118. APReq, err := messages.NewAPReq(
  119. ticket,
  120. sessionKey,
  121. auth,
  122. )
  123. if err != nil {
  124. return nil, err
  125. }
  126. aprBytes := make([]byte, 2)
  127. binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
  128. tb, err := APReq.Marshal()
  129. if err != nil {
  130. return nil, err
  131. }
  132. aprBytes = append(aprBytes, tb...)
  133. return aprBytes, nil
  134. }
  135. /*
  136. *
  137. * Append the GSS-API header to the payload, conforming to RFC-2743
  138. * Section 3.1, Mechanism-Independent Token Format
  139. *
  140. * https://tools.ietf.org/html/rfc2743#page-81
  141. *
  142. * GSSAPIHeader + <specific mechanism payload>
  143. *
  144. */
  145. func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
  146. oidBytes, err := asn1.Marshal(gssapi.OIDKRB5.OID())
  147. if err != nil {
  148. return nil, err
  149. }
  150. tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
  151. GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
  152. GSSHeader = append(GSSHeader, oidBytes...)
  153. GSSPackage := append(GSSHeader, payload...)
  154. return GSSPackage, nil
  155. }
  156. func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
  157. switch krbAuth.step {
  158. case GSS_API_INITIAL:
  159. aprBytes, err := krbAuth.createKrb5Token(
  160. kerberosClient.Domain(),
  161. kerberosClient.CName(),
  162. krbAuth.ticket,
  163. krbAuth.encKey)
  164. if err != nil {
  165. return nil, err
  166. }
  167. krbAuth.step = GSS_API_VERIFY
  168. return krbAuth.appendGSSAPIHeader(aprBytes)
  169. case GSS_API_VERIFY:
  170. wrapTokenReq := gssapi.WrapToken{}
  171. if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
  172. return nil, err
  173. }
  174. // Validate response.
  175. isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
  176. if !isValid {
  177. return nil, err
  178. }
  179. wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
  180. if err != nil {
  181. return nil, err
  182. }
  183. krbAuth.step = GSS_API_FINISH
  184. return wrapTokenResponse.Marshal()
  185. }
  186. return nil, nil
  187. }
  188. /* This does the handshake for authorization */
  189. func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
  190. kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
  191. if err != nil {
  192. Logger.Printf("Kerberos client error: %s", err)
  193. return err
  194. }
  195. err = kerberosClient.Login()
  196. if err != nil {
  197. Logger.Printf("Kerberos client error: %s", err)
  198. return err
  199. }
  200. // Construct SPN using serviceName and host
  201. // default SPN format: <SERVICE>/<FQDN>
  202. host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
  203. var spn string
  204. if krbAuth.Config.BuildSpn != nil {
  205. spn = krbAuth.Config.BuildSpn(broker.conf.Net.SASL.GSSAPI.ServiceName, host)
  206. } else {
  207. spn = fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
  208. }
  209. ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
  210. if err != nil {
  211. Logger.Printf("Error getting Kerberos service ticket : %s", err)
  212. return err
  213. }
  214. krbAuth.ticket = ticket
  215. krbAuth.encKey = encKey
  216. krbAuth.step = GSS_API_INITIAL
  217. var receivedBytes []byte = nil
  218. defer kerberosClient.Destroy()
  219. for {
  220. packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
  221. if err != nil {
  222. Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
  223. return err
  224. }
  225. requestTime := time.Now()
  226. bytesWritten, err := krbAuth.writePackage(broker, packBytes)
  227. if err != nil {
  228. Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
  229. return err
  230. }
  231. broker.updateOutgoingCommunicationMetrics(bytesWritten)
  232. if krbAuth.step == GSS_API_VERIFY {
  233. bytesRead := 0
  234. receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
  235. requestLatency := time.Since(requestTime)
  236. broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  237. if err != nil {
  238. Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
  239. return err
  240. }
  241. } else if krbAuth.step == GSS_API_FINISH {
  242. return nil
  243. }
  244. }
  245. }