123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- // Licensed to Elasticsearch B.V. under one or more contributor
- // license agreements. See the NOTICE file distributed with
- // this work for additional information regarding copyright
- // ownership. Elasticsearch B.V. licenses this file to you under
- // the Apache License, Version 2.0 (the "License"); you may
- // not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing,
- // software distributed under the License is distributed on an
- // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- // KIND, either express or implied. See the License for the
- // specific language governing permissions and limitations
- // under the License.
- package estransport
- import (
- "errors"
- "fmt"
- "math"
- "net/url"
- "sort"
- "sync"
- "time"
- )
var (
	// defaultResurrectTimeoutInitial is the base delay used when scheduling
	// a dead connection to be put back into the live list.
	defaultResurrectTimeoutInitial = time.Minute

	// defaultResurrectTimeoutFactorCutoff is the upper bound for the
	// exponent by which that base delay is scaled as failures accumulate.
	defaultResurrectTimeoutFactorCutoff = 5
)
- // Selector defines the interface for selecting connections from the pool.
- //
- type Selector interface {
- Select([]*Connection) (*Connection, error)
- }
- // ConnectionPool defines the interface for the connection pool.
- //
- type ConnectionPool interface {
- Next() (*Connection, error) // Next returns the next available connection.
- OnSuccess(*Connection) error // OnSuccess reports that the connection was successful.
- OnFailure(*Connection) error // OnFailure reports that the connection failed.
- URLs() []*url.URL // URLs returns the list of URLs of available connections.
- }
// Connection holds the address and health bookkeeping for a single node.
//
// The embedded mutex is what the pool methods in this file take before
// reading or changing the health fields.
type Connection struct {
	sync.Mutex

	// URL is the address of the node.
	URL *url.URL

	// IsDead is set by markAsDead and cleared by markAsLive/markAsHealthy.
	IsDead bool

	// DeadSince is the time the connection was first marked dead; it is
	// cleared again only by markAsHealthy.
	DeadSince time.Time

	// Failures counts how often the connection was marked dead since it was
	// last marked healthy.
	Failures int

	// Node metadata; not interpreted by the pool code in this file
	// (presumably populated by node discovery — confirm against callers).
	ID         string
	Name       string
	Roles      []string
	Attributes map[string]interface{}
}
- type singleConnectionPool struct {
- connection *Connection
- metrics *metrics
- }
- type statusConnectionPool struct {
- sync.Mutex
- live []*Connection // List of live connections
- dead []*Connection // List of dead connections
- selector Selector
- metrics *metrics
- }
// roundRobinSelector cycles through the connections it is given, one per
// call to Select.
type roundRobinSelector struct {
	sync.Mutex

	// curr is the index of the most recently returned connection;
	// NewConnectionPool initializes it to -1.
	curr int
}
- // NewConnectionPool creates and returns a default connection pool.
- //
- func NewConnectionPool(conns []*Connection, selector Selector) (ConnectionPool, error) {
- if len(conns) == 1 {
- return &singleConnectionPool{connection: conns[0]}, nil
- }
- if selector == nil {
- selector = &roundRobinSelector{curr: -1}
- }
- return &statusConnectionPool{live: conns, selector: selector}, nil
- }
- // Next returns the connection from pool.
- //
- func (cp *singleConnectionPool) Next() (*Connection, error) {
- return cp.connection, nil
- }
- // OnSuccess is a no-op for single connection pool.
- func (cp *singleConnectionPool) OnSuccess(c *Connection) error { return nil }
- // OnFailure is a no-op for single connection pool.
- func (cp *singleConnectionPool) OnFailure(c *Connection) error { return nil }
- // URLs returns the list of URLs of available connections.
- func (cp *singleConnectionPool) URLs() []*url.URL { return []*url.URL{cp.connection.URL} }
- func (cp *singleConnectionPool) connections() []*Connection { return []*Connection{cp.connection} }
- // Next returns a connection from pool, or an error.
- //
- func (cp *statusConnectionPool) Next() (*Connection, error) {
- cp.Lock()
- defer cp.Unlock()
- // Return next live connection
- if len(cp.live) > 0 {
- return cp.selector.Select(cp.live)
- } else if len(cp.dead) > 0 {
- // No live connection is available, resurrect one of the dead ones.
- c := cp.dead[len(cp.dead)-1]
- cp.dead = cp.dead[:len(cp.dead)-1]
- c.Lock()
- defer c.Unlock()
- cp.resurrect(c, false)
- return c, nil
- }
- return nil, errors.New("no connection available")
- }
- // OnSuccess marks the connection as successful.
- //
- func (cp *statusConnectionPool) OnSuccess(c *Connection) error {
- // Short-circuit for live connection
- c.Lock()
- if !c.IsDead {
- c.Unlock()
- return nil
- }
- c.Unlock()
- cp.Lock()
- defer cp.Unlock()
- c.Lock()
- defer c.Unlock()
- if !c.IsDead {
- return nil
- }
- c.markAsHealthy()
- return cp.resurrect(c, true)
- }
- // OnFailure marks the connection as failed.
- func (cp *statusConnectionPool) OnFailure(c *Connection) error {
- cp.Lock()
- defer cp.Unlock()
- c.Lock()
- if c.IsDead {
- if debugLogger != nil {
- debugLogger.Logf("Already removed %s\n", c.URL)
- }
- c.Unlock()
- return nil
- }
- if debugLogger != nil {
- debugLogger.Logf("Removing %s...\n", c.URL)
- }
- c.markAsDead()
- cp.scheduleResurrect(c)
- c.Unlock()
- // Push item to dead list and sort slice by number of failures
- cp.dead = append(cp.dead, c)
- sort.Slice(cp.dead, func(i, j int) bool {
- c1 := cp.dead[i]
- c2 := cp.dead[j]
- c1.Lock()
- c2.Lock()
- defer c1.Unlock()
- defer c2.Unlock()
- res := c1.Failures > c2.Failures
- return res
- })
- // Check if connection exists in the list, return error if not.
- index := -1
- for i, conn := range cp.live {
- if conn == c {
- index = i
- }
- }
- if index < 0 {
- return errors.New("connection not in live list")
- }
- // Remove item; https://github.com/golang/go/wiki/SliceTricks
- copy(cp.live[index:], cp.live[index+1:])
- cp.live = cp.live[:len(cp.live)-1]
- return nil
- }
- // URLs returns the list of URLs of available connections.
- //
- func (cp *statusConnectionPool) URLs() []*url.URL {
- var urls []*url.URL
- cp.Lock()
- defer cp.Unlock()
- for _, c := range cp.live {
- urls = append(urls, c.URL)
- }
- return urls
- }
- func (cp *statusConnectionPool) connections() []*Connection {
- var conns []*Connection
- conns = append(conns, cp.live...)
- conns = append(conns, cp.dead...)
- return conns
- }
- // resurrect adds the connection to the list of available connections.
- // When removeDead is true, it also removes it from the dead list.
- // The calling code is responsible for locking.
- //
- func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) error {
- if debugLogger != nil {
- debugLogger.Logf("Resurrecting %s\n", c.URL)
- }
- c.markAsLive()
- cp.live = append(cp.live, c)
- if removeDead {
- index := -1
- for i, conn := range cp.dead {
- if conn == c {
- index = i
- }
- }
- if index >= 0 {
- // Remove item; https://github.com/golang/go/wiki/SliceTricks
- copy(cp.dead[index:], cp.dead[index+1:])
- cp.dead = cp.dead[:len(cp.dead)-1]
- }
- }
- return nil
- }
- // scheduleResurrect schedules the connection to be resurrected.
- //
- func (cp *statusConnectionPool) scheduleResurrect(c *Connection) {
- factor := math.Min(float64(c.Failures-1), float64(defaultResurrectTimeoutFactorCutoff))
- timeout := time.Duration(defaultResurrectTimeoutInitial.Seconds() * math.Exp2(factor) * float64(time.Second))
- if debugLogger != nil {
- debugLogger.Logf("Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n", c.URL, c.Failures, factor, timeout, c.DeadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second))
- }
- time.AfterFunc(timeout, func() {
- cp.Lock()
- defer cp.Unlock()
- c.Lock()
- defer c.Unlock()
- if !c.IsDead {
- if debugLogger != nil {
- debugLogger.Logf("Already resurrected %s\n", c.URL)
- }
- return
- }
- cp.resurrect(c, true)
- })
- }
- // Select returns the connection in a round-robin fashion.
- //
- func (s *roundRobinSelector) Select(conns []*Connection) (*Connection, error) {
- s.Lock()
- defer s.Unlock()
- s.curr = (s.curr + 1) % len(conns)
- return conns[s.curr], nil
- }
- // markAsDead marks the connection as dead.
- //
- func (c *Connection) markAsDead() {
- c.IsDead = true
- if c.DeadSince.IsZero() {
- c.DeadSince = time.Now().UTC()
- }
- c.Failures++
- }
- // markAsLive marks the connection as alive.
- //
- func (c *Connection) markAsLive() {
- c.IsDead = false
- }
- // markAsHealthy marks the connection as healthy.
- //
- func (c *Connection) markAsHealthy() {
- c.IsDead = false
- c.DeadSince = time.Time{}
- c.Failures = 0
- }
- // String returns a readable connection representation.
- //
- func (c *Connection) String() string {
- c.Lock()
- defer c.Unlock()
- return fmt.Sprintf("<%s> dead=%v failures=%d", c.URL, c.IsDead, c.Failures)
- }
|