// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package estransport

import (
	"errors"
	"fmt"
	"math"
	"net/url"
	"sort"
	"sync"
	"time"
)

var (
	// defaultResurrectTimeoutInitial is the base delay before a dead
	// connection is put back into rotation. scheduleResurrect multiplies it
	// by 2^(failures-1), so it applies unchanged to the first failure.
	defaultResurrectTimeoutInitial = 60 * time.Second

	// defaultResurrectTimeoutFactorCutoff caps the exponent used by
	// scheduleResurrect, bounding the delay at
	// defaultResurrectTimeoutInitial * 2^defaultResurrectTimeoutFactorCutoff.
	defaultResurrectTimeoutFactorCutoff = 5
)

// Selector defines the interface for selecting connections from the pool.
//
// Select receives the candidate connections and returns the one to use,
// or an error.
type Selector interface {
	Select([]*Connection) (*Connection, error)
}

// ConnectionPool defines the interface for the connection pool.
type ConnectionPool interface {
	Next() (*Connection, error)  // Next returns the next available connection.
	OnSuccess(*Connection) error // OnSuccess reports that the connection was successful.
	OnFailure(*Connection) error // OnFailure reports that the connection failed.
	URLs() []*url.URL            // URLs returns the list of URLs of available connections.
}

// Connection represents a connection to a node.
// type Connection struct { sync.Mutex URL *url.URL IsDead bool DeadSince time.Time Failures int ID string Name string Roles []string Attributes map[string]interface{} } type singleConnectionPool struct { connection *Connection metrics *metrics } type statusConnectionPool struct { sync.Mutex live []*Connection // List of live connections dead []*Connection // List of dead connections selector Selector metrics *metrics } type roundRobinSelector struct { sync.Mutex curr int // Index of the current connection } // NewConnectionPool creates and returns a default connection pool. // func NewConnectionPool(conns []*Connection, selector Selector) (ConnectionPool, error) { if len(conns) == 1 { return &singleConnectionPool{connection: conns[0]}, nil } if selector == nil { selector = &roundRobinSelector{curr: -1} } return &statusConnectionPool{live: conns, selector: selector}, nil } // Next returns the connection from pool. // func (cp *singleConnectionPool) Next() (*Connection, error) { return cp.connection, nil } // OnSuccess is a no-op for single connection pool. func (cp *singleConnectionPool) OnSuccess(c *Connection) error { return nil } // OnFailure is a no-op for single connection pool. func (cp *singleConnectionPool) OnFailure(c *Connection) error { return nil } // URLs returns the list of URLs of available connections. func (cp *singleConnectionPool) URLs() []*url.URL { return []*url.URL{cp.connection.URL} } func (cp *singleConnectionPool) connections() []*Connection { return []*Connection{cp.connection} } // Next returns a connection from pool, or an error. // func (cp *statusConnectionPool) Next() (*Connection, error) { cp.Lock() defer cp.Unlock() // Return next live connection if len(cp.live) > 0 { return cp.selector.Select(cp.live) } else if len(cp.dead) > 0 { // No live connection is available, resurrect one of the dead ones. 
c := cp.dead[len(cp.dead)-1] cp.dead = cp.dead[:len(cp.dead)-1] c.Lock() defer c.Unlock() cp.resurrect(c, false) return c, nil } return nil, errors.New("no connection available") } // OnSuccess marks the connection as successful. // func (cp *statusConnectionPool) OnSuccess(c *Connection) error { // Short-circuit for live connection c.Lock() if !c.IsDead { c.Unlock() return nil } c.Unlock() cp.Lock() defer cp.Unlock() c.Lock() defer c.Unlock() if !c.IsDead { return nil } c.markAsHealthy() return cp.resurrect(c, true) } // OnFailure marks the connection as failed. func (cp *statusConnectionPool) OnFailure(c *Connection) error { cp.Lock() defer cp.Unlock() c.Lock() if c.IsDead { if debugLogger != nil { debugLogger.Logf("Already removed %s\n", c.URL) } c.Unlock() return nil } if debugLogger != nil { debugLogger.Logf("Removing %s...\n", c.URL) } c.markAsDead() cp.scheduleResurrect(c) c.Unlock() // Push item to dead list and sort slice by number of failures cp.dead = append(cp.dead, c) sort.Slice(cp.dead, func(i, j int) bool { c1 := cp.dead[i] c2 := cp.dead[j] c1.Lock() c2.Lock() defer c1.Unlock() defer c2.Unlock() res := c1.Failures > c2.Failures return res }) // Check if connection exists in the list, return error if not. index := -1 for i, conn := range cp.live { if conn == c { index = i } } if index < 0 { return errors.New("connection not in live list") } // Remove item; https://github.com/golang/go/wiki/SliceTricks copy(cp.live[index:], cp.live[index+1:]) cp.live = cp.live[:len(cp.live)-1] return nil } // URLs returns the list of URLs of available connections. // func (cp *statusConnectionPool) URLs() []*url.URL { var urls []*url.URL cp.Lock() defer cp.Unlock() for _, c := range cp.live { urls = append(urls, c.URL) } return urls } func (cp *statusConnectionPool) connections() []*Connection { var conns []*Connection conns = append(conns, cp.live...) conns = append(conns, cp.dead...) 
return conns } // resurrect adds the connection to the list of available connections. // When removeDead is true, it also removes it from the dead list. // The calling code is responsible for locking. // func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) error { if debugLogger != nil { debugLogger.Logf("Resurrecting %s\n", c.URL) } c.markAsLive() cp.live = append(cp.live, c) if removeDead { index := -1 for i, conn := range cp.dead { if conn == c { index = i } } if index >= 0 { // Remove item; https://github.com/golang/go/wiki/SliceTricks copy(cp.dead[index:], cp.dead[index+1:]) cp.dead = cp.dead[:len(cp.dead)-1] } } return nil } // scheduleResurrect schedules the connection to be resurrected. // func (cp *statusConnectionPool) scheduleResurrect(c *Connection) { factor := math.Min(float64(c.Failures-1), float64(defaultResurrectTimeoutFactorCutoff)) timeout := time.Duration(defaultResurrectTimeoutInitial.Seconds() * math.Exp2(factor) * float64(time.Second)) if debugLogger != nil { debugLogger.Logf("Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n", c.URL, c.Failures, factor, timeout, c.DeadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second)) } time.AfterFunc(timeout, func() { cp.Lock() defer cp.Unlock() c.Lock() defer c.Unlock() if !c.IsDead { if debugLogger != nil { debugLogger.Logf("Already resurrected %s\n", c.URL) } return } cp.resurrect(c, true) }) } // Select returns the connection in a round-robin fashion. // func (s *roundRobinSelector) Select(conns []*Connection) (*Connection, error) { s.Lock() defer s.Unlock() s.curr = (s.curr + 1) % len(conns) return conns[s.curr], nil } // markAsDead marks the connection as dead. // func (c *Connection) markAsDead() { c.IsDead = true if c.DeadSince.IsZero() { c.DeadSince = time.Now().UTC() } c.Failures++ } // markAsLive marks the connection as alive. // func (c *Connection) markAsLive() { c.IsDead = false } // markAsHealthy marks the connection as healthy. 
// func (c *Connection) markAsHealthy() { c.IsDead = false c.DeadSince = time.Time{} c.Failures = 0 } // String returns a readable connection representation. // func (c *Connection) String() string { c.Lock() defer c.Unlock() return fmt.Sprintf("<%s> dead=%v failures=%d", c.URL, c.IsDead, c.Failures) }