Go语言网关开发


网络基础

协议模型

OSI七层网络协议.png

经典协议与数据包.png

TCP连接

TCP的三次握手最主要目的是保证连接是双工的,可靠更多的是通过重传机制来保证。因为连接是全双工的,双方必须都收到对方的FIN包及确认才可关闭。

三次握手连接.png

四次回收关闭.png

MSL:Maximum Segment Lifetime,30秒到1分钟。

TIME-WAIT等待2MSL是保证TCP协议的全双工连接能够可靠关闭。保证这次连接的重复数据段从网络中消失。

大量出现close_wait的原因:

  1. 一般出现在被动关闭方
  2. 并发请求太多导致
  3. 被动关闭方未及时释放端口资源导致

TCP流量控制

由于通讯双方,网速不同。通讯方任一方发送过快都会导致对方消息处理不过来,所以就需要把数据放在缓冲区中。如果缓冲区满了,发送方还在发送,那接收方只能把数据包丢弃。因此需要控制发送速率。

缓冲区大小称之为接收端口,用变量win表示。如果win=0,则发送方停止发送。

TCP流量控制.png

TCP拥塞控制

拥塞控制是调节网络的负载。接收方网络资源繁忙,因未及时响应ACK导致发送方重传大量数据,这样将会导致网络更加拥堵。拥塞控制是动态调整win大小,不知是依赖缓冲区大小确定窗口大小

慢开始和拥塞避免

慢开始与拥塞避免.png

快速重传和快速恢复

快重传和快恢复.png

粘包和拆包

发生的情况:

  • 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
  • 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。
  • 进行MSS(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS的时候将会发生拆包。
  • 接收方法不及时读取套接字缓冲区数据,这将发生粘包。

数据传输协议

  • 使用带消息头的协议,头部写入包长度,然后再读取包内容
  • 设置定长消息,每次读取定长内容,长度不够时空位补固定字段
  • 设置消息边界,服务端从网络流中按消息边界分理出消息内容,一般使用\n
  • 专有的协议:json,protobuf

网络代理和网络转发区别

  • 网络代理
    • 用户不直接连接服务器,网络代理去连接。获取数据后返回给用户。
  • 网络转发
    • 路由器对报文的转发操作,中间可能对数据包修改。

网络代理

正向代理

是一种客户端的代理技术,帮助客户端访问无法访问的服务资源,可以隐藏用户真实IP。

反向代理

是一种服务端的代理技术,帮助服务器做负载均衡、缓存、提供安全校验等,可以隐藏服务器真实IP。

UDP创建过程

Server

步骤:

  1. 监听服务器
  2. 循环读取消息(不创建socket)
  3. 回复数据

Client

步骤:

  1. 连接服务器
  2. 发送数据
  3. 接收数据

TCP创建过程

Server

步骤:

  1. 监听端口
  2. 接受请求,创建套接字连接
  3. 创建独立协程
  4. 数据解码

Client

步骤:

  1. 连接服务器
  2. 将信息写入套接字,数据编码

HTTP创建过程

graph LR;
HTTP-->Server
Server-->创建路由器
Server-->设置路由规则
Server-->创建服务器
Server-->监听端口并提供服务
HTTP-->Client
Client-->创建连接池
Client-->创建客户端
Client-->请求数据
Client-->读取内容

Server

主要结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type ServeMux struct {
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
hosts bool // whether any patterns contain hostnames
}

type muxEntry struct {
h Handler
pattern string
}

type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}

步骤:

  1. 创建路由器 http.NewServeMux
  2. 设置路由规则 http.HandleFunc->http.Handle
  3. 创建服务器 http.Server
  4. 监听端口并提供服务 http.Server.ListenAndServe->http.server.Serve->http.Accept()->http.newConn->go c.serve->mux.ServeHTTP0>mux.Handler®->mux.handler(r.host,r.URL.Path)->mux.match(path)->v,ok:=mux.m[path]

Client

主要结构体: http.Client->Timeout && Transport type RoundTripper interface

