// ServeMux holds a map of muxEntry values, a sorted slice of the same entry
// type, and a flag recording whether any pattern contains a hostname.
type ServeMux struct {
	mu    sync.RWMutex        // NOTE(review): presumably guards the fields below — confirm against the methods
	m     map[string]muxEntry // entries keyed by string; presumably the registered pattern — TODO confirm
	es    []muxEntry          // slice of entries sorted from longest to shortest.
	hosts bool                // whether any patterns contain hostnames
}
// muxEntry pairs a Handler with the pattern string stored alongside it.
type muxEntry struct {
	h       Handler // handler kept for this entry
	pattern string  // pattern text kept for this entry
}
// Handler is satisfied by any type that provides a
// ServeHTTP(ResponseWriter, *Request) method.
type Handler interface {
	ServeHTTP(ResponseWriter, *Request)
}
// Transport (excerpt) shows the idle-connection bookkeeping fields only;
// the remaining fields are elided in this excerpt.
type Transport struct {
	idleMu       sync.Mutex                          // NOTE(review): presumably guards the idle* fields below — confirm
	closeIdle    bool                                // user has requested to close all idle conns
	idleConn     map[connectMethodKey][]*persistConn // most recently used at end
	idleConnWait map[connectMethodKey]wantConnQueue  // waiting getConns
	// ... (further fields omitted in this excerpt)
}
// persistConn (excerpt) shows the buffered reader/writer wrapped around a
// connection and the channels used to hand work to its read and write loops;
// the remaining fields are elided in this excerpt.
type persistConn struct {
	br      *bufio.Reader       // from conn
	bw      *bufio.Writer       // to conn
	reqch   chan requestAndChan // written by roundTrip; read by readLoop
	writech chan writeRequest   // written by roundTrip; read by writeLoop
	// ... (further fields omitted in this excerpt)
}
// step 3, 把下游请求内容返回给上游 for key, value := range res.Header { for _, v := range value { rw.Header().Add(key, v) } } rw.WriteHeader(res.StatusCode) io.Copy(rw, res.Body) res.Body.Close() }
funcmain() { fmt.Println("Serve on :8080") http.Handle("/", &Pxy{}) http.ListenAndServe("0.0.0.0:8080", nil) }
//step 2 请求下游 transport := http.DefaultTransport resp, err := transport.RoundTrip(r) if err != nil { log.Print(err) return }
//step 3 把下游请求内容返回给上游 for k, vv := range resp.Header { for _, v := range vv { w.Header().Add(k, v) } } defer resp.Body.Close() bufio.NewReader(resp.Body).WriteTo(w) }
funcmain() { http.HandleFunc("/", handler) log.Println("Start serving on port " + port) err := http.ListenAndServe(":"+port, nil) if err != nil { log.Fatal(err) } }
// ReverseProxy groups the hooks and settings described on its fields:
// request rewriting, the transport, flush behaviour, error logging, buffer
// reuse, response rewriting and error handling.
type ReverseProxy struct {
	// Director must be a function; inside it the outgoing request
	// can be modified.
	Director func(*http.Request)

	// The transport used to perform proxy requests.
	// If nil, http.DefaultTransport is used.
	// (Author's note: this is the connection pool.)
	Transport http.RoundTripper

	// FlushInterval specifies the flush interval
	// to flush to the client while copying the
	// response body.
	// If zero, no periodic flushing is done.
	// A negative value means to flush immediately
	// after each write to the client.
	// The FlushInterval is ignored when ReverseProxy
	// recognizes a response as a streaming response, or
	// if its ContentLength is -1; for such responses, writes
	// are flushed to the client immediately.
	// (Author's note: the interval at which data is flushed to the client.)
	FlushInterval time.Duration

	// ErrorLog specifies an optional logger for errors
	// that occur when attempting to proxy the request.
	// If nil, logging is done via the log package's standard logger.
	// (Author's note: the error logger.)
	ErrorLog *log.Logger

	// BufferPool optionally specifies a buffer pool to
	// get byte slices for use by io.CopyBuffer when
	// copying HTTP response bodies.
	// (Author's note: a buffer pool used while copying HTTP responses,
	// intended to make request handling more efficient.)
	BufferPool BufferPool

	// ModifyResponse is an optional function that modifies the
	// Response from the backend. It is called if the backend
	// returns a response at all, with any HTTP status code.
	// If the backend is unreachable, the optional ErrorHandler is
	// called without any call to ModifyResponse.
	//
	// If ModifyResponse returns an error, ErrorHandler is called
	// with its error value. If ErrorHandler is nil, its default
	// implementation is used.
	// (Author's note: the hook for modifying the response.)
	ModifyResponse func(*http.Response) error

	// ErrorHandler is an optional function that handles errors
	// reaching the backend or errors from ModifyResponse.
	//
	// If nil, the default is to log the provided error and return
	// a 502 Status Bad Gateway response.
	// (Author's note: the error callback; when nil, an error results
	// in a 502 being shown.)
	ErrorHandler func(http.ResponseWriter, *http.Request, error)
}
func(p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { transport := p.Transport if transport == nil { transport = http.DefaultTransport } // 验证请求是否终止 ctx := req.Context() if ctx.Done() != nil { // CloseNotifier predates context.Context, and has been // entirely superseded by it. If the request contains // a Context that carries a cancellation signal, don't // bother spinning up a goroutine to watch the CloseNotify // channel (if any). // // If the request Context has a nil Done channel (which // means it is either context.Background, or a custom // Context implementation with no cancellation signal), // then consult the CloseNotifier if available. } elseif cn, ok := rw.(http.CloseNotifier); ok { var cancel context.CancelFunc ctx, cancel = context.WithCancel(ctx) defer cancel() notifyChan := cn.CloseNotify() gofunc() { select { case <-notifyChan: cancel() case <-ctx.Done(): } }() } // 设置请求ctx信息 outreq := req.Clone(ctx) if req.ContentLength == 0 { outreq.Body = nil// Issue 16036: nil Body for http.Transport retries } if outreq.Body != nil { // Reading from the request body after returning from a handler is not // allowed, and the RoundTrip goroutine that reads the Body can outlive // this handler. This can lead to a crash if the handler panics (see // Issue 46866). Although calling Close doesn't guarantee there isn't // any Read in flight after the handle returns, in practice it's safe to // read after closing it. defer outreq.Body.Close() } // 深拷贝header if outreq.Header == nil { outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate } // 修改req p.Director(outreq) outreq.Close = false // Upgrade头的特殊处理 reqUpType := upgradeType(outreq.Header) if !ascii.IsPrint(reqUpType) { p.getErrorHandler()(rw, req, fmt.Errorf("client tried to switch to invalid protocol %q", reqUpType)) return } removeConnectionHeaders(outreq.Header)
// Remove hop-by-hop headers to the backend. Especially // important is "Connection" because we want a persistent // connection, regardless of what the client sent to us. // 删除后端的逐段标题 for _, h := range hopHeaders { outreq.Header.Del(h) }
// Issue 21096: tell backend applications that care about trailer support // that we support trailers. (We do, but we don't go out of our way to // advertise that unless the incoming client request thought it was worth // mentioning.) Note that we look at req.Header, not outreq.Header, since // the latter has passed through removeConnectionHeaders. if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") { outreq.Header.Set("Te", "trailers") }
// After stripping all the hop-by-hop connection headers above, add back any // necessary for protocol upgrades, such as for websockets. if reqUpType != "" { outreq.Header.Set("Connection", "Upgrade") outreq.Header.Set("Upgrade", reqUpType) } // 追加clint IP信息 if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil { // If we aren't the first proxy retain prior // X-Forwarded-For information as a comma+space // separated list and fold multiple headers into one. prior, ok := outreq.Header["X-Forwarded-For"] omit := ok && prior == nil// Issue 38079: nil now means don't populate the header iflen(prior) > 0 { clientIP = strings.Join(prior, ", ") + ", " + clientIP } if !omit { outreq.Header.Set("X-Forwarded-For", clientIP) } } // 向下游请求数据 res, err := transport.RoundTrip(outreq) if err != nil { p.getErrorHandler()(rw, outreq, err) return }
for _, h := range hopHeaders { res.Header.Del(h) } // 修改返回数据 if !p.modifyResponse(rw, res, outreq) { return } // 拷贝头部信息 copyHeader(rw.Header(), res.Header)
// The "Trailer" header isn't included in the Transport's response, // at least for *http.Transport. Build it up from Trailer. announcedTrailers := len(res.Trailer) if announcedTrailers > 0 { trailerKeys := make([]string, 0, len(res.Trailer)) for k := range res.Trailer { trailerKeys = append(trailerKeys, k) } rw.Header().Add("Trailer", strings.Join(trailerKeys, ", ")) } // 写入状态码 rw.WriteHeader(res.StatusCode) // 周期刷新内容到response err = p.copyResponse(rw, res.Body, p.flushInterval(res)) if err != nil { defer res.Body.Close() // Since we're streaming the response, if we run into an error all we can do // is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler // on read error while copying body. if !shouldPanicOnCopyError(req) { p.logf("suppressing panic for copyResponse error in test; copy error: %v", err) return } panic(http.ErrAbortHandler) } res.Body.Close() // close now, instead of defer, to populate res.Trailer
iflen(res.Trailer) > 0 { // Force chunking if we saw a response trailer. // This prevents net/http from calculating the length for short // bodies and adding a Content-Length. if fl, ok := rw.(http.Flusher); ok { fl.Flush() } }
func(r *RandomBalance) Update() { if conf, ok := r.conf.(*LoadBalanceZkConf); ok { fmt.Println("Update get conf:", conf.GetConf()) r.rss = []string{} for _, ip := range conf.GetConf() { r.Add(strings.Split(ip, ",")...) } } if conf, ok := r.conf.(*LoadBalanceCheckConf); ok { fmt.Println("Update get conf:", conf.GetConf()) r.rss = nil for _, ip := range conf.GetConf() { r.Add(strings.Split(ip, ",")...) } } }
func(r *RoundRobinBalance) Update() { if conf, ok := r.conf.(*LoadBalanceZkConf); ok { fmt.Println("Update get conf:", conf.GetConf()) r.rss = []string{} for _, ip := range conf.GetConf() { r.Add(strings.Split(ip, ",")...) } } if conf, ok := r.conf.(*LoadBalanceCheckConf); ok { fmt.Println("Update get conf:", conf.GetConf()) r.rss = nil for _, ip := range conf.GetConf() { r.Add(strings.Split(ip, ",")...) } } }
func(r *WeightRoundRobinBalance) Next() string { total := 0 var best *WeightNode for i := 0; i < len(r.rss); i++ { w := r.rss[i] //step 1 统计所有有效权重之和 total += w.effectiveWeight //step 2 变更节点临时权重为的节点临时权重+节点有效权重 w.currentWeight += w.effectiveWeight //step 3 有效权重默认与权重相同,通讯异常时-1, 通讯成功+1,直到恢复到weight大小 if w.effectiveWeight < w.weight { w.effectiveWeight++ } //step 4 选择最大临时权重点节点 if best == nil || w.currentWeight > best.currentWeight { best = w } } if best == nil { return"" } //step 5 变更临时权重为 临时权重-有效权重之和 best.currentWeight -= total return best.addr }
func(r *WeightRoundRobinBalance) Update() { if conf, ok := r.conf.(*LoadBalanceZkConf); ok { fmt.Println("WeightRoundRobinBalance get conf:", conf.GetConf()) r.rss = nil for _, ip := range conf.GetConf() { r.Add(strings.Split(ip, ",")...) } } if conf, ok := r.conf.(*LoadBalanceCheckConf); ok { fmt.Println("WeightRoundRobinBalance get conf:", conf.GetConf()) r.rss = nil for _, ip := range conf.GetConf() { r.Add(strings.Split(ip, ",")...) } } }
func(c *ConsistentHashBanlance) Update() { if conf, ok := c.conf.(*LoadBalanceZkConf); ok { fmt.Println("Update get conf:", conf.GetConf()) c.keys = nil c.hashMap = nil for _, ip := range conf.GetConf() { c.Add(strings.Split(ip, ",")...) } } if conf, ok := c.conf.(*LoadBalanceCheckConf); ok { fmt.Println("Update get conf:", conf.GetConf()) c.keys = nil c.hashMap = map[uint32]string{} for _, ip := range conf.GetConf() { c.Add(strings.Split(ip, ",")...) } } }