1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
| // pester provides all the logic of retries, concurrency, backoff, and logging
func (c *Client) pester(p params) (*http.Response, error) {
// 因为涉及到goroutine,所以用通道来进行通信
resultCh := make(chan result)
multiplexCh := make(chan result)
finishCh := make(chan struct{})
// track all requests that go out so we can close the late listener routine that closes late incoming response bodies
totalSentRequests := &sync.WaitGroup{}
totalSentRequests.Add(1)
defer totalSentRequests.Done()
allRequestsBackCh := make(chan struct{})
// 如果所有请求都完成了,关闭allRequestsBackCh通道
go func() {
totalSentRequests.Wait()
close(allRequestsBackCh)
}()
// GET calls should be idempotent and can make use
// of concurrency. Other verbs can mutate and should not
// make use of the concurrency feature
// 因为GET操作是幂等的,所以可以设置并发请求,但是其他方法不行,一次只能请求一次
concurrency := c.Concurrency
if p.verb != "GET" {
concurrency = 1
}
// 这里涉及到写入操作,防止竞争,上了个锁
// 如果没有给定http.client,则初始化一个
c.Lock()
if c.hc == nil {
c.hc = &http.Client{}
c.hc.Transport = c.Transport
c.hc.CheckRedirect = c.CheckRedirect
c.hc.Jar = c.Jar
c.hc.Timeout = c.Timeout
}
c.Unlock()
// re-create the http client so we can leverage the std lib
// 重新建一个http client,以便复用标准库
httpClient := http.Client{
Transport: c.hc.Transport,
CheckRedirect: c.hc.CheckRedirect,
Jar: c.hc.Jar,
Timeout: c.hc.Timeout,
}
// if we have a request body, we need to save it for later
// 如果收到的请求不为空,则需要保存内容,后面作为判断请求是否完成的依据
var originalRequestBody []byte
var originalBody []byte
var err error
if p.req != nil && p.req.Body != nil {
originalRequestBody, err = ioutil.ReadAll(p.req.Body)
if err != nil {
return nil, ErrReadingRequestBody
}
p.req.Body.Close()
}
if p.body != nil {
originalBody, err = ioutil.ReadAll(p.body)
if err != nil {
return nil, ErrReadingBody
}
}
// 最大重试次数
AttemptLimit := c.MaxRetries
if AttemptLimit <= 0 {
AttemptLimit = 1
}
for req := 0; req < concurrency; req++ {
c.wg.Add(1)
totalSentRequests.Add(1)
// 创建concurrency个请求
go func(n int, p params) {
defer c.wg.Done()
defer totalSentRequests.Done()
var err error
for i := 1; i <= AttemptLimit; i++ {
// 如果没有成功返回,则等待n秒后重试,重试次数为AttemptLimit
c.wg.Add(1)
defer c.wg.Done()
select {
case <-finishCh: // 如果请求完成,finishCh通道会被关闭,就退出阻塞状态,return
return
default:
}
// rehydrate the body (it is drained each read)
// 每次请求后清空收到请求内容,防止干扰
if len(originalRequestBody) > 0 {
p.req.Body = ioutil.NopCloser(bytes.NewBuffer(originalRequestBody))
}
if len(originalBody) > 0 {
p.body = bytes.NewBuffer(originalBody)
}
var resp *http.Response // 新建resp来接收请求
// route the calls
// 形象生动,调用路由表
switch p.method {
case "Do":
resp, err = httpClient.Do(p.req)
case "Get":
resp, err = httpClient.Get(p.url)
case "Head":
resp, err = httpClient.Head(p.url)
case "Post":
resp, err = httpClient.Post(p.url, p.bodyType, p.body)
case "PostForm":
resp, err = httpClient.PostForm(p.url, p.data)
default:
err = ErrUnexpectedMethod
}
// Early return if we have a valid result
// Only retry (ie, continue the loop) on 5xx status codes
// 如果请求为空,并且错误代码小于500(说明不是服务端的问题)
// 则向通道内输入报错日志
// 只有服务端错误才进行重试
if err == nil && resp.StatusCode < 500 {
multiplexCh <- result{resp: resp, err: err, req: n, retry: i}
return
}
// 日志格式
c.log(ErrEntry{
Time: time.Now(),
Method: p.method,
Verb: p.verb,
URL: p.url,
Request: n,
Retry: i + 1, // would remove, but would break backward compatibility
Attempt: i,
Err: err,
})
// if it is the last iteration, grab the result (which is an error at this point)
// 如果超出了最大重试次数,则向通道内输入错误,并返回
if i == AttemptLimit {
multiplexCh <- result{resp: resp, err: err}
return
}
//If the request has been cancelled, skip retries
// 如果请求被取消,则跳过重试
// p.req.Context()获取当前请求的状态
if p.req != nil {
ctx := p.req.Context()
select {
case <-ctx.Done():
multiplexCh <- result{resp: resp, err: ctx.Err()}
return
default:
}
}
// if we are retrying, we should close this response body to free the fd
// 如果进行到这里了,肯定是要重试了,先清空一波response body压压惊
if resp != nil {
resp.Body.Close()
}
// prevent a 0 from causing the tick to block, pass additional microsecond
// 定时器
<-time.After(c.Backoff(i) + 1*time.Microsecond)
}
}(req, p)
}
// spin off the go routine so it can continually listen in on late results and close the response bodies
go func() {
gotFirstResult := false
for {
select {
case res := <-multiplexCh: // 如果有请求完成,不管返回的是成功还是失败,通道中就会有数据输出
// 如果是收到的第一个请求
if !gotFirstResult {
gotFirstResult = true
close(finishCh)
resultCh <- res
} else if res.resp != nil {
// we only return one result to the caller; close all other response bodies that come back
// drain the body before close as to not prevent keepalive. see https://gist.github.com/mholt/eba0f2cc96658be0f717
// 如果不是第一个收到的请求,则将请求内容输入无底洞(放弃不是第一个的响应)
io.Copy(ioutil.Discard, res.resp.Body)
res.resp.Body.Close()
}
case <-allRequestsBackCh: // 如果所有请求都完成了,该通道被关闭,退出阻塞态
// don't leave this goroutine running
return
}
}
}()
res := <-resultCh
c.Lock()
defer c.Unlock()
c.SuccessReqNum = res.req
c.SuccessRetryNum = res.retry
return res.resp, res.err
}
|