diff --git a/bfe_basic/condition/primitive_test.go b/bfe_basic/condition/primitive_test.go index 5144512a..bc54318f 100644 --- a/bfe_basic/condition/primitive_test.go +++ b/bfe_basic/condition/primitive_test.go @@ -16,11 +16,10 @@ package condition import ( "net" + "net/http" "testing" "time" -) -import ( "github.com/bfenetworks/bfe/bfe_basic" "github.com/bfenetworks/bfe/bfe_http" ) @@ -227,8 +226,8 @@ func TestContainMatcher_2(t *testing.T) { func TestContextValueFetcher(t *testing.T) { // prepare input data hf := ContextValueFetcher{"hello"} - req := bfe_basic.NewRequest(nil, nil, nil, nil, nil) - req.HttpRequest = &bfe_http.Request{} + rreq, _ := bfe_http.NewRequest(http.MethodGet, "http://example.org", nil) + req := bfe_basic.NewRequest(rreq, nil, nil, nil, nil) req.SetContext("hello", "world") // Fetch contextVal, err := hf.Fetch(req) diff --git a/bfe_basic/request.go b/bfe_basic/request.go index d3d40059..664f1fe9 100644 --- a/bfe_basic/request.go +++ b/bfe_basic/request.go @@ -19,9 +19,8 @@ package bfe_basic import ( "net" "net/url" -) + "strings" -import ( "github.com/bfenetworks/bfe/bfe_balance/backend" "github.com/bfenetworks/bfe/bfe_http" ) @@ -99,6 +98,8 @@ type Request struct { // User context associated with this request Context map[interface{}]interface{} + + IsSse bool } // NewRequest creates and initializes a new request. @@ -120,6 +121,14 @@ func NewRequest(request *bfe_http.Request, conn net.Conn, stat *RequestStat, fReq.SvrDataConf = svrDataConf + accValues := strings.ToLower(request.Header.Get("Accept")) + for _, item := range strings.Split(accValues, ",") { + if strings.TrimSpace(item) == "text/event-stream" { + fReq.IsSse = true + break + } + } + return fReq } diff --git a/bfe_http/response.go b/bfe_http/response.go index 0b216f5a..03998d81 100644 --- a/bfe_http/response.go +++ b/bfe_http/response.go @@ -27,9 +27,7 @@ import ( "net/url" "strconv" "strings" -) -import ( "github.com/bfenetworks/bfe/bfe_bufio" "github.com/bfenetworks/bfe/bfe_net/textproto" "github.com/bfenetworks/bfe/bfe_tls" @@ -47,7 +45,6 @@ type SignCalculator interface { } // Response represents the response from an HTTP request. -// type Response struct { Status string // e.g. "200 OK" StatusCode int // e.g. 200 @@ -109,6 +106,8 @@ type Response struct { // The pointer is shared between responses and should not be // modified. TLS *bfe_tls.ConnectionState + + IsSse bool } // Cookies parses and returns the cookies set in the Set-Cookie headers. @@ -133,6 +132,26 @@ func (r *Response) Location() (*url.URL, error) { return url.Parse(lv) } +func isSSEResponse(ct string) bool { + if len(ct) < 15 { // length of sse relative values is >= 15 + return false + } + + if strings.EqualFold(ct, "text/event-stream") { + return true + } + + ctValues := strings.ToLower(ct) + for _, item := range strings.Split(ctValues, ";") { + fitem := strings.TrimSpace(item) + if fitem == "text/event-stream" || fitem == "application/sse" || fitem == "application/x-sse" { + return true + } + } + + return false +} + // ReadResponse reads and returns an HTTP response from r. // The req parameter optionally specifies the Request that corresponds // to this Response. If nil, a GET request is assumed. @@ -182,6 +201,9 @@ func ReadResponse(r *bfe_bufio.Reader, req *Request) (*Response, error) { fixPragmaCacheControl(resp.Header) + contentType := resp.Header.Get("Content-Type") + resp.IsSse = isSSEResponse(contentType) + err = readTransfer(resp, r) if err != nil { return nil, err @@ -191,8 +213,11 @@ func ReadResponse(r *bfe_bufio.Reader, req *Request) (*Response, error) { } // RFC2616: Should treat +// // Pragma: no-cache +// // like +// // Cache-Control: no-cache func fixPragmaCacheControl(header Header) { if hp, ok := header["Pragma"]; ok && len(hp) > 0 && hp[0] == "no-cache" { @@ -212,16 +237,15 @@ func (r *Response) ProtoAtLeast(major, minor int) bool { // Writes the response (header, body and trailer) in wire format. This method // consults the following fields of the response: // -// StatusCode -// ProtoMajor -// ProtoMinor -// Request.Method -// TransferEncoding -// Trailer -// Body -// ContentLength -// Header, values for non-canonical keys will have unpredictable behavior -// +// StatusCode +// ProtoMajor +// ProtoMinor +// Request.Method +// TransferEncoding +// Trailer +// Body +// ContentLength +// Header, values for non-canonical keys will have unpredictable behavior func (r *Response) Write(w io.Writer) error { // Status line diff --git a/bfe_modules/mod_redirect/action_url_test.go b/bfe_modules/mod_redirect/action_url_test.go index fc68d04d..95db1ef5 100644 --- a/bfe_modules/mod_redirect/action_url_test.go +++ b/bfe_modules/mod_redirect/action_url_test.go @@ -15,17 +15,16 @@ package mod_redirect import ( + "net/http" "net/url" "testing" -) -import ( "github.com/bfenetworks/bfe/bfe_basic" "github.com/bfenetworks/bfe/bfe_http" ) func prepareRequest(urlStr string) *bfe_basic.Request { - req := new(bfe_http.Request) + req, _ := bfe_http.NewRequest(http.MethodGet, "http://example.org", nil) req.URL, _ = url.Parse(urlStr) freq := bfe_basic.NewRequest(req, nil, nil, nil, nil) diff --git a/bfe_server/http_conn.go b/bfe_server/http_conn.go index 3f6338cc..11d87c4d 100644 --- a/bfe_server/http_conn.go +++ b/bfe_server/http_conn.go @@ -27,14 +27,9 @@ import ( "strings" "sync" "time" -) -import ( "github.com/baidu/go-lib/gotrack" "github.com/baidu/go-lib/log" -) - -import ( "github.com/bfenetworks/bfe/bfe_basic" "github.com/bfenetworks/bfe/bfe_bufio" "github.com/bfenetworks/bfe/bfe_http" @@ -535,9 +530,24 @@ func (c *conn) serveRequest(w bfe_http.ResponseWriter, request *bfe_basic.Reques // [*] Not strictly true: HTTP pipelining. We could let them all process // in parallel even if their responses need to be serialized. + isClientReqSse := false + if request.IsSse { + isClientReqSse = true + proxyState.SseReqServed.Inc(1) + + proxyState.SseReqActive.Inc(1) + defer proxyState.SseReqActive.Dec(1) + } + // serve the request ret1 := c.server.ReverseProxy.ServeHTTP(w, request) + if !isClientReqSse && request.IsSse { + //sse tag in response header + proxyState.SseReqServed.Inc(1) + //ignore SseReqActive, since it almost has done + } + // if there is some response, count the time if !request.Stat.ResponseStart.IsZero() { request.Stat.ResponseEnd = time.Now() diff --git a/bfe_server/proxy_state.go b/bfe_server/proxy_state.go index 585f8631..112ff6fd 100644 --- a/bfe_server/proxy_state.go +++ b/bfe_server/proxy_state.go @@ -44,7 +44,7 @@ type ProxyState struct { ErrBkNoBalance *metrics.Counter ErrBkNoCluster *metrics.Counter ErrBkBodyProcess *metrics.Counter - + // backend side errors ErrBkConnectBackend *metrics.Counter ErrBkRequestBackend *metrics.Counter @@ -119,6 +119,10 @@ type ProxyState struct { StreamClientConnActive *metrics.Gauge WsClientConnActive *metrics.Gauge WssClientConnActive *metrics.Gauge + + // sse + SseReqServed *metrics.Counter + SseReqActive *metrics.Gauge } func (s *ProxyState) ClientConnServedInc(proto string, value uint) { diff --git a/bfe_server/response.go b/bfe_server/response.go index c56ccc21..f7e20e8e 100644 --- a/bfe_server/response.go +++ b/bfe_server/response.go @@ -29,13 +29,8 @@ import ( "sync" "sync/atomic" "time" -) -import ( "github.com/baidu/go-lib/log" -) - -import ( "github.com/bfenetworks/bfe/bfe_bufio" "github.com/bfenetworks/bfe/bfe_http" ) @@ -299,12 +294,12 @@ func (w *response) bodyAllowed() bool { // // The Writers are wired together like: // -// 1. *response (the ResponseWriter) -> -// 2. (*response).w, a *bufio.Writer of bufferBeforeChunkingSize bytes -// 3. chunkWriter.Writer (whose writeHeader finalizes Content-Length/Type) -// and which writes the chunk headers, if needed. -// 4. conn.buf, a bufio.Writer of default (4kB) bytes -// 5. the rwc, the net.Conn. +// 1. *response (the ResponseWriter) -> +// 2. (*response).w, a *bufio.Writer of bufferBeforeChunkingSize bytes +// 3. chunkWriter.Writer (whose writeHeader finalizes Content-Length/Type) +// and which writes the chunk headers, if needed. +// 4. conn.buf, a bufio.Writer of default (4kB) bytes +// 5. the rwc, the net.Conn. // // TODO(bradfitz): short-circuit some of the buffering when the // initial header contains both a Content-Type and Content-Length. diff --git a/bfe_server/reverseproxy.go b/bfe_server/reverseproxy.go index 4910b4b3..83e28fb0 100644 --- a/bfe_server/reverseproxy.go +++ b/bfe_server/reverseproxy.go @@ -594,14 +594,18 @@ func (p *ReverseProxy) setTimeout(stage bfe_basic.OperationStage, conn net.Conn, req *bfe_http.Request, d time.Duration) { switch b := req.Body.(type) { case *bfe_http2.RequestBody: // http2 - if stage == bfe_basic.StageReadReqBody { - bfe_http2.SetReadStreamTimeout(b, d) - } - if stage == bfe_basic.StageWriteClient { - bfe_http2.SetWriteStreamTimeout(b, d) - } - if stage == bfe_basic.StageEndRequest { - bfe_http2.SetConnTimeout(b, d) + if d >= 0 { + if stage == bfe_basic.StageReadReqBody { + bfe_http2.SetReadStreamTimeout(b, d) + } + if stage == bfe_basic.StageWriteClient { + bfe_http2.SetWriteStreamTimeout(b, d) + } + if stage == bfe_basic.StageEndRequest { + bfe_http2.SetConnTimeout(b, d) + } + } else { + //skip timeout setingg } case *bfe_spdy.RequestBody: // spdy if stage == bfe_basic.StageReadReqBody { @@ -614,11 +618,18 @@ func (p *ReverseProxy) setTimeout(stage bfe_basic.OperationStage, bfe_spdy.SetConnTimeout(b, d) } default: // http + timeout := time.Time{} //no timeout + if d >= 0 { + timeout = time.Now().Add(d) + } else { + //skip timeout setingg + } + if stage == bfe_basic.StageReadReqBody || stage == bfe_basic.StageEndRequest { - conn.SetReadDeadline(time.Now().Add(d)) + conn.SetReadDeadline(timeout) } if stage == bfe_basic.StageWriteClient { - conn.SetWriteDeadline(time.Now().Add(d)) + conn.SetWriteDeadline(timeout) } } } @@ -654,6 +665,7 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic resFlushInterval := time.Duration(0) cancelOnClientClose := false + timeoutReadClient := time.Duration(cluster_conf.DefaultReadClientTimeout) * time.Millisecond timeoutWriteClient := time.Duration(cluster_conf.DefaultWriteClientTimeout) * time.Millisecond timeoutReadClientAgain := time.Duration(cluster_conf.DefaultReadClientAgainTimeout) * time.Millisecond @@ -761,12 +773,20 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic basicReq.Backend.ClusterName = clusterName // set deadline to finish read client request body - p.setTimeout(bfe_basic.StageReadReqBody, basicReq.Connection, req, cluster.TimeoutReadClient()) + timeoutReadClient = cluster.TimeoutReadClient() resFlushInterval = cluster.ResFlushInterval() cancelOnClientClose = cluster.CancelOnClientClose() timeoutWriteClient = cluster.TimeoutWriteClient() timeoutReadClientAgain = cluster.TimeoutReadClientAgain() + if basicReq.IsSse { + timeoutReadClient = -1 + timeoutWriteClient = -1 + cancelOnClientClose = true + } + + p.setTimeout(bfe_basic.StageReadReqBody, basicReq.Connection, req, timeoutReadClient) + // Callback for HandleAfterLocation hl = srv.CallBacks.GetHandlerList(bfe_module.HandleAfterLocation) if hl != nil { @@ -839,28 +859,28 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic } } } -/* - // do body process before forwarding - bf, ok = outreq.Body.(BufferFiller) - if ok { - // if body is BufferFiller, call FillBuffer to process body before forwarding - for err == nil { - err = bf.FillBuffer() - } - if err != io.EOF { - basicReq.ErrCode = bfe_basic.ErrBkBodyProcess - basicReq.ErrMsg = err.Error() - - p.proxyState.ErrBkBodyProcess.Inc(1) + /* + // do body process before forwarding + bf, ok = outreq.Body.(BufferFiller) + if ok { + // if body is BufferFiller, call FillBuffer to process body before forwarding + for err == nil { + err = bf.FillBuffer() + } + if err != io.EOF { + basicReq.ErrCode = bfe_basic.ErrBkBodyProcess + basicReq.ErrMsg = err.Error() - // close connection - res = bfe_basic.CreateSpecifiedContentResp(basicReq, bfe_http.StatusBadRequest, "text/plain", - fmt.Sprintf("Error %s: %s", basicReq.ErrCode.Error(), basicReq.ErrMsg)) - action = closeAfterReply - goto send_response + p.proxyState.ErrBkBodyProcess.Inc(1) + + // close connection + res = bfe_basic.CreateSpecifiedContentResp(basicReq, bfe_http.StatusBadRequest, "text/plain", + fmt.Sprintf("Error %s: %s", basicReq.ErrCode.Error(), basicReq.ErrMsg)) + action = closeAfterReply + goto send_response + } } - } -*/ + */ // invoke cluster to get response res, action, err = p.clusterInvoke(srv, cluster, basicReq, rw) basicReq.HttpResponse = res @@ -881,6 +901,17 @@ func (p *ReverseProxy) ServeHTTP(rw bfe_http.ResponseWriter, basicReq *bfe_basic } response_got: + if res != nil && res.IsSse { + if !basicReq.IsSse { + timeoutReadClient = -1 + p.setTimeout(bfe_basic.StageReadReqBody, basicReq.Connection, req, timeoutReadClient) + + timeoutWriteClient = -1 + cancelOnClientClose = true + basicReq.IsSse = true + } + } + // timeout for write response to client // Note: we use io.Copy() to read from backend and write to client. // For avoid from blocking on client conn or backend conn forever, @@ -1040,4 +1071,4 @@ func checkBackendStatus(outlierDetectionHttpCodeStr string, statusCode int) bool type BufferFiller interface { FillBuffer() error -} \ No newline at end of file +}