步骤:

  1. 创建连接池 http.Transport
  2. 创建客户端 http.Client
  3. 请求数据 func(c *Client) Get(url string)->c.DO(req)->c.do(req)->c.send(req,deadline)->send(req,c.transport(),deadline)->resq,err=rt.RoundTrip(req)&&func(t *Transport)roundTrip(req *Request)
  4. 读取数据
Transport
1
2
3
4
5
6
7
8
9
10
11
12
type Transport struct {
idleMu sync.Mutex
closeIdle bool // user has requested to close all idle conns
idleConn map[connectMethodKey][]*persistConn // most recently used at end
idleConnWait map[connectMethodKey]wantConnQueue // waiting getConns
...
}

type connectMethodKey struct {
proxy, scheme, addr string
onlyH1 bool
}
persistConn
1
2
3
4
5
6
7
type persistConn struct {
br *bufio.Reader // from conn
bw *bufio.Writer // to conn
reqch chan requestAndChan // written by roundTrip; read by readLoop
writech chan writeRequest // written by roundTrip; read by writeLoop
...
}
RoundTrip流程

Transport_RoundTrip流程.png

超时时间

Client超时时间.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
   // 创建连接池
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second, //连接超时
KeepAlive: 30 * time.Second, //探活时间
}).DialContext,
MaxIdleConns: 100, //最大空闲连接
IdleConnTimeout: 90 * time.Second, //空闲超时时间
TLSHandshakeTimeout: 10 * time.Second, //tls握手超时时间
ExpectContinueTimeout: 1 * time.Second, //100-continue状态码超时时间
}
// 创建客户端
client := &http.Client{
Timeout: time.Second * 30, //请求超时时间
Transport: transport,
}

网络代理

网络代理 v.s 网络转发:

  • 网络代理:用户不直接连接服务器,网络代理去连接。获取数据后返回给用户。
  • 网络转发:是路由器对报文的转发操作,中间可能堆数据包修改。

正向代理

正向代理:是一种客户端的代理技术。帮助客户端访问无法访问的服务资源,可以隐藏用户真实IP。

  1. 代理接收客户端请求,复制原请求对象,并根据数据配置新请求各种参数
  2. 把新请求发送到真实服务端,并接收到服务器返回
  3. 代理服务器相应做一些处理,然后返回给客户端

http代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import (
"fmt"
"io"
"net"
"net/http"
"strings"
)

type Pxy struct{}

func (p *Pxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
fmt.Printf("Received request %s %s %s\n", req.Method, req.Host, req.RemoteAddr)
transport := http.DefaultTransport
// step 1,浅拷贝对象,然后就再新增属性数据
outReq := new(http.Request)
*outReq = *req
if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
if prior, ok := outReq.Header["X-Forwarded-For"]; ok {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
outReq.Header.Set("X-Forwarded-For", clientIP)
}

// step 2, 请求下游
res, err := transport.RoundTrip(outReq)
if err != nil {
rw.WriteHeader(http.StatusBadGateway)
return
}

// step 3, 把下游请求内容返回给上游
for key, value := range res.Header {
for _, v := range value {
rw.Header().Add(key, v)
}
}
rw.WriteHeader(res.StatusCode)
io.Copy(rw, res.Body)
res.Body.Close()
}

func main() {
fmt.Println("Serve on :8080")
http.Handle("/", &Pxy{})
http.ListenAndServe("0.0.0.0:8080", nil)
}

sock5代理

反向代理

反向代理:是一种服务端的代理技术,帮助服务器做负载均衡、缓存、提供安全校验等,可以隐藏服务器真实IP。

  1. 代理接受客户端请求,更改请求结构体信息
  2. 通过一定的负载均衡算法获取下游服务地址
  3. 把请求发送给下游服务器,并获取返回内容
  4. 对返回内容做一些处理,然后返回给客户端

http代理

Go内置了httputil.ReverseProxy类型。内置httputil.NewSingleHostReverseProxy。可以按照其实现自己实现。

