Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions bfe_basic/condition/primitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ package condition

import (
"net"
"net/http"
"testing"
"time"
)

import (
"github.com/bfenetworks/bfe/bfe_basic"
"github.com/bfenetworks/bfe/bfe_http"
)
Expand Down Expand Up @@ -227,8 +226,8 @@ func TestContainMatcher_2(t *testing.T) {
func TestContextValueFetcher(t *testing.T) {
// prepare input data
hf := ContextValueFetcher{"hello"}
req := bfe_basic.NewRequest(nil, nil, nil, nil, nil)
req.HttpRequest = &bfe_http.Request{}
rreq, _ := bfe_http.NewRequest(http.MethodGet, "http://example.org", nil)
req := bfe_basic.NewRequest(rreq, nil, nil, nil, nil)
req.SetContext("hello", "world")
// Fetch
contextVal, err := hf.Fetch(req)
Expand Down
13 changes: 11 additions & 2 deletions bfe_basic/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package bfe_basic
import (
"net"
"net/url"
)
"strings"

import (
"github.com/bfenetworks/bfe/bfe_balance/backend"
"github.com/bfenetworks/bfe/bfe_http"
)
Expand Down Expand Up @@ -99,6 +98,8 @@ type Request struct {

// User context associated with this request
Context map[interface{}]interface{}

IsSse bool
}

// NewRequest creates and initializes a new request.
Expand All @@ -120,6 +121,14 @@ func NewRequest(request *bfe_http.Request, conn net.Conn, stat *RequestStat,

fReq.SvrDataConf = svrDataConf

accValues := strings.ToLower(request.Header.Get("Accept"))
for _, item := range strings.Split(accValues, ",") {
if strings.TrimSpace(item) == "text/event-stream" {
fReq.IsSse = true
break
}
}

return fReq
}

Expand Down
50 changes: 37 additions & 13 deletions bfe_http/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import (
"net/url"
"strconv"
"strings"
)

import (
"github.com/bfenetworks/bfe/bfe_bufio"
"github.com/bfenetworks/bfe/bfe_net/textproto"
"github.com/bfenetworks/bfe/bfe_tls"
Expand All @@ -47,7 +45,6 @@ type SignCalculator interface {
}

// Response represents the response from an HTTP request.
//
type Response struct {
Status string // e.g. "200 OK"
StatusCode int // e.g. 200
Expand Down Expand Up @@ -109,6 +106,8 @@ type Response struct {
// The pointer is shared between responses and should not be
// modified.
TLS *bfe_tls.ConnectionState

IsSse bool
}

// Cookies parses and returns the cookies set in the Set-Cookie headers.
Expand All @@ -133,6 +132,26 @@ func (r *Response) Location() (*url.URL, error) {
return url.Parse(lv)
}

func isSSEResponse(ct string) bool {
if len(ct) < 15 { // length of sse relative values is >= 15
return false
}

if strings.EqualFold(ct, "text/event-stream") {
return true
}

ctValues := strings.ToLower(ct)
for _, item := range strings.Split(ctValues, ";") {
fitem := strings.TrimSpace(item)
if fitem == "text/event-stream" || fitem == "application/sse" || fitem == "application/x-sse" {
return true
}
}

return false
}

// ReadResponse reads and returns an HTTP response from r.
// The req parameter optionally specifies the Request that corresponds
// to this Response. If nil, a GET request is assumed.
Expand Down Expand Up @@ -182,6 +201,9 @@ func ReadResponse(r *bfe_bufio.Reader, req *Request) (*Response, error) {

fixPragmaCacheControl(resp.Header)

contentType := resp.Header.Get("Content-Type")
resp.IsSse = isSSEResponse(contentType)

err = readTransfer(resp, r)
if err != nil {
return nil, err
Expand All @@ -191,8 +213,11 @@ func ReadResponse(r *bfe_bufio.Reader, req *Request) (*Response, error) {
}

// RFC2616: Should treat
//
// Pragma: no-cache
//
// like
//
// Cache-Control: no-cache
func fixPragmaCacheControl(header Header) {
if hp, ok := header["Pragma"]; ok && len(hp) > 0 && hp[0] == "no-cache" {
Expand All @@ -212,16 +237,15 @@ func (r *Response) ProtoAtLeast(major, minor int) bool {
// Writes the response (header, body and trailer) in wire format. This method
// consults the following fields of the response:
//
// StatusCode
// ProtoMajor
// ProtoMinor
// Request.Method
// TransferEncoding
// Trailer
// Body
// ContentLength
// Header, values for non-canonical keys will have unpredictable behavior
//
// StatusCode
// ProtoMajor
// ProtoMinor
// Request.Method
// TransferEncoding
// Trailer
// Body
// ContentLength
// Header, values for non-canonical keys will have unpredictable behavior
func (r *Response) Write(w io.Writer) error {

// Status line
Expand Down
5 changes: 2 additions & 3 deletions bfe_modules/mod_redirect/action_url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,16 @@
package mod_redirect

import (
"net/http"
"net/url"
"testing"
)

import (
"github.com/bfenetworks/bfe/bfe_basic"
"github.com/bfenetworks/bfe/bfe_http"
)

func prepareRequest(urlStr string) *bfe_basic.Request {
req := new(bfe_http.Request)
req, _ := bfe_http.NewRequest(http.MethodGet, "http://example.org", nil)
req.URL, _ = url.Parse(urlStr)

freq := bfe_basic.NewRequest(req, nil, nil, nil, nil)
Expand Down
20 changes: 15 additions & 5 deletions bfe_server/http_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,9 @@ import (
"strings"
"sync"
"time"
)

import (
"github.com/baidu/go-lib/gotrack"
"github.com/baidu/go-lib/log"
)

import (
"github.com/bfenetworks/bfe/bfe_basic"
"github.com/bfenetworks/bfe/bfe_bufio"
"github.com/bfenetworks/bfe/bfe_http"
Expand Down Expand Up @@ -535,9 +530,24 @@ func (c *conn) serveRequest(w bfe_http.ResponseWriter, request *bfe_basic.Reques
// [*] Not strictly true: HTTP pipelining. We could let them all process
// in parallel even if their responses need to be serialized.

isClientReqSse := false
if request.IsSse {
isClientReqSse = true
proxyState.SseReqServed.Inc(1)

proxyState.SseReqActive.Inc(1)
defer proxyState.SseReqActive.Dec(1)
}

// serve the request
ret1 := c.server.ReverseProxy.ServeHTTP(w, request)

if !isClientReqSse && request.IsSse {
//sse tag in response header
proxyState.SseReqServed.Inc(1)
//ignore SseReqActive, since it almost has done
}

// if there is some response, count the time
if !request.Stat.ResponseStart.IsZero() {
request.Stat.ResponseEnd = time.Now()
Expand Down
6 changes: 5 additions & 1 deletion bfe_server/proxy_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ProxyState struct {
ErrBkNoBalance *metrics.Counter
ErrBkNoCluster *metrics.Counter
ErrBkBodyProcess *metrics.Counter

// backend side errors
ErrBkConnectBackend *metrics.Counter
ErrBkRequestBackend *metrics.Counter
Expand Down Expand Up @@ -119,6 +119,10 @@ type ProxyState struct {
StreamClientConnActive *metrics.Gauge
WsClientConnActive *metrics.Gauge
WssClientConnActive *metrics.Gauge

// sse
SseReqServed *metrics.Counter
SseReqActive *metrics.Gauge
}

func (s *ProxyState) ClientConnServedInc(proto string, value uint) {
Expand Down
17 changes: 6 additions & 11 deletions bfe_server/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,8 @@ import (
"sync"
"sync/atomic"
"time"
)

import (
"github.com/baidu/go-lib/log"
)

import (
"github.com/bfenetworks/bfe/bfe_bufio"
"github.com/bfenetworks/bfe/bfe_http"
)
Expand Down Expand Up @@ -299,12 +294,12 @@ func (w *response) bodyAllowed() bool {
//
// The Writers are wired together like:
//
// 1. *response (the ResponseWriter) ->
// 2. (*response).w, a *bufio.Writer of bufferBeforeChunkingSize bytes
// 3. chunkWriter.Writer (whose writeHeader finalizes Content-Length/Type)
// and which writes the chunk headers, if needed.
// 4. conn.buf, a bufio.Writer of default (4kB) bytes
// 5. the rwc, the net.Conn.
// 1. *response (the ResponseWriter) ->
// 2. (*response).w, a *bufio.Writer of bufferBeforeChunkingSize bytes
// 3. chunkWriter.Writer (whose writeHeader finalizes Content-Length/Type)
// and which writes the chunk headers, if needed.
// 4. conn.buf, a bufio.Writer of default (4kB) bytes
// 5. the rwc, the net.Conn.
//
// TODO(bradfitz): short-circuit some of the buffering when the
// initial header contains both a Content-Type and Content-Length.
Expand Down
Loading
Loading