breaker.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. // Package breaker implements the circuit-breaker resiliency pattern for Go.
  2. package breaker
  3. import (
  4. "errors"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. )
  9. // ErrBreakerOpen is the error returned from Run() when the function is not executed
  10. // because the breaker is currently open.
  11. var ErrBreakerOpen = errors.New("circuit breaker is open")
  12. // State is a type representing the possible states of a circuit breaker.
  13. type State uint32
  14. const (
  15. Closed State = iota
  16. Open
  17. HalfOpen
  18. )
  19. // Breaker implements the circuit-breaker resiliency pattern
  20. type Breaker struct {
  21. errorThreshold, successThreshold int
  22. timeout time.Duration
  23. lock sync.Mutex
  24. state State
  25. errors, successes int
  26. lastError time.Time
  27. }
  28. // New constructs a new circuit-breaker that starts closed.
  29. // From closed, the breaker opens if "errorThreshold" errors are seen
  30. // without an error-free period of at least "timeout". From open, the
  31. // breaker half-closes after "timeout". From half-open, the breaker closes
  32. // after "successThreshold" consecutive successes, or opens on a single error.
  33. func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker {
  34. return &Breaker{
  35. errorThreshold: errorThreshold,
  36. successThreshold: successThreshold,
  37. timeout: timeout,
  38. }
  39. }
  40. // Run will either return ErrBreakerOpen immediately if the circuit-breaker is
  41. // already open, or it will run the given function and pass along its return
  42. // value. It is safe to call Run concurrently on the same Breaker.
  43. func (b *Breaker) Run(work func() error) error {
  44. state := b.GetState()
  45. if state == Open {
  46. return ErrBreakerOpen
  47. }
  48. return b.doWork(state, work)
  49. }
  50. // Go will either return ErrBreakerOpen immediately if the circuit-breaker is
  51. // already open, or it will run the given function in a separate goroutine.
  52. // If the function is run, Go will return nil immediately, and will *not* return
  53. // the return value of the function. It is safe to call Go concurrently on the
  54. // same Breaker.
  55. func (b *Breaker) Go(work func() error) error {
  56. state := b.GetState()
  57. if state == Open {
  58. return ErrBreakerOpen
  59. }
  60. // errcheck complains about ignoring the error return value, but
  61. // that's on purpose; if you want an error from a goroutine you have to
  62. // get it over a channel or something
  63. go b.doWork(state, work)
  64. return nil
  65. }
  66. // GetState returns the current State of the circuit-breaker at the moment
  67. // that it is called.
  68. func (b *Breaker) GetState() State {
  69. return (State)(atomic.LoadUint32((*uint32)(&b.state)))
  70. }
  71. func (b *Breaker) doWork(state State, work func() error) error {
  72. var panicValue interface{}
  73. result := func() error {
  74. defer func() {
  75. panicValue = recover()
  76. }()
  77. return work()
  78. }()
  79. if result == nil && panicValue == nil && state == Closed {
  80. // short-circuit the normal, success path without contending
  81. // on the lock
  82. return nil
  83. }
  84. // oh well, I guess we have to contend on the lock
  85. b.processResult(result, panicValue)
  86. if panicValue != nil {
  87. // as close as Go lets us come to a "rethrow" although unfortunately
  88. // we lose the original panicing location
  89. panic(panicValue)
  90. }
  91. return result
  92. }
  93. func (b *Breaker) processResult(result error, panicValue interface{}) {
  94. b.lock.Lock()
  95. defer b.lock.Unlock()
  96. if result == nil && panicValue == nil {
  97. if b.state == HalfOpen {
  98. b.successes++
  99. if b.successes == b.successThreshold {
  100. b.closeBreaker()
  101. }
  102. }
  103. } else {
  104. if b.errors > 0 {
  105. expiry := b.lastError.Add(b.timeout)
  106. if time.Now().After(expiry) {
  107. b.errors = 0
  108. }
  109. }
  110. switch b.state {
  111. case Closed:
  112. b.errors++
  113. if b.errors == b.errorThreshold {
  114. b.openBreaker()
  115. } else {
  116. b.lastError = time.Now()
  117. }
  118. case HalfOpen:
  119. b.openBreaker()
  120. }
  121. }
  122. }
  123. func (b *Breaker) openBreaker() {
  124. b.changeState(Open)
  125. go b.timer()
  126. }
  127. func (b *Breaker) closeBreaker() {
  128. b.changeState(Closed)
  129. }
  130. func (b *Breaker) timer() {
  131. time.Sleep(b.timeout)
  132. b.lock.Lock()
  133. defer b.lock.Unlock()
  134. b.changeState(HalfOpen)
  135. }
  136. func (b *Breaker) changeState(newState State) {
  137. b.errors = 0
  138. b.successes = 0
  139. atomic.StoreUint32((*uint32)(&b.state), (uint32)(newState))
  140. }