简单版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import (
"bufio"
"log"
"net/http"
"net/url"
)

var (
proxy_addr = "http://127.0.0.1:2003"
port = "2002"
)

func handler(w http.ResponseWriter, r *http.Request) {
//step 1 解析代理地址,并更改请求体的协议和主机
proxy, err := url.Parse(proxy_addr)
r.URL.Scheme = proxy.Scheme
r.URL.Host = proxy.Host

//step 2 请求下游
transport := http.DefaultTransport
resp, err := transport.RoundTrip(r)
if err != nil {
log.Print(err)
return
}

//step 3 把下游请求内容返回给上游
for k, vv := range resp.Header {
for _, v := range vv {
w.Header().Add(k, v)
}
}
defer resp.Body.Close()
bufio.NewReader(resp.Body).WriteTo(w)
}

func main() {
http.HandleFunc("/", handler)
log.Println("Start serving on port " + port)
err := http.ListenAndServe(":"+port, nil)
if err != nil {
log.Fatal(err)
}
}
ReverseProxy的特殊StatusCode
  • 100:目前一切正常,客户端可以继续请求
    • 客户端Post的数据大于1024字节的时候
    • 客户端需要先发送一个请求,包含一个Expect:100-continue,询问服务端是否愿意接收数据
    • 接收到服务端返回的100-continue应答之后,返回100状态,把数据Post到服务端
    • 相关策略
      • 客户端策略
        • 如果客户端有 post 数据要上传,可以考虑使用 100-continue 协议。在请求头中加入 {“Expect”:”100-continue”}
        • 如果没有 post 数据,不能使用 100-continue 协议,因为这会让服务端造成误解。
        • 并不是所有的 Server 都会正确实现 100-continue 协议,如果 Client 发送 Expect:100-continue 消息后,在 timeout 时间内无响应,Client 需要立马上传 post 数据。
        • 有些 Server 会错误实现 100-continue 协议,在不需要此协议时返回 100,此时客户端应该忽略。
      • 服务端策略
        • 正确情况下,收到请求后,返回 100 或错误码。
        • 如果在发送 100-continue 前收到了 post 数据(客户端提前发送 post 数据),则不发送 100 响应码(略去)。
  • 101:服务端发送给客户端升级协议的请求
ReverseProxy的特殊Header头
  • 特殊Header头
    • X-Forwarded-For
      • 记录最后直连实际服务器之前的整个代理过程
      • 可能被伪造
    • X-Real-IP
      • 请求实际服务器的IP
      • 每过一层代理都会被覆盖掉,只需第一代理设置转发
    • Connection
    • TE
    • Trailer
  • 第一代理除去标准的逐段传输头
ReverseProxy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
type ReverseProxy struct {
// 控制器必须是一个函数,函数内部可以堆请求进行修改
Director func(*http.Request)

// The transport used to perform proxy requests.
// If nil, http.DefaultTransport is used.
// 连接池
Transport http.RoundTripper

// FlushInterval specifies the flush interval
// to flush to the client while copying the
// response body.
// If zero, no periodic flushing is done.
// A negative value means to flush immediately
// after each write to the client.
// The FlushInterval is ignored when ReverseProxy
// recognizes a response as a streaming response, or
// if its ContentLength is -1; for such responses, writes
// are flushed to the client immediately.
// 刷新到客户端的刷新间隔
FlushInterval time.Duration

// ErrorLog specifies an optional logger for errors
// that occur when attempting to proxy the request.
// If nil, logging is done via the log package's standard logger.
// 错误记录器
ErrorLog *log.Logger

// BufferPool optionally specifies a buffer pool to
// get byte slices for use by io.CopyBuffer when
// copying HTTP response bodies.
// 定义缓冲池,在复制http响应时使用,用以提高请求效率
BufferPool BufferPool

// ModifyResponse is an optional function that modifies the
// Response from the backend. It is called if the backend
// returns a response at all, with any HTTP status code.
// If the backend is unreachable, the optional ErrorHandler is
// called without any call to ModifyResponse.
//
// If ModifyResponse returns an error, ErrorHandler is called
// with its error value. If ErrorHandler is nil, its default
// implementation is used.
// 修改response函数
ModifyResponse func(*http.Response) error

// ErrorHandler is an optional function that handles errors
// reaching the backend or errors from ModifyResponse.
//
// If nil, the default is to log the provided error and return
// a 502 Status Bad Gateway response.
// 错误处理回调函数,如果为nil时,则遇到错误会显示502
ErrorHandler func(http.ResponseWriter, *http.Request, error)
}

