88
99using namespace RpcUtils ::detail;
1010
11- #define NO_MSG -1
12- #define CALL_MSG 0
13- #define RESP_MSG 1
14- #define NOTIFY_MSG 2
1511
16- #define REQUEST_SIZE 4
17- #define RESPONSE_SIZE 4
18- #define NOTIFY_SIZE 3
12+
13+ #define MIN_RPC_BYTES 4
1914
2015#define MAX_BUFFER_SIZE 1024
2116#define CHUNK_SIZE 32
@@ -56,118 +51,45 @@ class RpcDecoder {
5651 template <typename RType>
5752 bool get_response (const int msg_id, RType& result, RpcError& error) {
5853
59- if (packet_type ()!=RESP_MSG) return false ;
54+ if (! packet_incoming () || packet_type ()!=RESP_MSG) return false ;
6055
6156 MsgPack::Unpacker unpacker;
57+ unpacker.clear ();
6258
63- size_t bytes_checked = 0 ;
59+ if (!unpacker. feed (_raw_buffer, get_packet_size ())) return false ;
6460
65- while (bytes_checked < _bytes_stored) {
66- bytes_checked++;
67- unpacker.clear ();
68- if (!unpacker.feed (_raw_buffer, bytes_checked)) continue ;
69- MsgPack::arr_size_t resp_size;
70- int resp_type;
71- int resp_id;
72- if (!unpacker.deserialize (resp_size, resp_type, resp_id)) continue ;
73- if (resp_size.size () != RESPONSE_SIZE) continue ;
74- if (resp_type != RESP_MSG) continue ;
75- if (resp_id != msg_id) continue ;
76-
77- MsgPack::object::nil_t nil;
78- if (unpacker.unpackable (nil)){ // No error
79- if (!unpacker.deserialize (nil, result)) continue ;
80- } else { // RPC returned an error
81- if (!unpacker.deserialize (error, nil)) continue ;
82- }
83- consume (bytes_checked);
84- return true ;
85- }
86- return false ;
87- }
88-
89- template <typename RType>
90- bool send_response (const int msg_id, const RpcError& error, const RType& result) {
91- MsgPack::Packer packer;
92- MsgPack::arr_size_t resp_size (RESPONSE_SIZE);
93- MsgPack::object::nil_t nil;
61+ MsgPack::arr_size_t resp_size;
62+ int resp_type;
63+ int resp_id;
9464
95- packer.clear ();
96- packer.serialize (resp_size, RESP_MSG, msg_id);
65+ if (!unpacker.deserialize (resp_size, resp_type, resp_id)) return false ;
66+ if (resp_size.size () != RESPONSE_SIZE) return false ;
67+ if (resp_type != RESP_MSG) return false ;
68+ if (resp_id != msg_id) return false ;
9769
98- if (error.code == NO_ERR){
99- packer.serialize (nil, result);
100- } else {
101- packer.serialize (error, nil);
70+ MsgPack::object::nil_t nil;
71+ if (unpacker.unpackable (nil)){ // No error
72+ if (!unpacker.deserialize (nil, result)) return false ;
73+ } else { // RPC returned an error
74+ if (!unpacker.deserialize (error, nil)) return false ;
10275 }
10376
104- return send (reinterpret_cast <const uint8_t *>(packer.data ()), packer.size ()) == packer.size ();
77+ consume (get_packet_size ());
78+ return true ;
10579
10680 }
10781
108- template <size_t N>
109- void process_requests (RpcFunctionDispatcher<N>& dispatcher) {
110- if (_packet_type!=CALL_MSG && _packet_type!=NOTIFY_MSG) return ;
111-
112- MsgPack::Unpacker unpacker;
113- MsgPack::Packer packer;
114-
115- size_t bytes_checked = 0 ;
116-
117- while (bytes_checked < _bytes_stored) {
118- bytes_checked++;
119- unpacker.clear ();
120- if (!unpacker.feed (_raw_buffer, bytes_checked)) continue ;
82+ bool send_response (const MsgPack::Packer& packer) {
83+ return send (reinterpret_cast <const uint8_t *>(packer.data ()), packer.size ()) == packer.size ();
84+ }
12185
122- int msg_type;
123- int msg_id;
124- MsgPack::str_t method;
125- MsgPack::arr_size_t req_size;
126-
127- if (!unpacker.deserialize (req_size, msg_type)) continue ;
128- // todo HANDLE MALFORMED CLIENT REQ ERRORS
129- if ((req_size.size () == REQUEST_SIZE) && (msg_type == CALL_MSG)){
130- if (!unpacker.deserialize (msg_id, method)) continue ;
131- if (unpacker.size () < REQUEST_SIZE + 1 ) continue ; // there must be at least 5 indices
132- } else if ((req_size.size () == NOTIFY_SIZE) && (msg_type == NOTIFY_MSG)) {
133- if (!unpacker.deserialize (method)) continue ;
134- if (unpacker.size () < NOTIFY_SIZE + 1 ) continue ; // there must be at least 4 indices
135- } else if ((req_size.size () == RESPONSE_SIZE) && (msg_type == RESP_MSG)) { // this should never happen but it's addressed to a client
136- break ;
137- } else {
138- discard_packet ();
139- break ;
140- }
141- // Headers unpacked
142-
143- MsgPack::arr_size_t resp_size (RESPONSE_SIZE);
144- packer.clear ();
145- if (msg_type == CALL_MSG) packer.serialize (resp_size, RESP_MSG, msg_id);
146- size_t headers_size = packer.size ();
147-
148- if (!dispatcher.call (method, unpacker, packer)) {
149- if (packer.size ()==headers_size) {
150- // Call didn't go through bc parameters are not ready yet
151- continue ;
152- } else {
153- // something went wrong the call raised an error or the client issued a malformed request
154- if (msg_type == CALL_MSG) {
155- send (reinterpret_cast <const uint8_t *>(packer.data ()), packer.size ());
156- } // if notification client will never know something went wrong
157- discard_packet (); // agnostic pop
158- break ;
159- }
160- } else {
161- // all is well we can respond and pop the deserialized packet
162- if (msg_type == CALL_MSG){
163- send (reinterpret_cast <const uint8_t *>(packer.data ()), packer.size ());
164- }
165- consume (bytes_checked);
166- break ;
167- }
86+ size_t get_request (uint8_t * buffer, size_t buffer_size) {
16887
88+ if (packet_type () != CALL_MSG && packet_type () != NOTIFY_MSG) {
89+ return 0 ; // No RPC
16990 }
17091
92+ return pop_packet (buffer, buffer_size);
17193 }
17294
17395 void decode (){
@@ -187,52 +109,45 @@ class RpcDecoder {
187109
188110 void parse_packet (){
189111
190- if (packet_incoming () || _bytes_stored < 2 ){return ;}
191-
192- MsgPack::Unpacker unpacker;
193- unpacker.clear ();
194- unpacker.feed (_raw_buffer, 2 );
195-
196- MsgPack::arr_size_t elem_size;
197- int type;
198- if (unpacker.deserialize (elem_size, type)){
199- _packet_type = type;
200- }
201-
202- }
203-
204- // Check if a packet is available
205- inline bool packet_incoming () const { return _packet_type >= CALL_MSG; }
206-
207- int packet_type () const {return _packet_type;}
208-
209- // Get the size of the next packet in the buffer (must be array contained, no other requirements)
210- size_t get_packet_size () {
112+ if (packet_incoming ()){return ;}
211113
212114 size_t bytes_checked = 0 ;
213115 size_t container_size;
116+ int type;
214117 MsgPack::Unpacker unpacker;
215118
216119 while (bytes_checked < _bytes_stored){
217120 bytes_checked++;
218121 unpacker.clear ();
219122 if (!unpacker.feed (_raw_buffer, bytes_checked)) continue ;
220123
221- if (unpackArray (unpacker, container_size)) {
222- return bytes_checked;
124+ if (unpackTypedArray (unpacker, container_size, type)) {
125+
126+ if (type != CALL_MSG && type != RESP_MSG && type != NOTIFY_MSG) {
127+ consume (bytes_checked);
128+ break ; // Not a valid RPC type (could be type=WRONG_MSG)
129+ }
130+
131+ if ((type == CALL_MSG && container_size != REQUEST_SIZE) || (type == RESP_MSG && container_size != RESPONSE_SIZE) || (type == NOTIFY_MSG && container_size != NOTIFY_SIZE)) {
132+ consume (bytes_checked);
133+ break ; // Not a valid RPC format
134+ }
135+
136+ _packet_type = type;
137+ _packet_size = bytes_checked;
223138 } else {
224139 continue ;
225140 }
226141
227142 }
228143
229- return 0 ;
230144 }
231145
232- // Discard the next (array) packet in the buffer, returns the number of bytes consumed.
233- size_t discard_packet () {
234- return consume (get_packet_size ());
235- }
146+ inline bool packet_incoming () const { return _packet_size >= MIN_RPC_BYTES; }
147+
148+ inline int packet_type () const { return _packet_type; }
149+
150+ size_t get_packet_size () const { return _packet_size;}
236151
237152 inline size_t size () const {return _bytes_stored;}
238153
@@ -241,6 +156,7 @@ class RpcDecoder {
241156 uint8_t _raw_buffer[BufferSize];
242157 size_t _bytes_stored = 0 ;
243158 int _packet_type = NO_MSG;
159+ size_t _packet_size = 0 ;
244160 int _msg_id = 0 ;
245161
246162 inline bool buffer_full () const { return _bytes_stored == BufferSize; }
@@ -251,7 +167,27 @@ class RpcDecoder {
251167 return _transport.write (data, size);
252168 }
253169
254- // Consume the first 'size' bytes of the buffer, shifting remaining data forward
170+ size_t pop_packet (uint8_t * buffer, size_t buffer_size) {
171+
172+ if (!packet_incoming ()) return 0 ;
173+
174+ size_t packet_size = get_packet_size ();
175+ if (packet_size > buffer_size) return 0 ;
176+
177+ for (size_t i = 0 ; i < packet_size; i++) {
178+ buffer[i] = _raw_buffer[i];
179+ }
180+
181+ reset_packet ();
182+ return consume (packet_size);
183+ }
184+
185+
186+ void reset_packet () {
187+ _packet_type = NO_MSG;
188+ _packet_size = 0 ;
189+ }
190+
255191 size_t consume (size_t size) {
256192
257193 if (size > _bytes_stored) return 0 ;
@@ -264,7 +200,6 @@ class RpcDecoder {
264200 }
265201
266202 _bytes_stored = remaining_bytes;
267- _packet_type = NO_MSG;
268203
269204 return size;
270205 }
0 commit comments