@@ -15,6 +15,8 @@ import (
1515
1616 "github.com/imroc/req/v3/internal/transport"
1717 "github.com/quic-go/quic-go"
18+ "github.com/quic-go/quic-go/http3/qlog"
19+ "github.com/quic-go/quic-go/qlogwriter"
1820 "github.com/quic-go/quic-go/quicvarint"
1921
2022 "github.com/quic-go/qpack"
@@ -24,6 +26,9 @@ const maxQuarterStreamID = 1<<60 - 1
2426
2527var errGoAway = errors .New ("connection in graceful shutdown" )
2628
29+ // invalidStreamID is a stream ID that is invalid. The first valid stream ID in QUIC is 0.
30+ const invalidStreamID = quic .StreamID (- 1 )
31+
2732// Conn is an HTTP/3 connection.
2833// It has all methods from the quic.Conn expect for AcceptStream, AcceptUniStream,
2934// SendDatagram and ReceiveDatagram.
@@ -33,8 +38,8 @@ type Conn struct {
3338
3439 ctx context.Context
3540
36- perspective Perspective
37- logger * slog.Logger
41+ isServer bool
42+ logger * slog.Logger
3843
3944 enableDatagrams bool
4045
@@ -50,22 +55,28 @@ type Conn struct {
5055
5156 idleTimeout time.Duration
5257 idleTimer * time.Timer
58+
59+ qlogger qlogwriter.Recorder
5360}
5461
5562func newConnection (
5663 ctx context.Context ,
5764 quicConn * quic.Conn ,
5865 enableDatagrams bool ,
59- perspective Perspective ,
66+ isServer bool ,
6067 logger * slog.Logger ,
6168 idleTimeout time.Duration ,
6269 options * transport.Options ,
6370) * Conn {
71+ var qlogger qlogwriter.Recorder
72+ if qlogTrace := quicConn .QlogTrace (); qlogTrace != nil && qlogTrace .SupportsSchemas (qlog .EventSchema ) {
73+ qlogger = qlogTrace .AddProducer ()
74+ }
6475 c := & Conn {
6576 ctx : ctx ,
6677 conn : quicConn ,
6778 Options : options ,
68- perspective : perspective ,
79+ isServer : isServer ,
6980 logger : logger ,
7081 idleTimeout : idleTimeout ,
7182 enableDatagrams : enableDatagrams ,
@@ -74,6 +85,7 @@ func newConnection(
7485 streams : make (map [quic.StreamID ]* stateTrackingStream ),
7586 maxStreamID : InvalidStreamID ,
7687 lastStreamID : InvalidStreamID ,
88+ qlogger : qlogger ,
7789 }
7890 if idleTimeout > 0 {
7991 c .idleTimer = time .AfterFunc (idleTimeout , c .onIdleTimer )
@@ -127,7 +139,7 @@ func (c *Conn) clearStream(id quic.StreamID) {
127139 }
128140 // The server is performing a graceful shutdown.
129141 // If no more streams are remaining, close the connection.
130- if c .maxStreamID != InvalidStreamID {
142+ if c .maxStreamID != invalidStreamID {
131143 if len (c .streams ) == 0 {
132144 c .CloseWithError (quic .ApplicationErrorCode (ErrCodeNoError ), "" )
133145 }
@@ -144,15 +156,15 @@ func (c *Conn) openRequestStream(
144156 c .streamMx .Lock ()
145157 maxStreamID := c .maxStreamID
146158 var nextStreamID quic.StreamID
147- if c .lastStreamID == InvalidStreamID {
159+ if c .lastStreamID == invalidStreamID {
148160 nextStreamID = 0
149161 } else {
150162 nextStreamID = c .lastStreamID + 4
151163 }
152164 c .streamMx .Unlock ()
153165 // Streams with stream ID equal to or greater than the stream ID carried in the GOAWAY frame
154166 // will be rejected, see section 5.2 of RFC 9114.
155- if maxStreamID != InvalidStreamID && nextStreamID >= maxStreamID {
167+ if maxStreamID != invalidStreamID && nextStreamID >= maxStreamID {
156168 return nil , errGoAway
157169 }
158170
@@ -170,14 +182,14 @@ func (c *Conn) openRequestStream(
170182 return newRequestStream (
171183 ctx ,
172184 c .Options ,
173- newStream (hstr , c , trace , func (r io.Reader , l uint64 ) error {
174- hdr , err := c .decodeTrailers (r , l , maxHeaderBytes )
185+ newStream (hstr , c , trace , func (r io.Reader , hf * headersFrame ) error {
186+ hdr , err := c .decodeTrailers (r , str . StreamID (), hf , maxHeaderBytes )
175187 if err != nil {
176188 return err
177189 }
178190 rsp .Trailer = hdr
179191 return nil
180- }),
192+ }, c . qlogger ),
181193 requestWriter ,
182194 reqDone ,
183195 c .decoder ,
@@ -187,19 +199,24 @@ func (c *Conn) openRequestStream(
187199 ), nil
188200}
189201
190- func (c * Conn ) decodeTrailers (r io.Reader , l , maxHeaderBytes uint64 ) (http.Header , error ) {
191- if l > maxHeaderBytes {
192- return nil , fmt .Errorf ("HEADERS frame too large: %d bytes (max: %d)" , l , maxHeaderBytes )
202+ func (c * Conn ) decodeTrailers (r io.Reader , streamID quic.StreamID , hf * headersFrame , maxHeaderBytes uint64 ) (http.Header , error ) {
203+ if hf .Length > maxHeaderBytes {
204+ maybeQlogInvalidHeadersFrame (c .qlogger , streamID , hf .Length )
205+ return nil , fmt .Errorf ("HEADERS frame too large: %d bytes (max: %d)" , hf .Length , maxHeaderBytes )
193206 }
194207
195- b := make ([]byte , l )
208+ b := make ([]byte , hf . Length )
196209 if _ , err := io .ReadFull (r , b ); err != nil {
197210 return nil , err
198211 }
199212 fields , err := c .decoder .DecodeFull (b )
200213 if err != nil {
214+ maybeQlogInvalidHeadersFrame (c .qlogger , streamID , hf .Length )
201215 return nil , err
202216 }
217+ if c .qlogger != nil {
218+ qlogParsedHeadersFrame (c .qlogger , streamID , hf , fields )
219+ }
203220 return parseTrailers (fields )
204221}
205222
@@ -254,13 +271,12 @@ func (c *Conn) handleUnidirectionalStreams(hijack func(StreamType, quic.Connecti
254271 // Our QPACK implementation doesn't use the dynamic table yet.
255272 return
256273 case streamTypePushStream :
257- switch c .perspective {
258- case PerspectiveClient :
259- // we never increased the Push ID, so we don't expect any push streams
260- c .CloseWithError (quic .ApplicationErrorCode (ErrCodeIDError ), "" )
261- case PerspectiveServer :
274+ if c .isServer {
262275 // only the server can push
263276 c .CloseWithError (quic .ApplicationErrorCode (ErrCodeStreamCreationError ), "" )
277+ } else {
278+ // we never increased the Push ID, so we don't expect any push streams
279+ c .CloseWithError (quic .ApplicationErrorCode (ErrCodeIDError ), "" )
264280 }
265281 return
266282 default :
@@ -288,8 +304,8 @@ func (c *Conn) handleUnidirectionalStreams(hijack func(StreamType, quic.Connecti
288304}
289305
290306func (c * Conn ) handleControlStream (str * quic.ReceiveStream ) {
291- fp := & frameParser {closeConn : c .conn .CloseWithError , r : str }
292- f , err := fp .ParseNext ()
307+ fp := & frameParser {closeConn : c .conn .CloseWithError , r : str , streamID : str . StreamID () }
308+ f , err := fp .ParseNext (c . qlogger )
293309 if err != nil {
294310 var serr * quic.StreamError
295311 if err == io .EOF || errors .As (err , & serr ) {
@@ -328,12 +344,12 @@ func (c *Conn) handleControlStream(str *quic.ReceiveStream) {
328344 }
329345
330346 // we don't support server push, hence we don't expect any GOAWAY frames from the client
331- if c .perspective == PerspectiveServer {
347+ if c .isServer {
332348 return
333349 }
334350
335351 for {
336- f , err := fp .ParseNext ()
352+ f , err := fp .ParseNext (c . qlogger )
337353 if err != nil {
338354 var serr * quic.StreamError
339355 if err == io .EOF || errors .As (err , & serr ) {
@@ -356,7 +372,7 @@ func (c *Conn) handleControlStream(str *quic.ReceiveStream) {
356372 return
357373 }
358374 c .streamMx .Lock ()
359- if c .maxStreamID != InvalidStreamID && goaway .StreamID > c .maxStreamID {
375+ if c .maxStreamID != invalidStreamID && goaway .StreamID > c .maxStreamID {
360376 c .streamMx .Unlock ()
361377 c .conn .CloseWithError (quic .ApplicationErrorCode (ErrCodeIDError ), "" )
362378 return
@@ -376,8 +392,18 @@ func (c *Conn) handleControlStream(str *quic.ReceiveStream) {
376392func (c * Conn ) sendDatagram (streamID quic.StreamID , b []byte ) error {
377393 // TODO: this creates a lot of garbage and an additional copy
378394 data := make ([]byte , 0 , len (b )+ 8 )
395+ quarterStreamID := uint64 (streamID / 4 )
379396 data = quicvarint .Append (data , uint64 (streamID / 4 ))
380397 data = append (data , b ... )
398+ if c .qlogger != nil {
399+ c .qlogger .RecordEvent (qlog.DatagramCreated {
400+ QuaterStreamID : quarterStreamID ,
401+ Raw : qlog.RawInfo {
402+ Length : len (data ),
403+ PayloadLength : len (b ),
404+ },
405+ })
406+ }
381407 return c .conn .SendDatagram (data )
382408}
383409
@@ -392,6 +418,15 @@ func (c *Conn) receiveDatagrams() error {
392418 c .CloseWithError (quic .ApplicationErrorCode (ErrCodeDatagramError ), "" )
393419 return fmt .Errorf ("could not read quarter stream id: %w" , err )
394420 }
421+ if c .qlogger != nil {
422+ c .qlogger .RecordEvent (qlog.DatagramParsed {
423+ QuaterStreamID : quarterStreamID ,
424+ Raw : qlog.RawInfo {
425+ Length : len (b ),
426+ PayloadLength : len (b ) - n ,
427+ },
428+ })
429+ }
395430 if quarterStreamID > maxQuarterStreamID {
396431 c .CloseWithError (quic .ApplicationErrorCode (ErrCodeDatagramError ), "" )
397432 return fmt .Errorf ("invalid quarter stream id: %w" , err )
0 commit comments