func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
transport := p.Transport
if transport == nil {
transport = http.DefaultTransport
}
// 验证请求是否终止
ctx := req.Context()
if ctx.Done() != nil {
// CloseNotifier predates context.Context, and has been
// entirely superseded by it. If the request contains
// a Context that carries a cancellation signal, don't
// bother spinning up a goroutine to watch the CloseNotify
// channel (if any).
//
// If the request Context has a nil Done channel (which
// means it is either context.Background, or a custom
// Context implementation with no cancellation signal),
// then consult the CloseNotifier if available.
} else if cn, ok := rw.(http.CloseNotifier); ok {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
notifyChan := cn.CloseNotify()
go func() {
select {
case <-notifyChan:
cancel()
case <-ctx.Done():
}
}()
}
// 设置请求ctx信息
outreq := req.Clone(ctx)
if req.ContentLength == 0 {
outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
}
if outreq.Body != nil {
// Reading from the request body after returning from a handler is not
// allowed, and the RoundTrip goroutine that reads the Body can outlive
// this handler. This can lead to a crash if the handler panics (see
// Issue 46866). Although calling Close doesn't guarantee there isn't
// any Read in flight after the handle returns, in practice it's safe to
// read after closing it.
defer outreq.Body.Close()
}
// 深拷贝header
if outreq.Header == nil {
outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}
// 修改req
p.Director(outreq)
outreq.Close = false
// Upgrade头的特殊处理
reqUpType := upgradeType(outreq.Header)
if !ascii.IsPrint(reqUpType) {
p.getErrorHandler()(rw, req, fmt.Errorf("client tried to switch to invalid protocol %q", reqUpType))
return
}
removeConnectionHeaders(outreq.Header)

// Remove hop-by-hop headers to the backend. Especially
// important is "Connection" because we want a persistent
// connection, regardless of what the client sent to us.
// 删除后端的逐段标题
for _, h := range hopHeaders {
outreq.Header.Del(h)
}

// Issue 21096: tell backend applications that care about trailer support
// that we support trailers. (We do, but we don't go out of our way to
// advertise that unless the incoming client request thought it was worth
// mentioning.) Note that we look at req.Header, not outreq.Header, since
// the latter has passed through removeConnectionHeaders.
if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") {
outreq.Header.Set("Te", "trailers")
}

// After stripping all the hop-by-hop connection headers above, add back any
// necessary for protocol upgrades, such as for websockets.
if reqUpType != "" {
outreq.Header.Set("Connection", "Upgrade")
outreq.Header.Set("Upgrade", reqUpType)
}
// 追加clint IP信息
if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
// If we aren't the first proxy retain prior
// X-Forwarded-For information as a comma+space
// separated list and fold multiple headers into one.
prior, ok := outreq.Header["X-Forwarded-For"]
omit := ok && prior == nil // Issue 38079: nil now means don't populate the header
if len(prior) > 0 {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
if !omit {
outreq.Header.Set("X-Forwarded-For", clientIP)
}
}
// 向下游请求数据
res, err := transport.RoundTrip(outreq)
if err != nil {
p.getErrorHandler()(rw, outreq, err)
return
}

// Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc)
// 处理升级协议请求
if res.StatusCode == http.StatusSwitchingProtocols {
if !p.modifyResponse(rw, res, outreq) {
return
}
p.handleUpgradeResponse(rw, outreq, res)
return
}
// 移除逐段头部
removeConnectionHeaders(res.Header)

