cluster.go

package redis

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/v8/internal"
	"github.com/go-redis/redis/v8/internal/hashtag"
	"github.com/go-redis/redis/v8/internal/pool"
	"github.com/go-redis/redis/v8/internal/proto"
	"github.com/go-redis/redis/v8/internal/rand"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// NewClient creates a cluster node client with the provided options.
	NewClient func(opt *Options) *Client

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 3 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful for manually creating a cluster of standalone Redis servers
	// and load-balancing read/write operations between masters and slaves.
	// It can use a service like ZooKeeper to maintain configuration information
	// and ClusterClient.ReloadState to manually trigger state reloading.
	ClusterSlots func(context.Context) ([]ClusterSlot, error)

	// The following options are copied from the Options struct.

	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)

	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
	PoolFIFO bool

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}

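// A minimal usage sketch from a caller's perspective (the addresses below are
// illustrative placeholders, not defaults of this package):
//
//	rdb := redis.NewClusterClient(&redis.ClusterOptions{
//		Addrs:    []string{":7000", ":7001", ":7002"},
//		ReadOnly: true,
//	})
//	if err := rdb.Ping(context.Background()).Err(); err != nil {
//		// handle connection error
//	}
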
func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 3
	}

	if opt.RouteByLatency || opt.RouteRandomly {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	if opt.MaxRetries == 0 {
		opt.MaxRetries = -1
	}
	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}

	if opt.NewClient == nil {
		opt.NewClient = NewClient
	}
}

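// clientOptions converts ClusterOptions into the per-node Options used to
// create each node's Client; the pool idle check is disabled because the
// cluster client runs its own reaper.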
func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		Dialer:    opt.Dialer,
		OnConnect: opt.OnConnect,

		Username: opt.Username,
		Password: opt.Password,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolFIFO:           opt.PoolFIFO,
		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: disableIdleCheck,

		TLSConfig: opt.TLSConfig,
		// If ClusterSlots is populated, then we probably have an artificial
		// cluster whose nodes are not in clustering mode (otherwise there isn't
		// much use for ClusterSlots config). This means we cannot execute the
		// READONLY command against that node -- setting readOnly to false in such
		// situations in the options below will prevent that from happening.
		readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	failing    uint32 // atomic
}

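// newClusterNode creates a Client for addr from the cluster-wide options and,
// when RouteByLatency is enabled, starts a background latency probe.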
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: clOpt.NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

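// updateLatency pings the node several times and stores the average
// round-trip time, in microseconds, for latency-based routing.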
func (n *clusterNode) updateLatency() {
	const numProbe = 10
	var dur uint64

	for i := 0; i < numProbe; i++ {
		time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)

		start := time.Now()
		n.Client.Ping(context.TODO())
		dur += uint64(time.Since(start) / time.Microsecond)
	}

	latency := float64(dur) / float64(numProbe)
	atomic.StoreUint32(&n.latency, uint32(latency+0.5))
}

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

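// MarkAsFailing records the time of a failure. Failing reports whether the
// node was marked as failing within the last 15 seconds.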
func (n *clusterNode) MarkAsFailing() {
	atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
}

func (n *clusterNode) Failing() bool {
	const timeout = 15 // 15 seconds

	failing := atomic.LoadUint32(&n.failing)
	if failing == 0 {
		return false
	}
	if time.Now().Unix()-int64(failing) < timeout {
		return true
	}
	atomic.StoreUint32(&n.failing, 0)
	return false
}

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}

//------------------------------------------------------------------------------

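// clusterNodes tracks every node known to the client, keyed by address.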
type clusterNodes struct {
	opt *ClusterOptions

	mu          sync.RWMutex
	addrs       []string
	nodes       map[string]*clusterNode
	activeAddrs []string
	closed      bool

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		addrs: opt.Addrs,
		nodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.nodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.nodes = nil
	c.activeAddrs = nil

	return firstErr
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string

	c.mu.RLock()
	closed := c.closed //nolint:ifshort
	if !closed {
		if len(c.activeAddrs) > 0 {
			addrs = c.activeAddrs
		} else {
			addrs = c.addrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

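// NextGeneration returns a monotonically increasing generation number;
// GC later removes nodes whose generation is older than the latest state.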
func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	//nolint:prealloc
	var collected []*clusterNode

	c.mu.Lock()

	c.activeAddrs = c.activeAddrs[:0]
	for addr, node := range c.nodes {
		if node.Generation() >= generation {
			c.activeAddrs = append(c.activeAddrs, addr)
			if c.opt.RouteByLatency {
				go node.updateLatency()
			}
			continue
		}

		delete(c.nodes, addr)
		collected = append(collected, node)
	}

	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
	node, err := c.get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.nodes[addr]
	if ok {
		return node, nil
	}

	node = newClusterNode(c.opt, addr)

	c.addrs = appendIfNotExists(c.addrs, addr)
	c.nodes[addr] = node

	return node, nil
}

func (c *clusterNodes) get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.nodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.nodes))
	for _, node := range c.nodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.GetOrCreate(addrs[n])
}

//------------------------------------------------------------------------------

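// clusterSlot maps a contiguous range of hash slots to the nodes serving it;
// nodes[0] is the master and any remaining entries are replicas.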
type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {
	return len(p)
}

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)
			}

			node, err := c.nodes.GetOrCreate(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

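// replaceLoopbackHost substitutes the origin host for loopback node addresses,
// e.g. a node advertised as 127.0.0.1:7001 discovered via origin 10.0.0.5:7000
// becomes 10.0.0.5:7001.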
func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return nodeAddr
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return nodeAddr
	}

	if !nodeIP.IsLoopback() {
		return nodeAddr
	}

	// Use origin host which is not loopback and node port.
	return net.JoinHostPort(originHost, nodePort)
}

func isLoopback(host string) bool {
	ip := net.ParseIP(host)
	if ip == nil {
		return true
	}
	return ip.IsLoopback()
}

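// slotMasterNode returns the master that serves the slot, or a random node
// when the slot is not covered by the current state.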
func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Failing() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Failing() {
				return slave, nil
			}
		}

		// All slaves are loading - use master.
		return nodes[0], nil
	}
}

func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Failing() {
			continue
		}
		if node == nil || n.Latency() < node.Latency() {
			node = n
		}
	}
	if node != nil {
		return node, nil
	}

	// If all nodes are failing - return random node
	return c.nodes.Random()
}

func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}
	if len(nodes) == 1 {
		return nodes[0], nil
	}
	randomNodes := rand.Perm(len(nodes))
	for _, idx := range randomNodes {
		if node := nodes[idx]; !node.Failing() {
			return node, nil
		}
	}
	return nodes[randomNodes[0]], nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot
	})
	if i >= len(c.slots) {
		return nil
	}
	x := c.slots[i]
	if slot >= x.start && slot <= x.end {
		return x.nodes
	}
	return nil
}

//------------------------------------------------------------------------------

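// clusterStateHolder caches the most recently loaded clusterState and
// refreshes it lazily in a background goroutine.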
type clusterStateHolder struct {
	load func(ctx context.Context) (*clusterState, error)

	state     atomic.Value
	reloading uint32 // atomic
}

func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) {
	state, err := c.load(ctx)
	if err != nil {
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload(context.Background())
		if err != nil {
			return
		}
		time.Sleep(200 * time.Millisecond)
	}()
}

func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
	v := c.state.Load()
	if v == nil {
		return c.Reload(ctx)
	}

	state := v.(*clusterState)
	if time.Since(state.createdAt) > 10*time.Second {
		c.LazyReload()
	}
	return state, nil
}

func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
	state, err := c.Reload(ctx)
	if err == nil {
		return state, nil
	}
	return c.Get(ctx)
}

//------------------------------------------------------------------------------

type clusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder //nolint:structcheck
	cmdsInfoCache *cmdsInfoCache      //nolint:structcheck
}

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	*clusterClient
	cmdable
	hooks
	ctx context.Context
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		clusterClient: &clusterClient{
			opt:   opt,
			nodes: newClusterNodes(opt),
		},
		ctx: context.Background(),
	}
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
	c.cmdable = c.Process

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}

func (c *ClusterClient) Context() context.Context {
	return c.ctx
}

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	clone := *c
	clone.cmdable = clone.Process
	clone.hooks.lock()
	clone.ctx = ctx
	return &clone
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState(ctx context.Context) {
	c.state.LazyReload()
}

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(ctx, args...)
	_ = c.Process(ctx, cmd)
	return cmd
}

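// Process executes cmd on the node that owns its slot, applying any
// registered hooks and retrying on network errors and MOVED/ASK redirects.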
func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
	return c.hooks.process(ctx, cmd, c.process)
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
	cmdInfo := c.cmdInfo(cmd.Name())
	slot := c.cmdSlot(cmd)

	var node *clusterNode
	var ask bool
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		if node == nil {
			var err error
			node, err = c.cmdNode(ctx, cmdInfo, slot)
			if err != nil {
				return err
			}
		}

		if ask {
			pipe := node.Client.Pipeline()
			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
			_ = pipe.Process(ctx, cmd)
			_, lastErr = pipe.Exec(ctx)
			_ = pipe.Close()
			ask = false
		} else {
			lastErr = node.Client.Process(ctx, cmd)
		}

		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}
		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node = nil
			continue
		}

		// If slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			c.state.LazyReload()

			var err error
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
			// First, retry on the same node.
			if attempt == 0 {
				continue
			}

			// Second, try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

		return lastErr
	}
	return lastErr
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachShard concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachShard(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(ctx, node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get(context.TODO())
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

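// loadState fetches the cluster topology, either from the user-supplied
// ClusterSlots callback or by running CLUSTER SLOTS against known nodes in
// random order.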
func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots(ctx)
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	for _, idx := range rand.Perm(len(addrs)) {
		addr := addrs[idx]

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots(ctx).Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}

	/*
	 * No node is reachable. It is possible that all nodes' IP addresses have
	 * changed. Clear activeAddrs so the client can reconnect using the initial
	 * seed addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]), which
	 * gives it a chance to resolve the domain names again and pick up updated
	 * IP addresses.
	 */
	c.nodes.mu.Lock()
	c.nodes.activeAddrs = nil
	c.nodes.mu.Unlock()

	return nil, firstErr
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
			}
		}
	}
}

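// Pipeline returns a pipeline that queues commands and, when executed, groups
// them by owning node and sends each group to its node in a single batch.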
func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(ctx, fn)
}

func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
}

func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
	cmdsMap := newCmdsMap()
	err := c.mapCmdsByNode(ctx, cmdsMap, cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				setCmdsErr(cmds, err)
				return err
			}
		}

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			wg.Add(1)
			go func(node *clusterNode, cmds []Cmder) {
				defer wg.Done()

				err := c._processPipelineNode(ctx, node, cmds, failedCmds)
				if err == nil {
					return
				}
				if attempt < c.opt.MaxRedirects {
					if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
						setCmdsErr(cmds, err)
					}
				} else {
					setCmdsErr(cmds, err)
				}
			}(node, cmds)
		}

		wg.Wait()
		if len(failedCmds.m) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}

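// mapCmdsByNode assigns each command to the node that owns its slot; when
// ReadOnly is enabled and every command is read-only, replica nodes may be
// chosen instead of masters.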
func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
	state, err := c.state.Get(ctx)
	if err != nil {
		return err
	}

	if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
		for _, cmd := range cmds {
			slot := c.cmdSlot(cmd)
			node, err := c.slotReadOnlyNode(state, slot)
			if err != nil {
				return err
			}
			cmdsMap.Add(node, cmd)
		}
		return nil
	}

	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		node, err := state.slotMasterNode(slot)
		if err != nil {
			return err
		}
		cmdsMap.Add(node, cmd)
	}
	return nil
}

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) _processPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
			})
		})
	})
}

func (c *ClusterClient) pipelineReadCmds(
	ctx context.Context,
	node *clusterNode,
	rd *proto.Reader,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		cmd.SetErr(err)

		if err == nil {
			continue
		}

		if c.checkMovedErr(ctx, cmd, err, failedCmds) {
			continue
		}

		if c.opt.ReadOnly && isLoadingError(err) {
			node.MarkAsFailing()
			return err
		}
		if isRedisError(err) {
			continue
		}
		return err
	}
	return nil
}

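// checkMovedErr re-queues cmd on the target node when err is a MOVED or ASK
// redirect and reports whether it did so.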
func (c *ClusterClient) checkMovedErr(
	ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
	moved, ask, addr := isMovedError(err)
	if !moved && !ask {
		return false
	}

	node, err := c.nodes.GetOrCreate(addr)
	if err != nil {
		return false
	}

	if moved {
		c.state.LazyReload()
		failedCmds.Add(node, cmd)
		return true
	}

	if ask {
		failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		return true
	}

	panic("not reached")
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processTxPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(ctx, fn)
}

func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline)
}

func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
	// Trim multi .. exec.
	cmds = cmds[1 : len(cmds)-1]

	state, err := c.state.Get(ctx)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
					setCmdsErr(cmds, err)
					return err
				}
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					err := c._processTxPipelineNode(ctx, node, cmds, failedCmds)
					if err == nil {
						return
					}
					if attempt < c.opt.MaxRedirects {
						if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
							setCmdsErr(cmds, err)
						}
					} else {
						setCmdsErr(cmds, err)
					}
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) _processTxPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				statusCmd := cmds[0].(*StatusCmd)
				// Trim multi and exec.
				cmds = cmds[1 : len(cmds)-1]

				err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds)
				if err != nil {
					moved, ask, addr := isMovedError(err)
					if moved || ask {
						return c.cmdsMoved(ctx, cmds, moved, ask, addr, failedCmds)
					}
					return err
				}

				return pipelineReadCmds(rd, cmds)
			})
		})
	})
}

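// txPipelineReadQueued reads the MULTI status and the QUEUED reply for each
// command, then the header of the EXEC reply, re-queueing commands that hit
// MOVED/ASK redirects.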
func (c *ClusterClient) txPipelineReadQueued(
	ctx context.Context,
	rd *proto.Reader,
	statusCmd *StatusCmd,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	// Parse queued replies.
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)
		if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
			continue
		}
		return err
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		return proto.ParseErrorReply(line)
	case proto.ArrayReply:
		// ok
	default:
		return fmt.Errorf("redis: expected '*', but got line %q", line)
	}

	return nil
}

func (c *ClusterClient) cmdsMoved(
	ctx context.Context, cmds []Cmder,
	moved, ask bool,
	addr string,
	failedCmds *cmdsMap,
) error {
	node, err := c.nodes.GetOrCreate(addr)
	if err != nil {
		return err
	}

	if moved {
		c.state.LazyReload()
		for _, cmd := range cmds {
			failedCmds.Add(node, cmd)
		}
		return nil
	}

	if ask {
		for _, cmd := range cmds {
			failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		}
		return nil
	}

	return nil
}

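// Watch runs fn in an optimistic transaction against the master that owns the
// keys' hash slot; all keys must hash to the same slot.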
func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
			return err
		}
	}

	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		err = node.Client.Watch(ctx, fn, keys...)
		if err == nil {
			break
		}

		moved, ask, addr := isMovedError(err)
		if moved || ask {
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node, err = c.slotMasterNode(ctx, slot)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(err, true) {
			continue
		}

		return err
	}

	return err
}

func (c *ClusterClient) pubSub() *PubSub {
	var node *clusterNode
	pubsub := &PubSub{
		opt: c.opt.clientOptions(),

		newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
			if node != nil {
				panic("node != nil")
			}

			var err error
			if len(channels) > 0 {
				slot := hashtag.Slot(channels[0])
				node, err = c.slotMasterNode(ctx, slot)
			} else {
				node, err = c.nodes.Random()
			}
			if err != nil {
				return nil, err
			}

			cn, err := node.Client.newConn(context.TODO())
			if err != nil {
				node = nil
				return nil, err
			}

			return cn, nil
		},
		closeConn: func(cn *pool.Conn) error {
			err := node.Client.connPool.CloseConn(cn)
			node = nil
			return err
		},
	}
	pubsub.init()

	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(ctx, channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(ctx, channels...)
	}
	return pubsub
}

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
	// Try 3 random nodes.
	const nodeLimit = 3

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	perm := rand.Perm(len(addrs))
	if len(perm) > nodeLimit {
		perm = perm[:nodeLimit]
	}

	for _, idx := range perm {
		addr := addrs[idx]

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		info, err := node.Client.Command(ctx).Result()
		if err == nil {
			return info, nil
		}

		if firstErr == nil {
			firstErr = err
		}
	}

	if firstErr == nil {
		panic("not reached")
	}
	return nil, firstErr
}

func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get(c.ctx)
	if err != nil {
		return nil
	}

	info := cmdsInfo[name]
	if info == nil {
		internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
	}
	return info
}

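// cmdSlot determines the hash slot a command maps to using the position of
// its first key; commands without keys are assigned a random slot.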
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
	args := cmd.Args()
	if args[0] == "cluster" && args[1] == "getkeysinslot" {
		return args[2].(int)
	}

	cmdInfo := c.cmdInfo(cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func cmdSlot(cmd Cmder, pos int) int {
	if pos == 0 {
		return hashtag.RandomSlot()
	}
	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)
}

func (c *ClusterClient) cmdNode(
	ctx context.Context,
	cmdInfo *CommandInfo,
	slot int,
) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}

	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
		return c.slotReadOnlyNode(state, slot)
	}
	return state.slotMasterNode(slot)
}

func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
	if c.opt.RouteByLatency {
		return state.slotClosestNode(slot)
	}
	if c.opt.RouteRandomly {
		return state.slotRandomNode(slot)
	}
	return state.slotSlaveNode(slot)
}

func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	return state.slotMasterNode(slot)
}

// SlaveForKey gets a client for a replica node to run any command on it.
// This is especially useful when we want to run a particular Lua script,
// containing only read-only commands, on a replica. Regular read-only Redis
// commands carry a read-only flag and are routed to replica nodes
// automatically when the ClusterOptions.ReadOnly flag is set to true.
func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	slot := hashtag.Slot(key)
	node, err := c.slotReadOnlyNode(state, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}

// MasterForKey returns a client to the master node for a particular key.
func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) {
	slot := hashtag.Slot(key)
	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

//------------------------------------------------------------------------------

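// cmdsMap groups commands by the node that should execute them; Add is safe
// for concurrent use.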
type cmdsMap struct {
	mu sync.Mutex
	m  map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
	return &cmdsMap{
		m: make(map[*clusterNode][]Cmder),
	}
}

func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
	m.mu.Lock()
	m.m[node] = append(m.m[node], cmds...)
	m.mu.Unlock()
}