Skip to content

Commit aefd20f

Browse files
authored
Merge pull request #25 from bcmi-labs/concurrent_features
Concurrent features
2 parents 72b0eef + e2bb2ef commit aefd20f

File tree

7 files changed

+252
-87
lines changed

7 files changed

+252
-87
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#pragma once
2+
#ifndef RPCLITE_DECODER_TESTER_H
3+
#define RPCLITE_DECODER_TESTER_H
4+
5+
class DecoderTester {
6+
7+
RpcDecoder<>& decoder;
8+
9+
public:
10+
11+
DecoderTester(RpcDecoder<>& _d): decoder(_d){}
12+
13+
void crop_bytes(size_t size, size_t offset){
14+
decoder.consume(size, offset);
15+
}
16+
17+
void print_raw_buf(){
18+
19+
Serial.print("Decoder raw buffer content: ");
20+
21+
for (size_t i = 0; i < decoder._bytes_stored; i++) {
22+
23+
Serial.print(decoder._raw_buffer[i], HEX);
24+
Serial.print(" ");
25+
}
26+
Serial.println("");
27+
}
28+
29+
};
30+
31+
#endif // RPCLITE_DECODER_TESTER_H

examples/decoder_tests/decoder_tests.ino

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <Arduino_RPClite.h>
22
#include "DummyTransport.h"
3+
#include "decoder_tester.h"
34

45
// Shorthand
56
MsgPack::Packer packer;
@@ -36,6 +37,35 @@ void runDecoderTest(const char* label) {
3637
Serial.println("-- Done --\n");
3738
}
3839