for _, h := range hopHeaders {
res.Header.Del(h)
}
// 修改返回数据
if !p.modifyResponse(rw, res, outreq) {
return
}
// 拷贝头部信息
copyHeader(rw.Header(), res.Header)

// The "Trailer" header isn't included in the Transport's response,
// at least for *http.Transport. Build it up from Trailer.
announcedTrailers := len(res.Trailer)
if announcedTrailers > 0 {
trailerKeys := make([]string, 0, len(res.Trailer))
for k := range res.Trailer {
trailerKeys = append(trailerKeys, k)
}
rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
}
// 写入状态码
rw.WriteHeader(res.StatusCode)
// 周期刷新内容到response
err = p.copyResponse(rw, res.Body, p.flushInterval(res))
if err != nil {
defer res.Body.Close()
// Since we're streaming the response, if we run into an error all we can do
// is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler
// on read error while copying body.
if !shouldPanicOnCopyError(req) {
p.logf("suppressing panic for copyResponse error in test; copy error: %v", err)
return
}
panic(http.ErrAbortHandler)
}
res.Body.Close() // close now, instead of defer, to populate res.Trailer

if len(res.Trailer) > 0 {
// Force chunking if we saw a response trailer.
// This prevents net/http from calculating the length for short
// bodies and adding a Content-Length.
if fl, ok := rw.(http.Flusher); ok {
fl.Flush()
}
}

if len(res.Trailer) == announcedTrailers {
copyHeader(rw.Header(), res.Trailer)
return
}

for k, vv := range res.Trailer {
k = http.TrailerPrefix + k
for _, v := range vv {
rw.Header().Add(k, v)
}
}
}

修改response后记得更新响应头中的 Content-Length以及响应体中ContentLength属性。

func (p *ReverseProxy)ServerHTTP主要步骤如下:

  • 验证是否请求终止
  • 设置请求ctx信息
  • 深拷贝header
  • 修改req
  • Upgrade头的特殊处理
  • 追加clientIp信息
  • 向下游请求数据
  • 处理升级协议请求
  • 移除逐段头部
  • 修改返回内容
  • 拷贝头部的数据
  • 写入状态码
  • 周期刷新内容到response
Connection
  • 标记请求发起方与第一代理的状态
  • 决定当前事务完成后,是否会关闭网络
    • Connection:keep-alive 不关闭网络
    • Connection:close 关闭网络
    • Connection:upgrade 协议升级
TE

TE是requesy_header,表示希望使用的传输编码类型。

如:TE:trailers,deflate;q=0.5表示,期望在采用分块传输编码响应中接收挂载字段,zlib编码,0.5优先级排序。

Trailer

Trailer是response header,允许发送方在消息后面添加额外的元信息。

如:Trailer:Expires表示Expires将出现在分块信息的结尾。

去除逐段传输头

代理过程中上述只发生在客户端和第一代理中。因此第一代理需要除去标准的逐段传输头(hop-by-hop)

  • 逐段传输头都需要在Connection头中列出
  • 第一代理必须处理它们且不转发
  • 逐段传输头:Keep-Alive,Transfer-Encoding,TE,Connection,Trailer,Upgrade,Proxy-Authorization,Proxy-Authenticate

socks5代理

负载均衡

随机负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import (
"errors"
"fmt"
"math/rand"
"strings"
)

type RandomBalance struct {
curIndex int
rss []string
//观察主体
conf LoadBalanceConf
}

func (r *RandomBalance) Add(params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
r.rss = append(r.rss, addr)
return nil
}

func (r *RandomBalance) Next() string {
if len(r.rss) == 0 {
return ""
}
r.curIndex = rand.Intn(len(r.rss))
return r.rss[r.curIndex]
}

func (r *RandomBalance) Get(key string) (string, error) {
return r.Next(), nil
}

func (r *RandomBalance) SetConf(conf LoadBalanceConf) {
r.conf = conf
}

