Skip to content

Implement AsyncNextMessageDirect (fixes #150)#153

Open
MaxMaeder wants to merge 18 commits into
talostrading:masterfrom
MaxMaeder:master
Open

Implement AsyncNextMessageDirect (fixes #150)#153
MaxMaeder wants to merge 18 commits into
talostrading:masterfrom
MaxMaeder:master

Conversation

@MaxMaeder
Copy link
Copy Markdown
Contributor

I've implemented AsyncNextMessageDirect and FrameAssembler as defined in #150, plus tests for both.

The one issue I ran into is that the websocket stream and codec are both designed to only store one frame in memory at a time. I tried to rewrite this so we could buffer an entire message's worth of frames (so it would be zero-copy), but I kept running into issues as this was a pretty complicated refactor.

The solution I came up with was:

  1. If it's a single frame message, just return the reference to the payload (this is zero copy)
  2. If the message is multiple frames, copy the intermediate frames to an internal buffer

This obviously is not a perfect solution. I would be happy to continue trying to implement a true zero-copy method, but I would probably need some help or guidance.

@MaxMaeder MaxMaeder requested a review from sergiu128 as a code owner January 8, 2025 21:50
Comment thread codec/websocket/stream.go Outdated
// - an error occurs when reading/decoding the message bytes from the underlying stream
// - the payload of the message is successfully read
func (s *Stream) AsyncNextMessageDirect(callback AsyncMessageDirectCallback) {
s.AsyncFlush(func(err error) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is needed as AsyncNextFrame flushes already before reading the next frame. Am I missing something here?

Comment thread codec/websocket/stream.go Outdated

// asyncNextMessageDirectFirstFrame reads the first data frame.
// If it is FIN, we do single-frame zero-copy. Otherwise, we start gathering frames.
func (s *Stream) asyncNextMessageDirectFirstFrame(cb AsyncMessageDirectCallback) {
Copy link
Copy Markdown
Collaborator

@sergiu128 sergiu128 Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first idea is good! I think here we can reduce the amount of code to around half here.

When dealing with fragmented reads/writes or in general with any operation that involves multiple underlying calls, try to always think what is the information that you need on each underlying call to complete the full operation. In some cases the information is non-existent, which makes the call stateless. In other cases, the information is at least some set of variables, which makes the call stateful.

For example, AsyncNextMessage is a stateful operation - its state is baked into its arguments: continuation, readBytes, b. Another example are the reads/writes in async_adapter. There you introduced the state as struct which is a bit more explicit.

AsyncNextMessageDirect is a stateful operation. The state can be explicit (a struct, like in async_adapter) and is most likely comprised of:

  • [][]parts or [][]payloads which gets updated on each call - it's resulting length will be at least 1
  • totalSoFar which is the rolling sum of all the read payloads so far. We need this to ensure we respect the max message size, as you've already done. Putting this one into the operation's "state" saves having to iterate over all read parts in asyncNextMessageDirectFragments. Instead, you can updated it on each call, so "state change" and then check that it's within bounds.

By introducing an explicit state, you'll be able to collapse asyncNextMessageDirectFragments and asyncNextMessageDirectFirstFrame into a single function.

Comment thread codec/websocket/stream.go Outdated
}

// If message is made of a multiple frames, we must copy each frame's payload
// This is because existing codec/ws stream designed around only holding on to one frame at a time
Copy link
Copy Markdown
Collaborator

@sergiu128 sergiu128 Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nicely noted here: having the codec operate on a single frame allocated at init time prevents us from making truly "zero-copy" reads.

I think the only option we have here is to allow the FrameCodec to hold multiple frames at once. The codec connection in codec.go only deals with Encode and Decode so we're free to do whatever we want with the codec itself. In the end, we need to have a way to "modify" the codec while reading messages in stream.go. This modification can be part of the AsyncNextMessageDirect's state update. I'm thinking of the following, although this might not be the best:

  • introduce a method ReserveFrame in codec that ensures the currently read frame won't be overwritten in the next decode call. This method will allocate a new frame to decode into, either with NewFrame or with some custom allocator
    • one idea is to have a single []byte slice for all frames, but this is an optimization is for later. We can size this to be at least maxMessageSize + the size of say 10 frame headers.
  • we call ReserveFrame after each fragment that's a continuation
  • after we read the last frame, we invoke the user's callback (passing it each frame's payloads, or a frame assembler), and then we can call codec.ReleaseFrames to force the codec to reset its state before the next non-direct/direct read.

I think we should first focus on making the mechanism work i.e. create a correct interaction between stream.go and frame.go that allows us to store multiple frames throughout a read, as outline above. This is just one solution, there's probably something better so you'd need to tinker a bit here.

After that we can look at how to best keep multiple frames in codec.go. We don't want to allocate on each fragmented direct read, and it would be ideal if all the frames are close to one another in memory.

@MaxMaeder
Copy link
Copy Markdown
Contributor Author

MaxMaeder commented Jan 18, 2025

Hi Sergiu,

First, thanks for you patience as I learn more about how Sonic works.

I've made some changes following from your review, let me know what you think.

First, I modified frame_codec.go to add ReserveFrame/ReleaseFrames. However I implemented this slightly differently to how you indicated. If you look in the frame codec, decodeFrame is initialized with an allocation of a frame using NewFrame(). But the codec immediately replaces this allocation by making decodeFrame reference memory in the underlying ByteBuffer when doing a decode. I think then that this allocation was originally done just to avoid nil-pointer edge cases if methods in the codec were called before a decode.

I'm bringing this up because my interpretation of your idea with ReserveFrame was to essentially swap out the region of memory we're going to read the next frame into and therefore be able to store multiple frames at once. Since FrameCodec doesn't actually own any memory, this wouldn't work. My understanding is that FrameCodec.Decode() just figures out what slice of the ByteBuffer we need to hold a reference to the next complete frame.

My idea then was to just allocate a messageBuffer in FrameCodec of maxMessageSize bytes. Then, ReserveFrame will just append the message payload from the latest decoded frame into this buffer (by doing a memcpy from the slice of the ByteBuffer). Then, we can simply add another method FrameCodec.MessagePayload which returns a reference to this buffer containing the message payload. ReleaseFrames resets this buffer for the next message.

With this, our logic in stream.go becomes much simpler, and although we are doing a memcpy for all the payload components, we are not doing any expensive dynamic allocation and we can avoid something like FrameAssembler altogether.

Let me know what you think of this solution, or if I am still misunderstanding how the WebSocket implementation works. I've also still left in FrameAssembler for the time being, if this solution is inadequate.

Another optimization we can add if this is a good solution is to just return a slice of the ByteBuffer holding the frame's payload directly if a message is a single frame, avoiding the memcpy (similar to my original solution).

@MaxMaeder MaxMaeder requested a review from sergiu128 January 18, 2025 20:14
@sergiu128
Copy link
Copy Markdown
Collaborator

sergiu128 commented Jan 22, 2025

Thank you for your changes! Sorry for slow replies, had a lot of work to tie up as I'm off for a couple of days. I'll reply faster from now on :)

This change is good, I think we're going in the right direction. stream.go is indeed much simpler here. And you observed right: the frame codec just figures out what part of the ByteBuffer we need to hold a reference to. This fully slipped my mind while I was typing out my ideas above, long time since I've dealt with ByteBuffer, sorry! The line that gives it away is c.decodeFrame = src.Data()[:readSoFar] in here.

I still think we should explore getting rid of memory copies all together in stream.go in this new read function. You are right that dynamic allocations are on average more costly, but memory copies can also blow out the same way. Usually the cost of a memory copy is linear to the amount of bytes copied, so we'd get bit on big messages.

Maybe we can use the ByteBuffer to our advantage. There is this method Save which, if invoked, will keep the bytes from the read area for later consumption. I think we will still be able to use the PrepareRead/Commit/Consume semantics without disturbing any saved bytes in this codec.

By doing this we isolate the memory copies purely to the ByteBuffer. So we'd cut down from a memory copy in stream.go + a memory copy in byte buffer (to clear the read message through either Consumer or Discard), to just a memory copy in the byte buffer. The next little project will then be to make the byte buffer zero-copy (there's a start on that here).

I'd start with playing with the ByteBuffer. I think it's pretty well documented. See how we can use this Save function and how we can adapt the codec to make use of that (and also how to reference the saved frames, probably through some sort of ReserveFrame function).

I won't have any work for the next couple of days and the next few weeks are quite relaxed, so I'll reply promptly (and also code on sonic a little bit). Feel free to post any findings or questions here, no need to wait a lot for a good/perfect solution.

@MaxMaeder
Copy link
Copy Markdown
Contributor Author

Thanks for bearing with me while I learn how all of the components of the library work.

ByteBuffer is super nice for tasks like this! I rewrote how AsyncNextMessageDirect works to take advantage of the save area, let me know what you think!

@sergiu128
Copy link
Copy Markdown
Collaborator

Great! Was traveling the past few days, taking a look tomorrow!

Comment thread codec/websocket/stream.go Outdated
return
}

