Implement AsyncNextMessageDirect (fixes #150)#153
Conversation
| // - an error occurs when reading/decoding the message bytes from the underlying stream | ||
| // - the payload of the message is successfully read | ||
| func (s *Stream) AsyncNextMessageDirect(callback AsyncMessageDirectCallback) { | ||
| s.AsyncFlush(func(err error) { |
There was a problem hiding this comment.
I don't think this is needed as AsyncNextFrame flushes already before reading the next frame. Am I missing something here?
|
|
||
| // asyncNextMessageDirectFirstFrame reads the first data frame. | ||
| // If it is FIN, we do single-frame zero-copy. Otherwise, we start gathering frames. | ||
| func (s *Stream) asyncNextMessageDirectFirstFrame(cb AsyncMessageDirectCallback) { |
There was a problem hiding this comment.
The first idea is good! I think here we can reduce the amount of code to around half here.
When dealing with fragmented reads/writes or in general with any operation that involves multiple underlying calls, try to always think what is the information that you need on each underlying call to complete the full operation. In some cases the information is non-existent, which makes the call stateless. In other cases, the information is at least some set of variables, which makes the call stateful.
For example, AsyncNextMessage is a stateful operation - its state is baked into its arguments: continuation, readBytes, b. Another example are the reads/writes in async_adapter. There you introduced the state as struct which is a bit more explicit.
AsyncNextMessageDirect is a stateful operation. The state can be explicit (a struct, like in async_adapter) and is most likely comprised of:
[][]partsor[][]payloadswhich gets updated on each call - it's resulting length will be at least 1totalSoFarwhich is the rolling sum of all the read payloads so far. We need this to ensure we respect the max message size, as you've already done. Putting this one into the operation's "state" saves having to iterate over all read parts inasyncNextMessageDirectFragments. Instead, you can updated it on each call, so "state change" and then check that it's within bounds.
By introducing an explicit state, you'll be able to collapse asyncNextMessageDirectFragments and asyncNextMessageDirectFirstFrame into a single function.
| } | ||
|
|
||
| // If message is made of a multiple frames, we must copy each frame's payload | ||
| // This is because existing codec/ws stream designed around only holding on to one frame at a time |
There was a problem hiding this comment.
Nicely noted here: having the codec operate on a single frame allocated at init time prevents us from making truly "zero-copy" reads.
I think the only option we have here is to allow the FrameCodec to hold multiple frames at once. The codec connection in codec.go only deals with Encode and Decode so we're free to do whatever we want with the codec itself. In the end, we need to have a way to "modify" the codec while reading messages in stream.go. This modification can be part of the AsyncNextMessageDirect's state update. I'm thinking of the following, although this might not be the best:
- introduce a method
ReserveFramein codec that ensures the currently read frame won't be overwritten in the next decode call. This method will allocate a new frame to decode into, either withNewFrameor with some custom allocator- one idea is to have a single
[]byteslice for all frames, but this is an optimization is for later. We can size this to be at leastmaxMessageSize+ the size of say10frame headers.
- one idea is to have a single
- we call
ReserveFrameafter each fragment that's a continuation - after we read the last frame, we invoke the user's callback (passing it each frame's payloads, or a frame assembler), and then we can call
codec.ReleaseFramesto force the codec to reset its state before the next non-direct/direct read.
I think we should first focus on making the mechanism work i.e. create a correct interaction between stream.go and frame.go that allows us to store multiple frames throughout a read, as outline above. This is just one solution, there's probably something better so you'd need to tinker a bit here.
After that we can look at how to best keep multiple frames in codec.go. We don't want to allocate on each fragmented direct read, and it would be ideal if all the frames are close to one another in memory.
|
Hi Sergiu, First, thanks for you patience as I learn more about how Sonic works. I've made some changes following from your review, let me know what you think. First, I modified I'm bringing this up because my interpretation of your idea with My idea then was to just allocate a messageBuffer in With this, our logic in Let me know what you think of this solution, or if I am still misunderstanding how the WebSocket implementation works. I've also still left in Another optimization we can add if this is a good solution is to just return a slice of the |
|
Thank you for your changes! Sorry for slow replies, had a lot of work to tie up as I'm off for a couple of days. I'll reply faster from now on :) This change is good, I think we're going in the right direction. I still think we should explore getting rid of memory copies all together in Maybe we can use the By doing this we isolate the memory copies purely to the ByteBuffer. So we'd cut down from a memory copy in stream.go + a memory copy in byte buffer (to clear the read message through either I'd start with playing with the ByteBuffer. I think it's pretty well documented. See how we can use this I won't have any work for the next couple of days and the next few weeks are quite relaxed, so I'll reply promptly (and also code on sonic a little bit). Feel free to post any findings or questions here, no need to wait a lot for a good/perfect solution. |
|
Thanks for bearing with me while I learn how all of the components of the library work. ByteBuffer is super nice for tasks like this! I rewrote how |
|
Great! Was traveling the past few days, taking a look tomorrow! |
| return | ||
| } | ||
|
|
||
| // AsyncNextMessageDirect reads the next websocket message asynchronously, providing a buffer |
There was a problem hiding this comment.
nitpick: here it should be smth like: "providing one more slices representing each payload fragment of a message. If the next message is unfragmented, then a single byte slice is provided, representing the message's full payload. If the message is fragmented into n fragments, then n payloads are provided. Callers can reconstruct the full message payload by a copying out the provided slices into a continuous memory chunk, or by using the FrameAssembler. "
| // - an error occurs when reading/decoding the message bytes from the underlying stream | ||
| // - the payload of the message is successfully read | ||
| // | ||
| // The returned buffer is reused between messages, so its contents must be copied out |
There was a problem hiding this comment.
nitpick: should be: "the returned slices are only valid in the provided callback".
| // ReserveFrame saves a frame so that we can refer to it after we decode the next frame | ||
| func (c *FrameCodec) ReserveFrame() { | ||
| if c.decodeFrame == nil { | ||
| return |
There was a problem hiding this comment.
I think we can only arrive here if Decode is not invoked beforehand right? If so, this seems like a breached invariant, so we can panic.
| // ReservedFramePayloads returns references to the payloads of our reserved frames | ||
| func (c *FrameCodec) ReservedFramePayloads() [][]byte { | ||
| numFrames := len(c.messageFrames) | ||
| payloads := make([][]byte, 0, numFrames) |
There was a problem hiding this comment.
We can get rid of this inline allocation. Can just pre-allocate some fixed number beforehand, like we do for messageFrames
| } | ||
|
|
||
| // ReleaseFrames releases our reserved frames | ||
| func (c *FrameCodec) ReleaseFrames() { |
There was a problem hiding this comment.
I think you are correct in that you don't need decodeReset = true in this function but I would make a note of it.
The DiscardAll ensures proper cleanup after reading a fragmented message.
Do note though that there's now a difference in lifetimes of the frames between the direct and non direct versions. For the non-direct case, a frame is valid until the next AsyncNextMesssage invocation (which calls resetDecode). For the direct case in this PR, frames are valid only within the AsyncNextMessageDirect callback and not after, as we call DiscardAll immediately after the callback. That means we can't store references to the frame payloads after AsyncNextMessageDirect returns, and before the next call.
Do you think we can prolong the lifetime of the frames in the direct case to be valid between two AsyncNextMessageDirect calls and not only within the callback? Not a huge requirement especially if it complicates the code, but would be nice to have :) . I think the tricky case will be to support interleaved control messages here.
There was a problem hiding this comment.
I agree I should document decodeReset here, I know it was tricky for me to figure out how all this worked the first time so documenting it for the future will make this easier to come back to.
I think the simplest way to maintain lifetimes between calls is to just call ReleaseFrames() at the start of AsyncNextMessageDirect vs where we just have it now (after the callback is invoked). Then if AsyncNextMessage, AsyncNextFrame, NextFrame etc are called in between AsyncNextMessageDirect calls, the save area in the buffer remains as-is, and it is reset at the start of next AsyncNextMessageDirect.
This would mean we're holding on to some memory longer than we technically need to, but it keeps everything pretty simple (no need to reset save area in AsyncNextMessage or similar since it's only used with AsyncNextMessageDirect.
What do you think?
There was a problem hiding this comment.
I think this is the simplest solution, I like it!
There was a problem hiding this comment.
Would it also make sense to call ReleaseFrames in AsyncNextMessage? Unsure, what do you think?
There was a problem hiding this comment.
I think it would be simplest to just release in AsyncNextMessageDirect, since the save area isn't used anywhere else and it keeps the logic related to it structurally close together.
There was a problem hiding this comment.
Very clean and neat solution, I really like it! Thank you! I think we're very close to a prod-ready solution here. I left some inline comments. Besides those, I think we need a couple more tests:
- test with max message size breached by first fragment
- test with max message size breached by a fragment following the first one
- test with nth fragment where n > 1 without continuation set, to ensure
ErrExpectedContinuationpropagates correctly - test with multiple
AsyncNextMessageDirectdone in sequence - test with
AsyncNextMessageDirect,AsyncNextMessage(so normal, non zero-copy),AsyncNextMessageDirectand so on.. - test with the websocket client validator
- install docker make sure it's running
- call
./run.shin a terminal - modify
client.goto useAsyncNextMessageDirect - call
go run client.go - the tests results should be generated in some path of the
autobahnfolder, can see them by just openingindex.html
It would also be nice to have some example code using AsyncNextMessageDirect and the FrameAssembler. Perhaps by modifying the binance client, or by implementing a simple client for another exchange. AFAIK Deribit does send fragmented messages
| } | ||
|
|
||
| // Reassemble concatenates all slices into a single new []byte buffer. | ||
| func (fa *FrameAssembler) Reassemble() []byte { |
There was a problem hiding this comment.
I would also add a function ReassembleInto(b []byte) here, so callers can avoid any allocations if they want to.
|
I think I addressed the implementation changes you requested, can you double check before I move on to writing the additional tests? Thanks, Max |
| func (fa *FrameAssembler) ReassembleInto(b []byte) { | ||
| offset := 0 | ||
| for _, p := range fa.parts { | ||
| copy(b[offset:], p) |
There was a problem hiding this comment.
this might panic if b cannot fit the current part (out of bounds). If b is too small then we can just return false from this function. If the reassembly is successful, we can return true. Odds are the callers will already size b to the max messsage size configured for the stream, but better be safe :)
|
|
||
| // ReservedFramePayloads returns references to the payloads of our reserved frames | ||
| func (c *FrameCodec) ReservedFramePayloads() [][]byte { | ||
| return c.messagePayloads; |
There was a problem hiding this comment.
nice optimization here to remove the for loop and reference the payloads directly after each decode in ReserveFrame
Changes look good, can move on with testing, thank you! For the test where we combine |
|
I've made the requested changes, let me know what you think! |
|
Going to check it over the weekend, sry was swamped with work |
There was a problem hiding this comment.
All looks good, thank you for implementing the changes and all the work. 3 things left:
- derabit should be deribit
TestClientMixedAsyncNextMessageDirectAndAsyncNextMessageis failing for me, i thinkbufneeds to be sliced to a smaller length cuz the payloads look good but i'll let you take a look- pls rebase with master, I introduced some changes around setting the
MaxMessageSizefor a stream pre-handshake
Otherwise the code looks really good and this will get merged! Very curious to see the gains from this in production, I've ran some tests and I see that almost all exchanges Talos integrates with have a 1 fragment messages.
| @@ -0,0 +1,103 @@ | |||
| package main | |||
There was a problem hiding this comment.
should be renamed to "deribit" from "derabit"
| } | ||
|
|
||
| if len(pl) != 1 { | ||
| panic("Derabit should only be sending single frame messages.") |
|
Should all be fixed! |
I've implemented AsyncNextMessageDirect and FrameAssembler as defined in #150, plus tests for both.
The one issue I ran into is that the websocket stream and codec are both designed to only store one frame in memory at a time. I tried to rewrite this so we could buffer an entire message's worth of frames (so it would be zero-copy), but I kept running into issues as this was a pretty complicated refactor.
The solution I came up with was:
This obviously is not a perfect solution. I would be happy to continue trying to implement a true zero-copy method, but I would probably need some help or guidance.