func (r *RandomBalance) Update() {
if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
fmt.Println("Update get conf:", conf.GetConf())
r.rss = []string{}
for _, ip := range conf.GetConf() {
r.Add(strings.Split(ip, ",")...)
}
}
if conf, ok := r.conf.(*LoadBalanceCheckConf); ok {
fmt.Println("Update get conf:", conf.GetConf())
r.rss = nil
for _, ip := range conf.GetConf() {
r.Add(strings.Split(ip, ",")...)
}
}
}

轮询负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import (
"errors"
"fmt"
"strings"
)

type RoundRobinBalance struct {
curIndex int
rss []string
//观察主体
conf LoadBalanceConf
}

func (r *RoundRobinBalance) Add(params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
r.rss = append(r.rss, addr)
return nil
}

func (r *RoundRobinBalance) Next() string {
if len(r.rss) == 0 {
return ""
}
lens := len(r.rss) //5
if r.curIndex >= lens {
r.curIndex = 0
}
curAddr := r.rss[r.curIndex]
r.curIndex = (r.curIndex + 1) % lens
return curAddr
}

func (r *RoundRobinBalance) Get(key string) (string, error) {
return r.Next(), nil
}

func (r *RoundRobinBalance) SetConf(conf LoadBalanceConf) {
r.conf = conf
}

func (r *RoundRobinBalance) Update() {
if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
fmt.Println("Update get conf:", conf.GetConf())
r.rss = []string{}
for _, ip := range conf.GetConf() {
r.Add(strings.Split(ip, ",")...)
}
}
if conf, ok := r.conf.(*LoadBalanceCheckConf); ok {
fmt.Println("Update get conf:", conf.GetConf())
r.rss = nil
for _, ip := range conf.GetConf() {
r.Add(strings.Split(ip, ",")...)
}
}
}

加权轮询负载均衡

加权负载均衡.png

注意这里没有考虑调用下游失败的情况,实际环境中会因为下游失败导致effectiveWeight减少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import (
"errors"
"fmt"
"strconv"
"strings"
)

type WeightRoundRobinBalance struct {
curIndex int
rss []*WeightNode
rsw []int
//观察主体
conf LoadBalanceConf
}

type WeightNode struct {
addr string
weight int //权重值
currentWeight int //节点当前权重
effectiveWeight int //有效权重
}

func (r *WeightRoundRobinBalance) Add(params ...string) error {
if len(params) != 2 {
return errors.New("param len need 2")
}
parInt, err := strconv.ParseInt(params[1], 10, 64)
if err != nil {
return err
}
node := &WeightNode{addr: params[0], weight: int(parInt)}
node.effectiveWeight = node.weight
r.rss = append(r.rss, node)
return nil
}

func (r *WeightRoundRobinBalance) Next() string {
total := 0
var best *WeightNode
for i := 0; i < len(r.rss); i++ {
w := r.rss[i]
//step 1 统计所有有效权重之和
total += w.effectiveWeight
//step 2 变更节点临时权重为的节点临时权重+节点有效权重
w.currentWeight += w.effectiveWeight
//step 3 有效权重默认与权重相同,通讯异常时-1, 通讯成功+1,直到恢复到weight大小
if w.effectiveWeight < w.weight {
w.effectiveWeight++
}
//step 4 选择最大临时权重点节点
if best == nil || w.currentWeight > best.currentWeight {
best = w
}
}
if best == nil {
return ""
}
//step 5 变更临时权重为 临时权重-有效权重之和
best.currentWeight -= total
return best.addr
}

func (r *WeightRoundRobinBalance) Get(key string) (string, error) {
return r.Next(), nil
}

func (r *WeightRoundRobinBalance) SetConf(conf LoadBalanceConf) {
r.conf = conf
}