40+
void runDecoderConsumeTest(const char* label, size_t second_packet_sz) {
41+
Serial.println(label);
42+
43+
print_buf();
44+
DummyTransport dummy_transport(packer.data(), packer.size());
45+
RpcDecoder<> decoder(dummy_transport);
46+
47+
DecoderTester dt(decoder);
48+
49+
while (!decoder.packet_incoming()) {
50+
Serial.println("Packet not ready");
51+
decoder.decode();
52+
delay(50);
53+
}
54+
55+
size_t pack_size = decoder.get_packet_size();
56+
Serial.print("1st Packet size: ");
57+
Serial.println(pack_size);
58+
59+
Serial.print("Consuming 2nd packet of given size: ");
60+
Serial.println(second_packet_sz);
61+
62+
dt.crop_bytes(second_packet_sz, pack_size);
63+
64+
dt.print_raw_buf();
65+
66+
Serial.println("-- Done --\n");
67+
}
68+
3969
void testNestedArrayRequest() {
4070
packer.clear();
4171
MsgPack::arr_size_t outer_arr(3);
@@ -120,6 +150,9 @@ void testMultipleRpcPackets() {
120150
packer.serialize(req_sz, 0, 2, "echo", par_sz, "Hello", true);
121151

122152
runDecoderTest("== Test: Multiple RPCs in Buffer ==");
153+
154+
runDecoderConsumeTest("== Test: Mid-buffer consume ==", 5);
155+
123156
}
124157

125158
// Binary parameter (e.g., binary blob)
@@ -170,6 +203,8 @@ void testCombinedComplexBuffer() {
170203

171204
void setup() {
172205
Serial.begin(115200);
206+
while(!Serial);
207+
173208
delay(1000);
174209
Serial.println("=== RPC Decoder Nested Tests ===");
175210

src/client.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
class RPCClient {
99
RpcDecoder<>* decoder = nullptr;
10-
uint32_t _waiting_msg_id;
1110

1211
public:
1312
RpcError lastError;
@@ -29,36 +28,39 @@ class RPCClient {
2928
template<typename RType, typename... Args>
3029
bool call(const MsgPack::str_t method, RType& result, Args&&... args) {
3130

32-
if(!send_rpc(method, std::forward<Args>(args)...)) {
31+
uint32_t msg_id_wait;
32+
33+
if(!send_rpc(method, msg_id_wait, std::forward<Args>(args)...)) {
3334
lastError.code = GENERIC_ERR;
3435
lastError.traceback = "Failed to send RPC call";
3536
return false;
3637
}
3738

3839
// blocking call
39-
while (!get_response(result)){
40+
while (!get_response(msg_id_wait, result)){
41+
//delay(1);
4042
}
4143

4244
return (lastError.code == NO_ERR);
4345

4446
}
4547

4648
template<typename... Args>
47-
bool send_rpc(const MsgPack::str_t method, Args&&... args) {
49+
bool send_rpc(const MsgPack::str_t method, uint32_t& wait_id, Args&&... args) {
4850
uint32_t msg_id;
4951
if (decoder->send_call(CALL_MSG, method, msg_id, std::forward<Args>(args)...)) {
50-
_waiting_msg_id = msg_id;
52+
wait_id = msg_id;
5153
return true;
5254
}
5355
return false;
5456
}
5557

5658
template<typename RType>
57-
bool get_response(RType& result) {
59+
bool get_response(const uint32_t wait_id, RType& result) {
5860
RpcError tmp_error;
5961
decoder->decode();
6062

61-
if (decoder->get_response(_waiting_msg_id, result, tmp_error)) {
63+
if (decoder->get_response(wait_id, result, tmp_error)) {
6264
lastError.code = tmp_error.code;
6365
lastError.traceback = tmp_error.traceback;
6466
return true;

src/decoder.h

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,56 @@ class RpcDecoder {
8282
return send(reinterpret_cast<const uint8_t*>(packer.data()), packer.size()) == packer.size();
8383
}
8484

85+
MsgPack::str_t fetch_rpc_method(){
86+
87+
if (!packet_incoming()){return "";}
88+
89+
if (_packet_type != CALL_MSG && _packet_type != NOTIFY_MSG) {
90+
return ""; // No RPC
91+
}
92+
93+
MsgPack::Unpacker unpacker;
94+
95+
unpacker.clear();
96+
if (!unpacker.feed(_raw_buffer, _packet_size)) { // feed should not fail at this point
97+
consume(_packet_size);
98+
reset_packet();
99+
return "";
100+
};
101+
102+
int msg_type;
103+
int msg_id;
104+
MsgPack::str_t method;
105+
MsgPack::arr_size_t req_size;
106+
107+
if (!unpacker.deserialize(req_size, msg_type)) {
108+
consume(_packet_size);
109+
reset_packet();
110+
return ""; // Header not unpackable
111+
}
112+
113+
if (msg_type == CALL_MSG && req_size.size() == REQUEST_SIZE) {
114+
if (!unpacker.deserialize(msg_id, method)) {
115+
consume(_packet_size);
116+
reset_packet();
117+
return ""; // Method not unpackable
118+
}
119+
} else if (msg_type == NOTIFY_MSG && req_size.size() == NOTIFY_SIZE) {
120+
if (!unpacker.deserialize(method)) {
121+
consume(_packet_size);
122+
reset_packet();
123+
return ""; // Method not unpackable
124+
}
125+
} else {
126+
consume(_packet_size);
127+
reset_packet();
128+
return ""; // Invalid request size/type
129+
}
130+
131+
return method;
132+
133+
}
134+
85135
size_t get_request(uint8_t* buffer, size_t buffer_size) {
86136

87137
if (_packet_type != CALL_MSG && _packet_type != NOTIFY_MSG) {
@@ -151,6 +201,8 @@ class RpcDecoder {
151201

152202
inline size_t size() const {return _bytes_stored;}
153203

204+
friend class DecoderTester;
205+
154206
private:
155207
ITransport& _transport;
156208
uint8_t _raw_buffer[BufferSize];
@@ -197,22 +249,19 @@ class RpcDecoder {
197249
_packet_size = 0;
198250
}
199251

200-
size_t consume(size_t size) {
201-
202-
if (size > _bytes_stored) return 0;
203-
204-
const size_t remaining_bytes = _bytes_stored - size;
205-
206-
// Shift remaining data forward (manual memmove for compatibility)
207-
for (size_t i = 0; i < remaining_bytes; i++) {
208-
_raw_buffer[i] = _raw_buffer[size + i];
209-
}
210-
211-
_bytes_stored = remaining_bytes;
212-
213-
return size;
252+
size_t consume(size_t size, size_t offset = 0) {
253+
// Boundary checks
254+
if (offset + size > _bytes_stored || size == 0) return 0;
255+
256+
size_t remaining_bytes = _bytes_stored - size;
257+
for (size_t i=offset; i<remaining_bytes; i++){
258+
_raw_buffer[i] = _raw_buffer[i+size];
214259
}
215260

261+
_bytes_stored = remaining_bytes;
262+
return size;
263+
}
264+
216265
};
217266

218267
#endif

src/dispatcher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class RpcFunctionDispatcher {
3434
return false;
3535
}
3636

37-
bool hasTag(MsgPack::str_t name,MsgPack::str_t tag) const {
37+
bool hasTag(MsgPack::str_t name, MsgPack::str_t tag) const {
3838
for (size_t i = 0; i < _count; ++i) {
3939
if (_entries[i].name == name && _entries[i].tag == tag) {
4040
return true;

src/request.h

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#ifndef RPCLITE_REQUEST_H
2+
#define RPCLITE_REQUEST_H
3+
4+
#define DEFAULT_RPC_BUFFER_SIZE 256
5+
6+
7+
#include "rpclite_utils.h"
8+
9+
template<size_t BufferSize = DEFAULT_RPC_BUFFER_SIZE>
10+
class RPCRequest {
11+
12+
public:
13+
uint8_t buffer[BufferSize];
14+
size_t size = 0;
15+
int type = NO_MSG;
16+
uint32_t msg_id = 0;
17+
MsgPack::str_t method;
18+
MsgPack::Packer packer;
19+
MsgPack::Unpacker unpacker;
20+
21+
// void print(){
22+
23+
// Serial.print("internal buffer ");
24+
// for (size_t i=0; i<size; i++){
25+
// Serial.print(buffer[i], HEX);
26+
// }
27+
// Serial.println("");
28+
// }
29+
30+
size_t get_buffer_size() const {
31+
return BufferSize;
32+
}
33+
34+
bool unpack_request_headers(){
35+
if (size == 0) return false;
36+
37+
unpacker.clear();
38+
if (!unpacker.feed(buffer, size)) return false;
39+
40+
int msg_type;
41+
uint32_t req_id;
42+
MsgPack::str_t req_method;
43+
MsgPack::arr_size_t req_size;
44+
45+
if (!unpacker.deserialize(req_size, msg_type)) {
46+
return false; // Header not unpackable
47+
}
48+
49+
if (msg_type == CALL_MSG && req_size.size() == REQUEST_SIZE) {
50+
if (!unpacker.deserialize(req_id, req_method)) {
51+
return false; // Method not unpackable
52+
}
53+
} else if (msg_type == NOTIFY_MSG && req_size.size() == NOTIFY_SIZE) {
54+
if (!unpacker.deserialize(req_method)) {
55+
return false; // Method not unpackable
56+
}
57+
} else {
58+
return false; // Invalid request size/type
59+
}
60+
61+
method = req_method;
62+
type = msg_type;
63+
msg_id = req_id;
64+
65+
return true;
66+
67+
}
68+
69+
void pack_response_headers(){
70+
packer.clear();
71+
MsgPack::arr_size_t resp_size(RESPONSE_SIZE);
72+
if (type == CALL_MSG) packer.serialize(resp_size, RESP_MSG, msg_id);
73+
}
74+
75+
void reset(){
76+
size = 0;
77+
type = NO_MSG;
78+
msg_id = 0;
79+
method = "";
80+
unpacker.clear();
81+
packer.clear();
82+
}
83+
84+
};
85+
86+
#endif RPCLITE_REQUEST_H

0 commit comments

Comments
 (0)