cluster_commands.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package redis
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. )
  7. func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {
  8. cmd := NewIntCmd(ctx, "dbsize")
  9. _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
  10. var size int64
  11. err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
  12. n, err := master.DBSize(ctx).Result()
  13. if err != nil {
  14. return err
  15. }
  16. atomic.AddInt64(&size, n)
  17. return nil
  18. })
  19. if err != nil {
  20. cmd.SetErr(err)
  21. } else {
  22. cmd.val = size
  23. }
  24. return nil
  25. })
  26. return cmd
  27. }
  28. func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd {
  29. cmd := NewStringCmd(ctx, "script", "load", script)
  30. _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
  31. mu := &sync.Mutex{}
  32. err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
  33. val, err := shard.ScriptLoad(ctx, script).Result()
  34. if err != nil {
  35. return err
  36. }
  37. mu.Lock()
  38. if cmd.Val() == "" {
  39. cmd.val = val
  40. }
  41. mu.Unlock()
  42. return nil
  43. })
  44. if err != nil {
  45. cmd.SetErr(err)
  46. }
  47. return nil
  48. })
  49. return cmd
  50. }
  51. func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd {
  52. cmd := NewStatusCmd(ctx, "script", "flush")
  53. _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
  54. err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
  55. return shard.ScriptFlush(ctx).Err()
  56. })
  57. if err != nil {
  58. cmd.SetErr(err)
  59. }
  60. return nil
  61. })
  62. return cmd
  63. }
  64. func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *BoolSliceCmd {
  65. args := make([]interface{}, 2+len(hashes))
  66. args[0] = "script"
  67. args[1] = "exists"
  68. for i, hash := range hashes {
  69. args[2+i] = hash
  70. }
  71. cmd := NewBoolSliceCmd(ctx, args...)
  72. result := make([]bool, len(hashes))
  73. for i := range result {
  74. result[i] = true
  75. }
  76. _ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
  77. mu := &sync.Mutex{}
  78. err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
  79. val, err := shard.ScriptExists(ctx, hashes...).Result()
  80. if err != nil {
  81. return err
  82. }
  83. mu.Lock()
  84. for i, v := range val {
  85. result[i] = result[i] && v
  86. }
  87. mu.Unlock()
  88. return nil
  89. })
  90. if err != nil {
  91. cmd.SetErr(err)
  92. } else {
  93. cmd.val = result
  94. }
  95. return nil
  96. })
  97. return cmd
  98. }