func (r *WeightRoundRobinBalance) Update() {
if conf, ok := r.conf.(*LoadBalanceZkConf); ok {
fmt.Println("WeightRoundRobinBalance get conf:", conf.GetConf())
r.rss = nil
for _, ip := range conf.GetConf() {
r.Add(strings.Split(ip, ",")...)
}
}
if conf, ok := r.conf.(*LoadBalanceCheckConf); ok {
fmt.Println("WeightRoundRobinBalance get conf:", conf.GetConf())
r.rss = nil
for _, ip := range conf.GetConf() {
r.Add(strings.Split(ip, ",")...)
}
}
}

一致性负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import (
"errors"
"fmt"
"hash/crc32"
"sort"
"strconv"
"strings"
"sync"
)

type Hash func(data []byte) uint32

type UInt32Slice []uint32

func (s UInt32Slice) Len() int {
return len(s)
}

func (s UInt32Slice) Less(i, j int) bool {
return s[i] < s[j]
}

func (s UInt32Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

type ConsistentHashBanlance struct {
mux sync.RWMutex
hash Hash
replicas int //复制因子
keys UInt32Slice //已排序的节点hash切片
hashMap map[uint32]string //节点哈希和Key的map,键是hash值,值是节点key

//观察主体
conf LoadBalanceConf
}

func NewConsistentHashBanlance(replicas int, fn Hash) *ConsistentHashBanlance {
m := &ConsistentHashBanlance{
replicas: replicas,
hash: fn,
hashMap: make(map[uint32]string),
}
if m.hash == nil {
//最多32位,保证是一个2^32-1环
m.hash = crc32.ChecksumIEEE
}
return m
}

// 验证是否为空
func (c *ConsistentHashBanlance) IsEmpty() bool {
return len(c.keys) == 0
}

// Add 方法用来添加缓存节点,参数为节点key,比如使用IP
func (c *ConsistentHashBanlance) Add(params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
c.mux.Lock()
defer c.mux.Unlock()
// 结合复制因子计算所有虚拟节点的hash值,并存入m.keys中,同时在m.hashMap中保存哈希值和key的映射
for i := 0; i < c.replicas; i++ {
hash := c.hash([]byte(strconv.Itoa(i) + addr))
c.keys = append(c.keys, hash)
c.hashMap[hash] = addr
}
// 对所有虚拟节点的哈希值进行排序,方便之后进行二分查找
sort.Sort(c.keys)
return nil
}

// Get 方法根据给定的对象获取最靠近它的那个节点
func (c *ConsistentHashBanlance) Get(key string) (string, error) {
if c.IsEmpty() {
return "", errors.New("node is empty")
}
hash := c.hash([]byte(key))

// 通过二分查找获取最优节点,第一个"服务器hash"值大于"数据hash"值的就是最优"服务器节点"
idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })

// 如果查找结果 大于 服务器节点哈希数组的最大索引,表示此时该对象哈希值位于最后一个节点之后,那么放入第一个节点中
if idx == len(c.keys) {
idx = 0
}
c.mux.RLock()
defer c.mux.RUnlock()
return c.hashMap[c.keys[idx]], nil
}

func (c *ConsistentHashBanlance) SetConf(conf LoadBalanceConf) {
c.conf = conf
}

func (c *ConsistentHashBanlance) Update() {
if conf, ok := c.conf.(*LoadBalanceZkConf); ok {
fmt.Println("Update get conf:", conf.GetConf())
c.keys = nil
c.hashMap = nil
for _, ip := range conf.GetConf() {
c.Add(strings.Split(ip, ",")...)
}
}
if conf, ok := c.conf.(*LoadBalanceCheckConf); ok {
fmt.Println("Update get conf:", conf.GetConf())
c.keys = nil
c.hashMap = map[uint32]string{}
for _, ip := range conf.GetConf() {
c.Add(strings.Split(ip, ",")...)
}
}
}

Tip

代码中合理运用swagger可以快速生成对应的测试接口文档。

使用Validation和translations可以校验参数,针对特殊的校验可以在参数的结构体中添加自定义校验方法,并将方案实现在验证器中,对用路由注册时添加验证器中间件即可。


文章作者: 不二
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 不二 !
  目录