elasticsearch.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. // Licensed to Elasticsearch B.V. under one or more contributor
  2. // license agreements. See the NOTICE file distributed with
  3. // this work for additional information regarding copyright
  4. // ownership. Elasticsearch B.V. licenses this file to you under
  5. // the Apache License, Version 2.0 (the "License"); you may
  6. // not use this file except in compliance with the License.
  7. // You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. package elasticsearch
  18. import (
  19. "context"
  20. "encoding/base64"
  21. "encoding/json"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "io/ioutil"
  26. "net/http"
  27. "net/url"
  28. "os"
  29. "regexp"
  30. "strconv"
  31. "strings"
  32. "sync"
  33. "time"
  34. "github.com/elastic/go-elasticsearch/v7/esapi"
  35. "github.com/elastic/go-elasticsearch/v7/estransport"
  36. "github.com/elastic/go-elasticsearch/v7/internal/version"
  37. )
  38. var (
  39. reVersion *regexp.Regexp
  40. )
  41. func init() {
  42. versionPattern := `^([0-9]+)\.([0-9]+)\.([0-9]+)`
  43. reVersion = regexp.MustCompile(versionPattern)
  44. }
  45. const (
  46. defaultURL = "http://localhost:9200"
  47. tagline = "You Know, for Search"
  48. unknownProduct = "the client noticed that the server is not Elasticsearch and we do not support this unknown product"
  49. unsupportedProduct = "the client noticed that the server is not a supported distribution of Elasticsearch"
  50. )
  51. // Version returns the package version as a string.
  52. //
  53. const Version = version.Client
  54. // Config represents the client configuration.
  55. //
  56. type Config struct {
  57. Addresses []string // A list of Elasticsearch nodes to use.
  58. Username string // Username for HTTP Basic Authentication.
  59. Password string // Password for HTTP Basic Authentication.
  60. CloudID string // Endpoint for the Elastic Service (https://elastic.co/cloud).
  61. APIKey string // Base64-encoded token for authorization; if set, overrides username/password and service token.
  62. ServiceToken string // Service token for authorization; if set, overrides username/password.
  63. CertificateFingerprint string // SHA256 hex fingerprint given by Elasticsearch on first launch.
  64. Header http.Header // Global HTTP request header.
  65. // PEM-encoded certificate authorities.
  66. // When set, an empty certificate pool will be created, and the certificates will be appended to it.
  67. // The option is only valid when the transport is not specified, or when it's http.Transport.
  68. CACert []byte
  69. RetryOnStatus []int // List of status codes for retry. Default: 502, 503, 504.
  70. DisableRetry bool // Default: false.
  71. EnableRetryOnTimeout bool // Default: false.
  72. MaxRetries int // Default: 3.
  73. CompressRequestBody bool // Default: false.
  74. DiscoverNodesOnStart bool // Discover nodes when initializing the client. Default: false.
  75. DiscoverNodesInterval time.Duration // Discover nodes periodically. Default: disabled.
  76. EnableMetrics bool // Enable the metrics collection.
  77. EnableDebugLogger bool // Enable the debug logging.
  78. EnableCompatibilityMode bool // Enable sends compatibility header
  79. DisableMetaHeader bool // Disable the additional "X-Elastic-Client-Meta" HTTP header.
  80. UseResponseCheckOnly bool
  81. RetryBackoff func(attempt int) time.Duration // Optional backoff duration. Default: nil.
  82. Transport http.RoundTripper // The HTTP transport object.
  83. Logger estransport.Logger // The logger object.
  84. Selector estransport.Selector // The selector object.
  85. // Optional constructor function for a custom ConnectionPool. Default: nil.
  86. ConnectionPoolFunc func([]*estransport.Connection, estransport.Selector) estransport.ConnectionPool
  87. }
  88. // Client represents the Elasticsearch client.
  89. //
  90. type Client struct {
  91. *esapi.API // Embeds the API methods
  92. Transport estransport.Interface
  93. useResponseCheckOnly bool
  94. productCheckMu sync.RWMutex
  95. productCheckSuccess bool
  96. }
  97. type esVersion struct {
  98. Number string `json:"number"`
  99. BuildFlavor string `json:"build_flavor"`
  100. }
  101. type info struct {
  102. Version esVersion `json:"version"`
  103. Tagline string `json:"tagline"`
  104. }
  105. // NewDefaultClient creates a new client with default options.
  106. //
  107. // It will use http://localhost:9200 as the default address.
  108. //
  109. // It will use the ELASTICSEARCH_URL environment variable, if set,
  110. // to configure the addresses; use a comma to separate multiple URLs.
  111. //
  112. func NewDefaultClient() (*Client, error) {
  113. return NewClient(Config{})
  114. }
  115. // NewClient creates a new client with configuration from cfg.
  116. //
  117. // It will use http://localhost:9200 as the default address.
  118. //
  119. // It will use the ELASTICSEARCH_URL environment variable, if set,
  120. // to configure the addresses; use a comma to separate multiple URLs.
  121. //
  122. // If either cfg.Addresses or cfg.CloudID is set, the ELASTICSEARCH_URL
  123. // environment variable is ignored.
  124. //
  125. // It's an error to set both cfg.Addresses and cfg.CloudID.
  126. //
  127. func NewClient(cfg Config) (*Client, error) {
  128. var addrs []string
  129. if len(cfg.Addresses) == 0 && cfg.CloudID == "" {
  130. addrs = addrsFromEnvironment()
  131. } else {
  132. if len(cfg.Addresses) > 0 && cfg.CloudID != "" {
  133. return nil, errors.New("cannot create client: both Addresses and CloudID are set")
  134. }
  135. if cfg.CloudID != "" {
  136. cloudAddr, err := addrFromCloudID(cfg.CloudID)
  137. if err != nil {
  138. return nil, fmt.Errorf("cannot create client: cannot parse CloudID: %s", err)
  139. }
  140. addrs = append(addrs, cloudAddr)
  141. }
  142. if len(cfg.Addresses) > 0 {
  143. addrs = append(addrs, cfg.Addresses...)
  144. }
  145. }
  146. urls, err := addrsToURLs(addrs)
  147. if err != nil {
  148. return nil, fmt.Errorf("cannot create client: %s", err)
  149. }
  150. if len(urls) == 0 {
  151. u, _ := url.Parse(defaultURL) // errcheck exclude
  152. urls = append(urls, u)
  153. }
  154. // TODO(karmi): Refactor
  155. if urls[0].User != nil {
  156. cfg.Username = urls[0].User.Username()
  157. pw, _ := urls[0].User.Password()
  158. cfg.Password = pw
  159. }
  160. tp, err := estransport.New(estransport.Config{
  161. URLs: urls,
  162. Username: cfg.Username,
  163. Password: cfg.Password,
  164. APIKey: cfg.APIKey,
  165. ServiceToken: cfg.ServiceToken,
  166. CertificateFingerprint: cfg.CertificateFingerprint,
  167. Header: cfg.Header,
  168. CACert: cfg.CACert,
  169. RetryOnStatus: cfg.RetryOnStatus,
  170. DisableRetry: cfg.DisableRetry,
  171. EnableRetryOnTimeout: cfg.EnableRetryOnTimeout,
  172. MaxRetries: cfg.MaxRetries,
  173. RetryBackoff: cfg.RetryBackoff,
  174. CompressRequestBody: cfg.CompressRequestBody,
  175. CompatibilityHeader: cfg.EnableCompatibilityMode,
  176. EnableMetrics: cfg.EnableMetrics,
  177. EnableDebugLogger: cfg.EnableDebugLogger,
  178. DisableMetaHeader: cfg.DisableMetaHeader,
  179. DiscoverNodesInterval: cfg.DiscoverNodesInterval,
  180. Transport: cfg.Transport,
  181. Logger: cfg.Logger,
  182. Selector: cfg.Selector,
  183. ConnectionPoolFunc: cfg.ConnectionPoolFunc,
  184. })
  185. if err != nil {
  186. return nil, fmt.Errorf("error creating transport: %s", err)
  187. }
  188. client := &Client{Transport: tp, useResponseCheckOnly: cfg.UseResponseCheckOnly}
  189. client.API = esapi.New(client)
  190. if cfg.DiscoverNodesOnStart {
  191. go client.DiscoverNodes()
  192. }
  193. return client, err
  194. }
  195. // genuineCheckHeader validates the presence of the X-Elastic-Product header
  196. //
  197. func genuineCheckHeader(header http.Header) error {
  198. if header.Get("X-Elastic-Product") != "Elasticsearch" {
  199. return errors.New(unknownProduct)
  200. }
  201. return nil
  202. }
  203. // genuineCheckInfo validates the informations given by Elasticsearch
  204. //
  205. func genuineCheckInfo(info info) error {
  206. major, minor, _, err := ParseElasticsearchVersion(info.Version.Number)
  207. if err != nil {
  208. return err
  209. }
  210. if major < 6 {
  211. return errors.New(unknownProduct)
  212. }
  213. if major < 7 {
  214. if info.Tagline != tagline {
  215. return errors.New(unknownProduct)
  216. }
  217. }
  218. if major >= 7 {
  219. if minor < 14 {
  220. if info.Tagline != tagline {
  221. return errors.New(unknownProduct)
  222. } else if info.Version.BuildFlavor != "default" {
  223. return errors.New(unsupportedProduct)
  224. }
  225. }
  226. }
  227. return nil
  228. }
  229. // ParseElasticsearchVersion returns an int64 representation of Elasticsearch version.
  230. //
  231. func ParseElasticsearchVersion(version string) (int64, int64, int64, error) {
  232. matches := reVersion.FindStringSubmatch(version)
  233. if len(matches) < 4 {
  234. return 0, 0, 0, fmt.Errorf("")
  235. }
  236. major, _ := strconv.ParseInt(matches[1], 10, 0)
  237. minor, _ := strconv.ParseInt(matches[2], 10, 0)
  238. patch, _ := strconv.ParseInt(matches[3], 10, 0)
  239. return major, minor, patch, nil
  240. }
  241. // Perform delegates to Transport to execute a request and return a response.
  242. //
  243. func (c *Client) Perform(req *http.Request) (*http.Response, error) {
  244. // ProductCheck validation. We skip this validation of we only want the
  245. // header validation. ResponseCheck path continues after original request.
  246. if !c.useResponseCheckOnly {
  247. // Launch product check for 7.x, request info, check header then payload.
  248. if err := c.doProductCheck(req.Context(), c.productCheck); err != nil {
  249. return nil, err
  250. }
  251. }
  252. // Retrieve the original request.
  253. res, err := c.Transport.Perform(req)
  254. // ResponseCheck path continues, we run the header check on the first answer from ES.
  255. if err == nil && (res.StatusCode >= 200 && res.StatusCode < 300){
  256. checkHeader := func(context.Context) error {
  257. return genuineCheckHeader(res.Header)
  258. }
  259. if err := c.doProductCheck(req.Context(), checkHeader); err != nil {
  260. res.Body.Close()
  261. return nil, err
  262. }
  263. }
  264. return res, err
  265. }
  266. // doProductCheck calls f if there as not been a prior successful call to doProductCheck,
  267. // returning nil otherwise.
  268. func (c *Client) doProductCheck(ctx context.Context, f func(context.Context) error) error {
  269. c.productCheckMu.RLock()
  270. productCheckSuccess := c.productCheckSuccess
  271. c.productCheckMu.RUnlock()
  272. if productCheckSuccess {
  273. return nil
  274. }
  275. c.productCheckMu.Lock()
  276. defer c.productCheckMu.Unlock()
  277. if c.productCheckSuccess {
  278. return nil
  279. }
  280. if err := f(ctx); err != nil {
  281. return err
  282. }
  283. c.productCheckSuccess = true
  284. return nil
  285. }
  286. // productCheck runs an esapi.Info query to retrieve informations of the current cluster
  287. // decodes the response and decides if the cluster is a genuine Elasticsearch product.
  288. func (c *Client) productCheck(ctx context.Context) error {
  289. req := esapi.InfoRequest{}
  290. res, err := req.Do(ctx, c.Transport)
  291. if err != nil {
  292. return err
  293. }
  294. defer res.Body.Close()
  295. if res.IsError() {
  296. _, err = io.Copy(ioutil.Discard, res.Body)
  297. if err != nil {
  298. return err
  299. }
  300. switch res.StatusCode {
  301. case http.StatusUnauthorized:
  302. return nil
  303. case http.StatusForbidden:
  304. return nil
  305. default:
  306. return fmt.Errorf("cannot retrieve informations from Elasticsearch")
  307. }
  308. }
  309. err = genuineCheckHeader(res.Header)
  310. if err != nil {
  311. var info info
  312. contentType := res.Header.Get("Content-Type")
  313. if strings.Contains(contentType, "json") {
  314. err = json.NewDecoder(res.Body).Decode(&info)
  315. if err != nil {
  316. return fmt.Errorf("error decoding Elasticsearch informations: %s", err)
  317. }
  318. }
  319. if info.Version.Number != "" {
  320. err = genuineCheckInfo(info)
  321. }
  322. }
  323. if err != nil {
  324. return err
  325. }
  326. return nil
  327. }
  328. // Metrics returns the client metrics.
  329. //
  330. func (c *Client) Metrics() (estransport.Metrics, error) {
  331. if mt, ok := c.Transport.(estransport.Measurable); ok {
  332. return mt.Metrics()
  333. }
  334. return estransport.Metrics{}, errors.New("transport is missing method Metrics()")
  335. }
  336. // DiscoverNodes reloads the client connections by fetching information from the cluster.
  337. //
  338. func (c *Client) DiscoverNodes() error {
  339. if dt, ok := c.Transport.(estransport.Discoverable); ok {
  340. return dt.DiscoverNodes()
  341. }
  342. return errors.New("transport is missing method DiscoverNodes()")
  343. }
  344. // addrsFromEnvironment returns a list of addresses by splitting
  345. // the ELASTICSEARCH_URL environment variable with comma, or an empty list.
  346. //
  347. func addrsFromEnvironment() []string {
  348. var addrs []string
  349. if envURLs, ok := os.LookupEnv("ELASTICSEARCH_URL"); ok && envURLs != "" {
  350. list := strings.Split(envURLs, ",")
  351. for _, u := range list {
  352. addrs = append(addrs, strings.TrimSpace(u))
  353. }
  354. }
  355. return addrs
  356. }
  357. // addrsToURLs creates a list of url.URL structures from url list.
  358. //
  359. func addrsToURLs(addrs []string) ([]*url.URL, error) {
  360. var urls []*url.URL
  361. for _, addr := range addrs {
  362. u, err := url.Parse(strings.TrimRight(addr, "/"))
  363. if err != nil {
  364. return nil, fmt.Errorf("cannot parse url: %v", err)
  365. }
  366. urls = append(urls, u)
  367. }
  368. return urls, nil
  369. }
  370. // addrFromCloudID extracts the Elasticsearch URL from CloudID.
  371. // See: https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html
  372. //
  373. func addrFromCloudID(input string) (string, error) {
  374. var scheme = "https://"
  375. values := strings.Split(input, ":")
  376. if len(values) != 2 {
  377. return "", fmt.Errorf("unexpected format: %q", input)
  378. }
  379. data, err := base64.StdEncoding.DecodeString(values[1])
  380. if err != nil {
  381. return "", err
  382. }
  383. parts := strings.Split(string(data), "$")
  384. if len(parts) < 2 {
  385. return "", fmt.Errorf("invalid encoded value: %s", parts)
  386. }
  387. return fmt.Sprintf("%s%s.%s", scheme, parts[1], parts[0]), nil
  388. }