// AsyncNextMessageDirect reads the next websocket message asynchronously, providing a buffer
Copy link
Copy Markdown
Collaborator

@sergiu128 sergiu128 Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: here it should be smth like: "providing one more slices representing each payload fragment of a message. If the next message is unfragmented, then a single byte slice is provided, representing the message's full payload. If the message is fragmented into n fragments, then n payloads are provided. Callers can reconstruct the full message payload by a copying out the provided slices into a continuous memory chunk, or by using the FrameAssembler. "

Comment thread codec/websocket/stream.go Outdated
// - an error occurs when reading/decoding the message bytes from the underlying stream
// - the payload of the message is successfully read
//
// The returned buffer is reused between messages, so its contents must be copied out
Copy link
Copy Markdown
Collaborator

@sergiu128 sergiu128 Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: should be: "the returned slices are only valid in the provided callback".

Comment thread codec/websocket/frame_codec.go Outdated
// ReserveFrame saves a frame so that we can refer to it after we decode the next frame
func (c *FrameCodec) ReserveFrame() {
if c.decodeFrame == nil {
return
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can only arrive here if Decode is not invoked beforehand right? If so, this seems like a breached invariant, so we can panic.

Comment thread codec/websocket/frame_codec.go Outdated
// ReservedFramePayloads returns references to the payloads of our reserved frames
func (c *FrameCodec) ReservedFramePayloads() [][]byte {
numFrames := len(c.messageFrames)
payloads := make([][]byte, 0, numFrames)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get rid of this inline allocation. Can just pre-allocate some fixed number beforehand, like we do for messageFrames

}

// ReleaseFrames releases our reserved frames
func (c *FrameCodec) ReleaseFrames() {
Copy link
Copy Markdown
Collaborator

@sergiu128 sergiu128 Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are correct in that you don't need decodeReset = true in this function but I would make a note of it.
The DiscardAll ensures proper cleanup after reading a fragmented message.

Do note though that there's now a difference in lifetimes of the frames between the direct and non direct versions. For the non-direct case, a frame is valid until the next AsyncNextMesssage invocation (which calls resetDecode). For the direct case in this PR, frames are valid only within the AsyncNextMessageDirect callback and not after, as we call DiscardAll immediately after the callback. That means we can't store references to the frame payloads after AsyncNextMessageDirect returns, and before the next call.

Do you think we can prolong the lifetime of the frames in the direct case to be valid between two AsyncNextMessageDirect calls and not only within the callback? Not a huge requirement especially if it complicates the code, but would be nice to have :) . I think the tricky case will be to support interleaved control messages here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree I should document decodeReset here, I know it was tricky for me to figure out how all this worked the first time so documenting it for the future will make this easier to come back to.

I think the simplest way to maintain lifetimes between calls is to just call ReleaseFrames() at the start of AsyncNextMessageDirect vs where we just have it now (after the callback is invoked). Then if AsyncNextMessage, AsyncNextFrame, NextFrame etc are called in between AsyncNextMessageDirect calls, the save area in the buffer remains as-is, and it is reset at the start of next AsyncNextMessageDirect.

This would mean we're holding on to some memory longer than we technically need to, but it keeps everything pretty simple (no need to reset save area in AsyncNextMessage or similar since it's only used with AsyncNextMessageDirect.

What do you think?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the simplest solution, I like it!

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it also make sense to call ReleaseFrames in AsyncNextMessage? Unsure, what do you think?

Copy link
Copy Markdown
Contributor Author

@MaxMaeder MaxMaeder Feb 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be simplest to just release in AsyncNextMessageDirect, since the save area isn't used anywhere else and it keeps the logic related to it structurally close together.

Copy link
Copy Markdown
Collaborator

@sergiu128 sergiu128 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very clean and neat solution, I really like it! Thank you! I think we're very close to a prod-ready solution here. I left some inline comments. Besides those, I think we need a couple more tests:

  • test with max message size breached by first fragment
  • test with max message size breached by a fragment following the first one
  • test with nth fragment where n > 1 without continuation set, to ensure ErrExpectedContinuation propagates correctly
  • test with multiple AsyncNextMessageDirect done in sequence
  • test with AsyncNextMessageDirect, AsyncNextMessage (so normal, non zero-copy), AsyncNextMessageDirect and so on..
  • test with the websocket client validator
    • install docker make sure it's running
    • call ./run.sh in a terminal
    • modify client.go to use AsyncNextMessageDirect
    • call go run client.go
    • the tests results should be generated in some path of the autobahn folder, can see them by just opening index.html

It would also be nice to have some example code using AsyncNextMessageDirect and the FrameAssembler. Perhaps by modifying the binance client, or by implementing a simple client for another exchange. AFAIK Deribit does send fragmented messages

}

// Reassemble concatenates all slices into a single new []byte buffer.
func (fa *FrameAssembler) Reassemble() []byte {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also add a function ReassembleInto(b []byte) here, so callers can avoid any allocations if they want to.

@MaxMaeder
Copy link
Copy Markdown
Contributor Author

I think I addressed the implementation changes you requested, can you double check before I move on to writing the additional tests?

Thanks, Max

@MaxMaeder MaxMaeder requested a review from sergiu128 February 6, 2025 15:27
func (fa *FrameAssembler) ReassembleInto(b []byte) {
offset := 0
for _, p := range fa.parts {
copy(b[offset:], p)
Copy link
Copy Markdown
Collaborator

@sergiu128 sergiu128 Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might panic if b cannot fit the current part (out of bounds). If b is too small then we can just return false from this function. If the reassembly is successful, we can return true. Odds are the callers will already size b to the max messsage size configured for the stream, but better be safe :)


// ReservedFramePayloads returns references to the payloads of our reserved frames
func (c *FrameCodec) ReservedFramePayloads() [][]byte {
return c.messagePayloads;
Copy link
Copy Markdown
Collaborator

@sergiu128 sergiu128 Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice optimization here to remove the for loop and reference the payloads directly after each decode in ReserveFrame

@sergiu128
Copy link
Copy Markdown
Collaborator

I think I addressed the implementation changes you requested, can you double check before I move on to writing the additional tests?

Thanks, Max

Changes look good, can move on with testing, thank you! For the test where we combine AsyncNextMessageDirect and AsyncNextMessage, we should also assert that the ByteBuffer save and read areas are in order. This depends on what you come up with here. Essentially we just want to make sure that all data in the buffer eventually gets flushed out. So nothing stays in forever. I think in the current solution you can have the read and save area occupied at the same time which is fine. We just want to make sure they don't grow indefinitely.

Comment thread tests/autobahn/client.go Outdated
Comment thread codec/websocket/stream.go
Comment thread examples/derabit/main.go
@MaxMaeder
Copy link
Copy Markdown
Contributor Author

I've made the requested changes, let me know what you think!

@MaxMaeder MaxMaeder requested a review from sergiu128 February 20, 2025 15:27
@sergiu128
Copy link
Copy Markdown
Collaborator

Going to check it over the weekend, sry was swamped with work

Copy link
Copy Markdown
Collaborator

@sergiu128 sergiu128 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All looks good, thank you for implementing the changes and all the work. 3 things left:

  • derabit should be deribit
  • TestClientMixedAsyncNextMessageDirectAndAsyncNextMessage is failing for me, i think buf needs to be sliced to a smaller length cuz the payloads look good but i'll let you take a look
  • pls rebase with master, I introduced some changes around setting the MaxMessageSize for a stream pre-handshake

Otherwise the code looks really good and this will get merged! Very curious to see the gains from this in production, I've ran some tests and I see that almost all exchanges Talos integrates with have a 1 fragment messages.

Comment thread examples/derabit/main.go
@@ -0,0 +1,103 @@
package main
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be renamed to "deribit" from "derabit"

Comment thread examples/derabit/main.go Outdated
}

if len(pl) != 1 {
panic("Derabit should only be sending single frame messages.")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deribit*

@MaxMaeder MaxMaeder requested a review from sergiu128 March 30, 2025 16:43
@MaxMaeder
Copy link
Copy Markdown
Contributor Author

Should all be fixed!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants