reader.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package proto
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "github.com/go-redis/redis/v8/internal/util"
  7. )
  8. // redis resp protocol data type.
  9. const (
  10. ErrorReply = '-'
  11. StatusReply = '+'
  12. IntReply = ':'
  13. StringReply = '$'
  14. ArrayReply = '*'
  15. )
  16. //------------------------------------------------------------------------------
  17. const Nil = RedisError("redis: nil") // nolint:errname
  18. type RedisError string
  19. func (e RedisError) Error() string { return string(e) }
  20. func (RedisError) RedisError() {}
  21. //------------------------------------------------------------------------------
  22. type MultiBulkParse func(*Reader, int64) (interface{}, error)
  23. type Reader struct {
  24. rd *bufio.Reader
  25. _buf []byte
  26. }
  27. func NewReader(rd io.Reader) *Reader {
  28. return &Reader{
  29. rd: bufio.NewReader(rd),
  30. _buf: make([]byte, 64),
  31. }
  32. }
  33. func (r *Reader) Buffered() int {
  34. return r.rd.Buffered()
  35. }
  36. func (r *Reader) Peek(n int) ([]byte, error) {
  37. return r.rd.Peek(n)
  38. }
  39. func (r *Reader) Reset(rd io.Reader) {
  40. r.rd.Reset(rd)
  41. }
  42. func (r *Reader) ReadLine() ([]byte, error) {
  43. line, err := r.readLine()
  44. if err != nil {
  45. return nil, err
  46. }
  47. if isNilReply(line) {
  48. return nil, Nil
  49. }
  50. return line, nil
  51. }
  52. // readLine that returns an error if:
  53. // - there is a pending read error;
  54. // - or line does not end with \r\n.
  55. func (r *Reader) readLine() ([]byte, error) {
  56. b, err := r.rd.ReadSlice('\n')
  57. if err != nil {
  58. if err != bufio.ErrBufferFull {
  59. return nil, err
  60. }
  61. full := make([]byte, len(b))
  62. copy(full, b)
  63. b, err = r.rd.ReadBytes('\n')
  64. if err != nil {
  65. return nil, err
  66. }
  67. full = append(full, b...) //nolint:makezero
  68. b = full
  69. }
  70. if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' {
  71. return nil, fmt.Errorf("redis: invalid reply: %q", b)
  72. }
  73. return b[:len(b)-2], nil
  74. }
  75. func (r *Reader) ReadReply(m MultiBulkParse) (interface{}, error) {
  76. line, err := r.ReadLine()
  77. if err != nil {
  78. return nil, err
  79. }
  80. switch line[0] {
  81. case ErrorReply:
  82. return nil, ParseErrorReply(line)
  83. case StatusReply:
  84. return string(line[1:]), nil
  85. case IntReply:
  86. return util.ParseInt(line[1:], 10, 64)
  87. case StringReply:
  88. return r.readStringReply(line)
  89. case ArrayReply:
  90. n, err := parseArrayLen(line)
  91. if err != nil {
  92. return nil, err
  93. }
  94. if m == nil {
  95. err := fmt.Errorf("redis: got %.100q, but multi bulk parser is nil", line)
  96. return nil, err
  97. }
  98. return m(r, n)
  99. }
  100. return nil, fmt.Errorf("redis: can't parse %.100q", line)
  101. }
  102. func (r *Reader) ReadIntReply() (int64, error) {
  103. line, err := r.ReadLine()
  104. if err != nil {
  105. return 0, err
  106. }
  107. switch line[0] {
  108. case ErrorReply:
  109. return 0, ParseErrorReply(line)
  110. case IntReply:
  111. return util.ParseInt(line[1:], 10, 64)
  112. default:
  113. return 0, fmt.Errorf("redis: can't parse int reply: %.100q", line)
  114. }
  115. }
  116. func (r *Reader) ReadString() (string, error) {
  117. line, err := r.ReadLine()
  118. if err != nil {
  119. return "", err
  120. }
  121. switch line[0] {
  122. case ErrorReply:
  123. return "", ParseErrorReply(line)
  124. case StringReply:
  125. return r.readStringReply(line)
  126. case StatusReply:
  127. return string(line[1:]), nil
  128. case IntReply:
  129. return string(line[1:]), nil
  130. default:
  131. return "", fmt.Errorf("redis: can't parse reply=%.100q reading string", line)
  132. }
  133. }
  134. func (r *Reader) readStringReply(line []byte) (string, error) {
  135. if isNilReply(line) {
  136. return "", Nil
  137. }
  138. replyLen, err := util.Atoi(line[1:])
  139. if err != nil {
  140. return "", err
  141. }
  142. b := make([]byte, replyLen+2)
  143. _, err = io.ReadFull(r.rd, b)
  144. if err != nil {
  145. return "", err
  146. }
  147. return util.BytesToString(b[:replyLen]), nil
  148. }
  149. func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error) {
  150. line, err := r.ReadLine()
  151. if err != nil {
  152. return nil, err
  153. }
  154. switch line[0] {
  155. case ErrorReply:
  156. return nil, ParseErrorReply(line)
  157. case ArrayReply:
  158. n, err := parseArrayLen(line)
  159. if err != nil {
  160. return nil, err
  161. }
  162. return m(r, n)
  163. default:
  164. return nil, fmt.Errorf("redis: can't parse array reply: %.100q", line)
  165. }
  166. }
  167. func (r *Reader) ReadArrayLen() (int, error) {
  168. line, err := r.ReadLine()
  169. if err != nil {
  170. return 0, err
  171. }
  172. switch line[0] {
  173. case ErrorReply:
  174. return 0, ParseErrorReply(line)
  175. case ArrayReply:
  176. n, err := parseArrayLen(line)
  177. if err != nil {
  178. return 0, err
  179. }
  180. return int(n), nil
  181. default:
  182. return 0, fmt.Errorf("redis: can't parse array reply: %.100q", line)
  183. }
  184. }
  185. func (r *Reader) ReadScanReply() ([]string, uint64, error) {
  186. n, err := r.ReadArrayLen()
  187. if err != nil {
  188. return nil, 0, err
  189. }
  190. if n != 2 {
  191. return nil, 0, fmt.Errorf("redis: got %d elements in scan reply, expected 2", n)
  192. }
  193. cursor, err := r.ReadUint()
  194. if err != nil {
  195. return nil, 0, err
  196. }
  197. n, err = r.ReadArrayLen()
  198. if err != nil {
  199. return nil, 0, err
  200. }
  201. keys := make([]string, n)
  202. for i := 0; i < n; i++ {
  203. key, err := r.ReadString()
  204. if err != nil {
  205. return nil, 0, err
  206. }
  207. keys[i] = key
  208. }
  209. return keys, cursor, err
  210. }
  211. func (r *Reader) ReadInt() (int64, error) {
  212. b, err := r.readTmpBytesReply()
  213. if err != nil {
  214. return 0, err
  215. }
  216. return util.ParseInt(b, 10, 64)
  217. }
  218. func (r *Reader) ReadUint() (uint64, error) {
  219. b, err := r.readTmpBytesReply()
  220. if err != nil {
  221. return 0, err
  222. }
  223. return util.ParseUint(b, 10, 64)
  224. }
  225. func (r *Reader) ReadFloatReply() (float64, error) {
  226. b, err := r.readTmpBytesReply()
  227. if err != nil {
  228. return 0, err
  229. }
  230. return util.ParseFloat(b, 64)
  231. }
  232. func (r *Reader) readTmpBytesReply() ([]byte, error) {
  233. line, err := r.ReadLine()
  234. if err != nil {
  235. return nil, err
  236. }
  237. switch line[0] {
  238. case ErrorReply:
  239. return nil, ParseErrorReply(line)
  240. case StringReply:
  241. return r._readTmpBytesReply(line)
  242. case StatusReply:
  243. return line[1:], nil
  244. default:
  245. return nil, fmt.Errorf("redis: can't parse string reply: %.100q", line)
  246. }
  247. }
  248. func (r *Reader) _readTmpBytesReply(line []byte) ([]byte, error) {
  249. if isNilReply(line) {
  250. return nil, Nil
  251. }
  252. replyLen, err := util.Atoi(line[1:])
  253. if err != nil {
  254. return nil, err
  255. }
  256. buf := r.buf(replyLen + 2)
  257. _, err = io.ReadFull(r.rd, buf)
  258. if err != nil {
  259. return nil, err
  260. }
  261. return buf[:replyLen], nil
  262. }
  263. func (r *Reader) buf(n int) []byte {
  264. if n <= cap(r._buf) {
  265. return r._buf[:n]
  266. }
  267. d := n - cap(r._buf)
  268. r._buf = append(r._buf, make([]byte, d)...)
  269. return r._buf
  270. }
  271. func isNilReply(b []byte) bool {
  272. return len(b) == 3 &&
  273. (b[0] == StringReply || b[0] == ArrayReply) &&
  274. b[1] == '-' && b[2] == '1'
  275. }
  276. func ParseErrorReply(line []byte) error {
  277. return RedisError(string(line[1:]))
  278. }
  279. func parseArrayLen(line []byte) (int64, error) {
  280. if isNilReply(line) {
  281. return 0, Nil
  282. }
  283. return util.ParseInt(line[1:], 10, 64)
  284. }