connection.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. // Licensed to Elasticsearch B.V. under one or more contributor
  2. // license agreements. See the NOTICE file distributed with
  3. // this work for additional information regarding copyright
  4. // ownership. Elasticsearch B.V. licenses this file to you under
  5. // the Apache License, Version 2.0 (the "License"); you may
  6. // not use this file except in compliance with the License.
  7. // You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing,
  12. // software distributed under the License is distributed on an
  13. // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. // KIND, either express or implied. See the License for the
  15. // specific language governing permissions and limitations
  16. // under the License.
  17. package estransport
  18. import (
  19. "errors"
  20. "fmt"
  21. "math"
  22. "net/url"
  23. "sort"
  24. "sync"
  25. "time"
  26. )
  27. var (
  28. defaultResurrectTimeoutInitial = 60 * time.Second
  29. defaultResurrectTimeoutFactorCutoff = 5
  30. )
  31. // Selector defines the interface for selecting connections from the pool.
  32. //
  33. type Selector interface {
  34. Select([]*Connection) (*Connection, error)
  35. }
  36. // ConnectionPool defines the interface for the connection pool.
  37. //
  38. type ConnectionPool interface {
  39. Next() (*Connection, error) // Next returns the next available connection.
  40. OnSuccess(*Connection) error // OnSuccess reports that the connection was successful.
  41. OnFailure(*Connection) error // OnFailure reports that the connection failed.
  42. URLs() []*url.URL // URLs returns the list of URLs of available connections.
  43. }
  44. // Connection represents a connection to a node.
  45. //
  46. type Connection struct {
  47. sync.Mutex
  48. URL *url.URL
  49. IsDead bool
  50. DeadSince time.Time
  51. Failures int
  52. ID string
  53. Name string
  54. Roles []string
  55. Attributes map[string]interface{}
  56. }
  57. type singleConnectionPool struct {
  58. connection *Connection
  59. metrics *metrics
  60. }
  61. type statusConnectionPool struct {
  62. sync.Mutex
  63. live []*Connection // List of live connections
  64. dead []*Connection // List of dead connections
  65. selector Selector
  66. metrics *metrics
  67. }
  68. type roundRobinSelector struct {
  69. sync.Mutex
  70. curr int // Index of the current connection
  71. }
  72. // NewConnectionPool creates and returns a default connection pool.
  73. //
  74. func NewConnectionPool(conns []*Connection, selector Selector) (ConnectionPool, error) {
  75. if len(conns) == 1 {
  76. return &singleConnectionPool{connection: conns[0]}, nil
  77. }
  78. if selector == nil {
  79. selector = &roundRobinSelector{curr: -1}
  80. }
  81. return &statusConnectionPool{live: conns, selector: selector}, nil
  82. }
  83. // Next returns the connection from pool.
  84. //
  85. func (cp *singleConnectionPool) Next() (*Connection, error) {
  86. return cp.connection, nil
  87. }
  88. // OnSuccess is a no-op for single connection pool.
  89. func (cp *singleConnectionPool) OnSuccess(c *Connection) error { return nil }
  90. // OnFailure is a no-op for single connection pool.
  91. func (cp *singleConnectionPool) OnFailure(c *Connection) error { return nil }
  // URLs returns a one-element slice holding the URL of the pool's connection.
  func (cp *singleConnectionPool) URLs() []*url.URL {
  	only := []*url.URL{cp.connection.URL}
  	return only
  }
  94. func (cp *singleConnectionPool) connections() []*Connection { return []*Connection{cp.connection} }
  // Next returns a connection from the pool, or an error when the pool holds
  // no connections at all.
  //
  // Live connections are chosen by the pool's selector. When no live
  // connection remains, the last entry of the dead list is resurrected and
  // returned. Because OnFailure orders that list by descending failure
  // count, this is the dead connection with the fewest failures.
  func (cp *statusConnectionPool) Next() (*Connection, error) {
  	cp.Lock()
  	defer cp.Unlock()

  	switch {
  	case len(cp.live) > 0:
  		return cp.selector.Select(cp.live)
  	case len(cp.dead) > 0:
  		last := len(cp.dead) - 1
  		c := cp.dead[last]
  		cp.dead = cp.dead[:last]

  		c.Lock()
  		defer c.Unlock()
  		// c has already been popped off the dead list, so resurrect does
  		// not need to search for it there.
  		cp.resurrect(c, false)
  		return c, nil
  	default:
  		return nil, errors.New("no connection available")
  	}
  }
  114. // OnSuccess marks the connection as successful.
  115. //
  116. func (cp *statusConnectionPool) OnSuccess(c *Connection) error {
  117. // Short-circuit for live connection
  118. c.Lock()
  119. if !c.IsDead {
  120. c.Unlock()
  121. return nil
  122. }
  123. c.Unlock()
  124. cp.Lock()
  125. defer cp.Unlock()
  126. c.Lock()
  127. defer c.Unlock()
  128. if !c.IsDead {
  129. return nil
  130. }
  131. c.markAsHealthy()
  132. return cp.resurrect(c, true)
  133. }
  134. // OnFailure marks the connection as failed.
  135. func (cp *statusConnectionPool) OnFailure(c *Connection) error {
  136. cp.Lock()
  137. defer cp.Unlock()
  138. c.Lock()
  139. if c.IsDead {
  140. if debugLogger != nil {
  141. debugLogger.Logf("Already removed %s\n", c.URL)
  142. }
  143. c.Unlock()
  144. return nil
  145. }
  146. if debugLogger != nil {
  147. debugLogger.Logf("Removing %s...\n", c.URL)
  148. }
  149. c.markAsDead()
  150. cp.scheduleResurrect(c)
  151. c.Unlock()
  152. // Push item to dead list and sort slice by number of failures
  153. cp.dead = append(cp.dead, c)
  154. sort.Slice(cp.dead, func(i, j int) bool {
  155. c1 := cp.dead[i]
  156. c2 := cp.dead[j]
  157. c1.Lock()
  158. c2.Lock()
  159. defer c1.Unlock()
  160. defer c2.Unlock()
  161. res := c1.Failures > c2.Failures
  162. return res
  163. })
  164. // Check if connection exists in the list, return error if not.
  165. index := -1
  166. for i, conn := range cp.live {
  167. if conn == c {
  168. index = i
  169. }
  170. }
  171. if index < 0 {
  172. return errors.New("connection not in live list")
  173. }
  174. // Remove item; https://github.com/golang/go/wiki/SliceTricks
  175. copy(cp.live[index:], cp.live[index+1:])
  176. cp.live = cp.live[:len(cp.live)-1]
  177. return nil
  178. }
  // URLs returns the URLs of the live connections, in live-list order.
  // Dead connections are not included. The result is nil when no
  // connection is live.
  func (cp *statusConnectionPool) URLs() []*url.URL {
  	cp.Lock()
  	defer cp.Unlock()

  	var urls []*url.URL
  	for i := range cp.live {
  		urls = append(urls, cp.live[i].URL)
  	}
  	return urls
  }
  190. func (cp *statusConnectionPool) connections() []*Connection {
  191. var conns []*Connection
  192. conns = append(conns, cp.live...)
  193. conns = append(conns, cp.dead...)
  194. return conns
  195. }
  196. // resurrect adds the connection to the list of available connections.
  197. // When removeDead is true, it also removes it from the dead list.
  198. // The calling code is responsible for locking.
  199. //
  200. func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) error {
  201. if debugLogger != nil {
  202. debugLogger.Logf("Resurrecting %s\n", c.URL)
  203. }
  204. c.markAsLive()
  205. cp.live = append(cp.live, c)
  206. if removeDead {
  207. index := -1
  208. for i, conn := range cp.dead {
  209. if conn == c {
  210. index = i
  211. }
  212. }
  213. if index >= 0 {
  214. // Remove item; https://github.com/golang/go/wiki/SliceTricks
  215. copy(cp.dead[index:], cp.dead[index+1:])
  216. cp.dead = cp.dead[:len(cp.dead)-1]
  217. }
  218. }
  219. return nil
  220. }
  // scheduleResurrect arranges for c to be moved back to the live list after
  // an exponential back-off delay:
  //
  //	defaultResurrectTimeoutInitial * 2^min(Failures-1, defaultResurrectTimeoutFactorCutoff)
  //
  // It reads c.Failures, and OnFailure calls it with both the pool lock and
  // c's lock held. When the timer fires, c is resurrected only if it is
  // still marked dead.
  func (cp *statusConnectionPool) scheduleResurrect(c *Connection) {
  	factor := math.Min(float64(c.Failures-1), float64(defaultResurrectTimeoutFactorCutoff))
  	timeout := time.Duration(defaultResurrectTimeoutInitial.Seconds() * math.Exp2(factor) * float64(time.Second))
  	if debugLogger != nil {
  		// The timer below starts now, so the remaining delay is exactly
  		// timeout. DeadSince+timeout-now would be wrong here: DeadSince
  		// records only the first failure and markAsLive does not reset it,
  		// so after repeated failures that expression under-reports the delay.
  		debugLogger.Logf("Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n", c.URL, c.Failures, factor, timeout, timeout.Truncate(time.Second))
  	}
  	time.AfterFunc(timeout, func() {
  		cp.Lock()
  		defer cp.Unlock()
  		c.Lock()
  		defer c.Unlock()
  		if !c.IsDead {
  			if debugLogger != nil {
  				debugLogger.Logf("Already resurrected %s\n", c.URL)
  			}
  			return
  		}
  		cp.resurrect(c, true)
  	})
  }
  // Select returns the connection in a round-robin fashion.
  //
  // An empty candidate list yields an error instead of a divide-by-zero
  // panic. Select is part of the exported Selector interface and may be
  // called with any slice, not only the pool's non-empty live list.
  func (s *roundRobinSelector) Select(conns []*Connection) (*Connection, error) {
  	if len(conns) == 0 {
  		return nil, errors.New("no connection available")
  	}
  	s.Lock()
  	defer s.Unlock()
  	// The modulo also brings curr back in range if the list shrank since
  	// the previous call.
  	s.curr = (s.curr + 1) % len(conns)
  	return conns[s.curr], nil
  }
  // markAsDead flags the connection as dead and counts one more failure.
  // DeadSince is stamped (in UTC) only if it is not already set, so it keeps
  // the time of the first failure. Callers in this file hold c's lock.
  func (c *Connection) markAsDead() {
  	if c.DeadSince.IsZero() {
  		c.DeadSince = time.Now().UTC()
  	}
  	c.IsDead, c.Failures = true, c.Failures+1
  }
  260. // markAsLive marks the connection as alive.
  261. //
  262. func (c *Connection) markAsLive() {
  263. c.IsDead = false
  264. }
  // markAsHealthy clears the dead flag and resets the failure history
  // (DeadSince and Failures) to their zero values.
  func (c *Connection) markAsHealthy() {
  	c.IsDead, c.DeadSince, c.Failures = false, time.Time{}, 0
  }
  // String formats the connection as "<URL> dead=<bool> failures=<n>",
  // reading the fields under the connection's lock.
  func (c *Connection) String() string {
  	c.Lock()
  	defer c.Unlock()
  	u, dead, failures := c.URL, c.IsDead, c.Failures
  	return fmt.Sprintf("<%s> dead=%v failures=%d", u, dead, failures)
  }