discovery.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
  17. package estransport
  18. import (
  19. "encoding/json"
  20. "fmt"
  21. "io/ioutil"
  22. "net/http"
  23. "net/url"
  24. "sort"
  25. "strings"
  26. "sync"
  27. "time"
  28. )
  29. // Discoverable defines the interface for transports supporting node discovery.
  30. //
  31. type Discoverable interface {
  32. DiscoverNodes() error
  33. }
  34. // nodeInfo represents the information about node in a cluster.
  35. //
  36. // See: https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-info.html
  37. //
  38. type nodeInfo struct {
  39. ID string
  40. Name string
  41. URL *url.URL
  42. Roles []string `json:"roles"`
  43. Attributes map[string]interface{}
  44. HTTP struct {
  45. PublishAddress string `json:"publish_address"`
  46. }
  47. }
  48. // DiscoverNodes reloads the client connections by fetching information from the cluster.
  49. //
  50. func (c *Client) DiscoverNodes() error {
  51. var conns []*Connection
  52. nodes, err := c.getNodesInfo()
  53. if err != nil {
  54. if debugLogger != nil {
  55. debugLogger.Logf("Error getting nodes info: %s\n", err)
  56. }
  57. return fmt.Errorf("discovery: get nodes: %s", err)
  58. }
  59. for _, node := range nodes {
  60. var (
  61. isMasterOnlyNode bool
  62. )
  63. roles := append(node.Roles[:0:0], node.Roles...)
  64. sort.Strings(roles)
  65. if len(roles) == 1 && roles[0] == "master" {
  66. isMasterOnlyNode = true
  67. }
  68. if debugLogger != nil {
  69. var skip string
  70. if isMasterOnlyNode {
  71. skip = "; [SKIP]"
  72. }
  73. debugLogger.Logf("Discovered node [%s]; %s; roles=%s%s\n", node.Name, node.URL, node.Roles, skip)
  74. }
  75. // Skip master only nodes
  76. // TODO(karmi): Move logic to Selector?
  77. if isMasterOnlyNode {
  78. continue
  79. }
  80. conns = append(conns, &Connection{
  81. URL: node.URL,
  82. ID: node.ID,
  83. Name: node.Name,
  84. Roles: node.Roles,
  85. Attributes: node.Attributes,
  86. })
  87. }
  88. c.Lock()
  89. defer c.Unlock()
  90. if lockable, ok := c.pool.(sync.Locker); ok {
  91. lockable.Lock()
  92. defer lockable.Unlock()
  93. }
  94. if c.poolFunc != nil {
  95. c.pool = c.poolFunc(conns, c.selector)
  96. } else {
  97. // TODO(karmi): Replace only live connections, leave dead scheduled for resurrect?
  98. c.pool, err = NewConnectionPool(conns, c.selector)
  99. if err != nil {
  100. return err
  101. }
  102. }
  103. return nil
  104. }
  105. func (c *Client) getNodesInfo() ([]nodeInfo, error) {
  106. var (
  107. out []nodeInfo
  108. scheme = c.urls[0].Scheme
  109. )
  110. req, err := http.NewRequest("GET", "/_nodes/http", nil)
  111. if err != nil {
  112. return out, err
  113. }
  114. c.Lock()
  115. conn, err := c.pool.Next()
  116. c.Unlock()
  117. // TODO(karmi): If no connection is returned, fallback to original URLs
  118. if err != nil {
  119. return out, err
  120. }
  121. c.setReqURL(conn.URL, req)
  122. c.setReqAuth(conn.URL, req)
  123. c.setReqUserAgent(req)
  124. if c.disableMetaHeader == false {
  125. c.setMetaHeader(req)
  126. }
  127. res, err := c.transport.RoundTrip(req)
  128. if err != nil {
  129. return out, err
  130. }
  131. defer res.Body.Close()
  132. if res.StatusCode > 200 {
  133. body, _ := ioutil.ReadAll(res.Body)
  134. return out, fmt.Errorf("server error: %s: %s", res.Status, body)
  135. }
  136. var env map[string]json.RawMessage
  137. if err := json.NewDecoder(res.Body).Decode(&env); err != nil {
  138. return out, err
  139. }
  140. var nodes map[string]nodeInfo
  141. if err := json.Unmarshal(env["nodes"], &nodes); err != nil {
  142. return out, err
  143. }
  144. for id, node := range nodes {
  145. node.ID = id
  146. u, err := c.getNodeURL(node, scheme)
  147. if err != nil {
  148. return out, err
  149. }
  150. node.URL = u
  151. out = append(out, node)
  152. }
  153. return out, nil
  154. }
  155. func (c *Client) getNodeURL(node nodeInfo, scheme string) (*url.URL, error) {
  156. var (
  157. host string
  158. port string
  159. addrs = strings.Split(node.HTTP.PublishAddress, "/")
  160. ports = strings.Split(node.HTTP.PublishAddress, ":")
  161. )
  162. if len(addrs) > 1 {
  163. host = addrs[0]
  164. } else {
  165. host = strings.Split(addrs[0], ":")[0]
  166. }
  167. port = ports[len(ports)-1]
  168. u := &url.URL{
  169. Scheme: scheme,
  170. Host: host + ":" + port,
  171. }
  172. return u, nil
  173. }
  174. func (c *Client) scheduleDiscoverNodes(d time.Duration) {
  175. go c.DiscoverNodes()
  176. c.Lock()
  177. defer c.Unlock()
  178. if c.discoverNodesTimer != nil {
  179. c.discoverNodesTimer.Stop()
  180. }
  181. c.discoverNodesTimer = time.AfterFunc(c.discoverNodesInterval, func() {
  182. c.scheduleDiscoverNodes(c.discoverNodesInterval)
  183. })
  184. }