meter.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package metrics
  2. import (
  3. "math"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. // Meters count events to produce exponentially-weighted moving average rates
  9. // at one-, five-, and fifteen-minutes and a mean rate.
  10. type Meter interface {
  11. Count() int64
  12. Mark(int64)
  13. Rate1() float64
  14. Rate5() float64
  15. Rate15() float64
  16. RateMean() float64
  17. Snapshot() Meter
  18. Stop()
  19. }
  20. // GetOrRegisterMeter returns an existing Meter or constructs and registers a
  21. // new StandardMeter.
  22. // Be sure to unregister the meter from the registry once it is of no use to
  23. // allow for garbage collection.
  24. func GetOrRegisterMeter(name string, r Registry) Meter {
  25. if nil == r {
  26. r = DefaultRegistry
  27. }
  28. return r.GetOrRegister(name, NewMeter).(Meter)
  29. }
  30. // NewMeter constructs a new StandardMeter and launches a goroutine.
  31. // Be sure to call Stop() once the meter is of no use to allow for garbage collection.
  32. func NewMeter() Meter {
  33. if UseNilMetrics {
  34. return NilMeter{}
  35. }
  36. m := newStandardMeter()
  37. arbiter.Lock()
  38. defer arbiter.Unlock()
  39. arbiter.meters[m] = struct{}{}
  40. if !arbiter.started {
  41. arbiter.started = true
  42. go arbiter.tick()
  43. }
  44. return m
  45. }
  46. // NewMeter constructs and registers a new StandardMeter and launches a
  47. // goroutine.
  48. // Be sure to unregister the meter from the registry once it is of no use to
  49. // allow for garbage collection.
  50. func NewRegisteredMeter(name string, r Registry) Meter {
  51. c := NewMeter()
  52. if nil == r {
  53. r = DefaultRegistry
  54. }
  55. r.Register(name, c)
  56. return c
  57. }
  58. // MeterSnapshot is a read-only copy of another Meter.
  59. type MeterSnapshot struct {
  60. count int64
  61. rate1, rate5, rate15, rateMean uint64
  62. }
  63. // Count returns the count of events at the time the snapshot was taken.
  64. func (m *MeterSnapshot) Count() int64 { return m.count }
  65. // Mark panics.
  66. func (*MeterSnapshot) Mark(n int64) {
  67. panic("Mark called on a MeterSnapshot")
  68. }
  69. // Rate1 returns the one-minute moving average rate of events per second at the
  70. // time the snapshot was taken.
  71. func (m *MeterSnapshot) Rate1() float64 { return math.Float64frombits(m.rate1) }
  72. // Rate5 returns the five-minute moving average rate of events per second at
  73. // the time the snapshot was taken.
  74. func (m *MeterSnapshot) Rate5() float64 { return math.Float64frombits(m.rate5) }
  75. // Rate15 returns the fifteen-minute moving average rate of events per second
  76. // at the time the snapshot was taken.
  77. func (m *MeterSnapshot) Rate15() float64 { return math.Float64frombits(m.rate15) }
  78. // RateMean returns the meter's mean rate of events per second at the time the
  79. // snapshot was taken.
  80. func (m *MeterSnapshot) RateMean() float64 { return math.Float64frombits(m.rateMean) }
  81. // Snapshot returns the snapshot.
  82. func (m *MeterSnapshot) Snapshot() Meter { return m }
  83. // Stop is a no-op.
  84. func (m *MeterSnapshot) Stop() {}
  85. // NilMeter is a no-op Meter.
  86. type NilMeter struct{}
  87. // Count is a no-op.
  88. func (NilMeter) Count() int64 { return 0 }
  89. // Mark is a no-op.
  90. func (NilMeter) Mark(n int64) {}
  91. // Rate1 is a no-op.
  92. func (NilMeter) Rate1() float64 { return 0.0 }
  93. // Rate5 is a no-op.
  94. func (NilMeter) Rate5() float64 { return 0.0 }
  95. // Rate15is a no-op.
  96. func (NilMeter) Rate15() float64 { return 0.0 }
  97. // RateMean is a no-op.
  98. func (NilMeter) RateMean() float64 { return 0.0 }
  99. // Snapshot is a no-op.
  100. func (NilMeter) Snapshot() Meter { return NilMeter{} }
  101. // Stop is a no-op.
  102. func (NilMeter) Stop() {}
  103. // StandardMeter is the standard implementation of a Meter.
  104. type StandardMeter struct {
  105. snapshot *MeterSnapshot
  106. a1, a5, a15 EWMA
  107. startTime time.Time
  108. stopped uint32
  109. }
  110. func newStandardMeter() *StandardMeter {
  111. return &StandardMeter{
  112. snapshot: &MeterSnapshot{},
  113. a1: NewEWMA1(),
  114. a5: NewEWMA5(),
  115. a15: NewEWMA15(),
  116. startTime: time.Now(),
  117. }
  118. }
  119. // Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
  120. func (m *StandardMeter) Stop() {
  121. if atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
  122. arbiter.Lock()
  123. delete(arbiter.meters, m)
  124. arbiter.Unlock()
  125. }
  126. }
  127. // Count returns the number of events recorded.
  128. func (m *StandardMeter) Count() int64 {
  129. return atomic.LoadInt64(&m.snapshot.count)
  130. }
  131. // Mark records the occurance of n events.
  132. func (m *StandardMeter) Mark(n int64) {
  133. if atomic.LoadUint32(&m.stopped) == 1 {
  134. return
  135. }
  136. atomic.AddInt64(&m.snapshot.count, n)
  137. m.a1.Update(n)
  138. m.a5.Update(n)
  139. m.a15.Update(n)
  140. m.updateSnapshot()
  141. }
  142. // Rate1 returns the one-minute moving average rate of events per second.
  143. func (m *StandardMeter) Rate1() float64 {
  144. return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate1))
  145. }
  146. // Rate5 returns the five-minute moving average rate of events per second.
  147. func (m *StandardMeter) Rate5() float64 {
  148. return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate5))
  149. }
  150. // Rate15 returns the fifteen-minute moving average rate of events per second.
  151. func (m *StandardMeter) Rate15() float64 {
  152. return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rate15))
  153. }
  154. // RateMean returns the meter's mean rate of events per second.
  155. func (m *StandardMeter) RateMean() float64 {
  156. return math.Float64frombits(atomic.LoadUint64(&m.snapshot.rateMean))
  157. }
  158. // Snapshot returns a read-only copy of the meter.
  159. func (m *StandardMeter) Snapshot() Meter {
  160. copiedSnapshot := MeterSnapshot{
  161. count: atomic.LoadInt64(&m.snapshot.count),
  162. rate1: atomic.LoadUint64(&m.snapshot.rate1),
  163. rate5: atomic.LoadUint64(&m.snapshot.rate5),
  164. rate15: atomic.LoadUint64(&m.snapshot.rate15),
  165. rateMean: atomic.LoadUint64(&m.snapshot.rateMean),
  166. }
  167. return &copiedSnapshot
  168. }
  169. func (m *StandardMeter) updateSnapshot() {
  170. rate1 := math.Float64bits(m.a1.Rate())
  171. rate5 := math.Float64bits(m.a5.Rate())
  172. rate15 := math.Float64bits(m.a15.Rate())
  173. rateMean := math.Float64bits(float64(m.Count()) / time.Since(m.startTime).Seconds())
  174. atomic.StoreUint64(&m.snapshot.rate1, rate1)
  175. atomic.StoreUint64(&m.snapshot.rate5, rate5)
  176. atomic.StoreUint64(&m.snapshot.rate15, rate15)
  177. atomic.StoreUint64(&m.snapshot.rateMean, rateMean)
  178. }
  179. func (m *StandardMeter) tick() {
  180. m.a1.Tick()
  181. m.a5.Tick()
  182. m.a15.Tick()
  183. m.updateSnapshot()
  184. }
  185. // meterArbiter ticks meters every 5s from a single goroutine.
  186. // meters are references in a set for future stopping.
  187. type meterArbiter struct {
  188. sync.RWMutex
  189. started bool
  190. meters map[*StandardMeter]struct{}
  191. ticker *time.Ticker
  192. }
  193. var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}
  194. // Ticks meters on the scheduled interval
  195. func (ma *meterArbiter) tick() {
  196. for {
  197. select {
  198. case <-ma.ticker.C:
  199. ma.tickMeters()
  200. }
  201. }
  202. }
  203. func (ma *meterArbiter) tickMeters() {
  204. ma.RLock()
  205. defer ma.RUnlock()
  206. for meter := range ma.meters {
  207. meter.tick()
  208. }
  209. }