1- use crate :: postgres_store:: {
2- VssDbRecord , LIST_KEY_VERSIONS_MAX_PAGE_SIZE , MAX_PUT_REQUEST_ITEM_COUNT ,
3- } ;
1+ use crate :: { VssDbRecord , LIST_KEY_VERSIONS_MAX_PAGE_SIZE , MAX_PUT_REQUEST_ITEM_COUNT } ;
42use api:: error:: VssError ;
53use api:: kv_store:: { KvStore , GLOBAL_VERSION_KEY , INITIAL_RECORD_VERSION } ;
64use api:: types:: {
@@ -10,41 +8,38 @@ use api::types::{
108use async_trait:: async_trait;
119use bytes:: Bytes ;
1210use chrono:: prelude:: Utc ;
13- use std:: collections:: HashMap ;
14- use std:: sync:: Arc ;
15- use tokio:: sync:: RwLock ;
11+ use std:: collections:: BTreeMap ;
12+ use tokio:: sync:: Mutex ;
1613
1714fn build_storage_key ( user_token : & str , store_id : & str , key : & str ) -> String {
1815 format ! ( "{}#{}#{}" , user_token, store_id, key)
1916}
2017
2118/// In-memory implementation of the VSS Store.
2219pub struct InMemoryBackendImpl {
23- store : Arc < RwLock < HashMap < String , VssDbRecord > > > ,
20+ store : Mutex < BTreeMap < String , VssDbRecord > > ,
2421}
2522
2623impl InMemoryBackendImpl {
2724 /// Creates an in-memory instance.
2825 pub fn new ( ) -> Self {
29- Self { store : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) }
26+ Self { store : Mutex :: new ( BTreeMap :: new ( ) ) }
3027 }
3128
3229 fn get_current_global_version (
33- & self , guard : & HashMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
30+ & self , guard : & BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
3431 ) -> i64 {
3532 let global_key = build_storage_key ( user_token, store_id, GLOBAL_VERSION_KEY ) ;
3633 guard. get ( & global_key) . map ( |r| r. version ) . unwrap_or ( 0 )
3734 }
3835}
3936
40- // Validation functions - check if operations can succeed without modifying data
4137fn validate_put_operation (
42- store : & HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
38+ store : & BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
4339) -> Result < ( ) , VssError > {
4440 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
4541
4642 if key_value. version == -1 {
47- // Non-conditional upsert always succeeds
4843 Ok ( ( ) )
4944 } else if key_value. version == 0 {
5045 if store. contains_key ( & key) {
@@ -75,12 +70,11 @@ fn validate_put_operation(
7570}
7671
7772fn validate_delete_operation (
78- store : & HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
73+ store : & BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
7974) -> Result < ( ) , VssError > {
8075 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
8176
8277 if key_value. version == -1 {
83- // Non-conditional delete always succeeds
8478 Ok ( ( ) )
8579 } else {
8680 if let Some ( existing) = store. get ( & key) {
@@ -101,20 +95,25 @@ fn validate_delete_operation(
10195 }
10296}
10397
104- fn execute_non_conditional_upsert (
105- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
98+ fn execute_put_object (
99+ store : & mut BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
100+ key_value : KeyValue ,
106101) {
107102 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
108103 let now = Utc :: now ( ) ;
109104
110105 match store. entry ( key) {
111- std:: collections:: hash_map :: Entry :: Occupied ( mut occ) => {
106+ std:: collections:: btree_map :: Entry :: Occupied ( mut occ) => {
112107 let existing = occ. get_mut ( ) ;
113- existing. version = INITIAL_RECORD_VERSION as i64 ;
108+ existing. version = if key_value. version == -1 {
109+ INITIAL_RECORD_VERSION as i64
110+ } else {
111+ existing. version . saturating_add ( 1 )
112+ } ;
114113 existing. value = key_value. value . to_vec ( ) ;
115114 existing. last_updated_at = now;
116115 } ,
117- std:: collections:: hash_map :: Entry :: Vacant ( vac) => {
116+ std:: collections:: btree_map :: Entry :: Vacant ( vac) => {
118117 let new_record = VssDbRecord {
119118 user_token : user_token. to_string ( ) ,
120119 store_id : store_id. to_string ( ) ,
@@ -129,51 +128,8 @@ fn execute_non_conditional_upsert(
129128 }
130129}
131130
132- fn execute_conditional_insert (
133- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
134- ) {
135- let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
136- let now = Utc :: now ( ) ;
137-
138- let new_record = VssDbRecord {
139- user_token : user_token. to_string ( ) ,
140- store_id : store_id. to_string ( ) ,
141- key : key_value. key ,
142- value : key_value. value . to_vec ( ) ,
143- version : INITIAL_RECORD_VERSION as i64 ,
144- created_at : now,
145- last_updated_at : now,
146- } ;
147- store. insert ( key, new_record) ;
148- }
149-
150- fn execute_conditional_update (
151- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
152- ) {
153- let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
154- let now = Utc :: now ( ) ;
155-
156- if let Some ( existing) = store. get_mut ( & key) {
157- existing. version = key_value. version . saturating_add ( 1 ) ;
158- existing. value = key_value. value . to_vec ( ) ;
159- existing. last_updated_at = now;
160- }
161- }
162-
163- fn execute_put_object (
164- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
165- ) {
166- if key_value. version == -1 {
167- execute_non_conditional_upsert ( store, user_token, store_id, key_value) ;
168- } else if key_value. version == 0 {
169- execute_conditional_insert ( store, user_token, store_id, key_value) ;
170- } else {
171- execute_conditional_update ( store, user_token, store_id, key_value) ;
172- }
173- }
174-
175131fn execute_delete_object (
176- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
132+ store : & mut BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
177133 key_value : & KeyValue ,
178134) {
179135 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
@@ -185,8 +141,21 @@ impl KvStore for InMemoryBackendImpl {
185141 async fn get (
186142 & self , user_token : String , request : GetObjectRequest ,
187143 ) -> Result < GetObjectResponse , VssError > {
144+ if request. key == GLOBAL_VERSION_KEY {
145+ let guard = self . store . lock ( ) . await ;
146+ let global_version =
147+ self . get_current_global_version ( & guard, & user_token, & request. store_id ) ;
148+ return Ok ( GetObjectResponse {
149+ value : Some ( KeyValue {
150+ key : GLOBAL_VERSION_KEY . to_string ( ) ,
151+ value : Bytes :: new ( ) ,
152+ version : global_version,
153+ } ) ,
154+ } ) ;
155+ }
156+
188157 let key = build_storage_key ( & user_token, & request. store_id , & request. key ) ;
189- let guard = self . store . read ( ) . await ;
158+ let guard = self . store . lock ( ) . await ;
190159
191160 if let Some ( record) = guard. get ( & key) {
192161 Ok ( GetObjectResponse {
@@ -196,14 +165,6 @@ impl KvStore for InMemoryBackendImpl {
196165 version : record. version ,
197166 } ) ,
198167 } )
199- } else if request. key == GLOBAL_VERSION_KEY {
200- Ok ( GetObjectResponse {
201- value : Some ( KeyValue {
202- key : GLOBAL_VERSION_KEY . to_string ( ) ,
203- value : Bytes :: new ( ) ,
204- version : 0 ,
205- } ) ,
206- } )
207168 } else {
208169 Err ( VssError :: NoSuchKeyError ( "Requested key not found." . to_string ( ) ) )
209170 }
@@ -220,8 +181,8 @@ impl KvStore for InMemoryBackendImpl {
220181 ) ) ) ;
221182 }
222183
223- let store_id = request. store_id . clone ( ) ;
224- let mut guard = self . store . write ( ) . await ;
184+ let store_id = request. store_id ;
185+ let mut guard = self . store . lock ( ) . await ;
225186
226187 if let Some ( version) = request. global_version {
227188 validate_put_operation (
@@ -267,8 +228,8 @@ impl KvStore for InMemoryBackendImpl {
267228 VssError :: InvalidRequestError ( "key_value missing in DeleteObjectRequest" . to_string ( ) )
268229 } ) ?;
269230
270- let store_id = request. store_id . clone ( ) ;
271- let mut guard = self . store . write ( ) . await ;
231+ let store_id = request. store_id ;
232+ let mut guard = self . store . lock ( ) . await ;
272233
273234 execute_delete_object ( & mut guard, & user_token, & store_id, & key_value) ;
274235
@@ -279,70 +240,42 @@ impl KvStore for InMemoryBackendImpl {
279240 & self , user_token : String , request : ListKeyVersionsRequest ,
280241 ) -> Result < ListKeyVersionsResponse , VssError > {
281242 let store_id = request. store_id ;
282- let key_prefix = request. key_prefix . unwrap_or ( "" . to_string ( ) ) ;
283- let page_token_option = request. page_token ;
284- let page_size = request. page_size . unwrap_or ( i32:: MAX ) ;
243+ let key_prefix = request. key_prefix . clone ( ) . unwrap_or ( "" . to_string ( ) ) ;
244+ let page_size = request. page_size . unwrap_or ( LIST_KEY_VERSIONS_MAX_PAGE_SIZE ) ;
285245 let limit = std:: cmp:: min ( page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE ) as usize ;
286246
287- let ( keys_with_versions , global_version ) = {
288- let guard = self . store . read ( ) . await ;
247+ let offset : usize =
248+ request . page_token . as_ref ( ) . and_then ( |s| s . parse :: < usize > ( ) . ok ( ) ) . unwrap_or ( 0 ) ;
289249
290- let mut global_version: Option < i64 > = None ;
291- if page_token_option. is_none ( ) {
292- global_version =
293- Some ( self . get_current_global_version ( & guard, & user_token, & store_id) ) ;
294- }
250+ let guard = self . store . lock ( ) . await ;
295251
296- let storage_prefix = format ! ( "{}#{}#" , user_token, store_id) ;
297- let mut temp: Vec < ( String , i64 ) > = Vec :: new ( ) ;
298-
299- for ( storage_key, r) in guard. iter ( ) {
300- if !storage_key. starts_with ( & storage_prefix) {
301- continue ;
302- }
303- let key = & storage_key[ storage_prefix. len ( ) ..] ;
304- if key == GLOBAL_VERSION_KEY {
305- continue ;
306- }
307- if !key_prefix. is_empty ( ) && !key. starts_with ( & key_prefix) {
308- continue ;
309- }
310- temp. push ( ( key. to_string ( ) , r. version ) ) ;
311- }
312-
313- ( temp, global_version)
314- } ;
315-
316- let mut keys_with_versions = keys_with_versions;
317- keys_with_versions. sort_by ( |a, b| a. 0 . cmp ( & b. 0 ) ) ;
318-
319- let start_idx = if page_token_option. is_none ( ) {
320- 0
321- } else if page_token_option. as_deref ( ) == Some ( "" ) {
322- keys_with_versions. len ( )
252+ let global_version = if offset == 0 {
253+ Some ( self . get_current_global_version ( & guard, & user_token, & store_id) )
323254 } else {
324- let token = page_token_option. as_deref ( ) . unwrap ( ) ;
325- keys_with_versions
326- . iter ( )
327- . position ( |( k, _) | k. as_str ( ) > token)
328- . unwrap_or ( keys_with_versions. len ( ) )
255+ None
329256 } ;
330257
331- let page_items: Vec < KeyValue > = keys_with_versions
258+ let storage_prefix = build_storage_key ( & user_token, & store_id, & key_prefix) ;
259+ let global_key = build_storage_key ( & user_token, & store_id, GLOBAL_VERSION_KEY ) ;
260+ let prefix_len = format ! ( "{}#{}#" , user_token, store_id) . len ( ) ;
261+ let page_items: Vec < KeyValue > = guard
332262 . iter ( )
333- . skip ( start_idx)
263+ . filter ( |( storage_key, _) | {
264+ storage_key. starts_with ( & storage_prefix) && * storage_key != & global_key
265+ } )
266+ . skip ( offset)
334267 . take ( limit)
335- . map ( |( key, version) | KeyValue {
336- key : key. clone ( ) ,
337- value : Bytes :: new ( ) ,
338- version : * version,
268+ . map ( |( storage_key, record) | {
269+ let key = & storage_key[ prefix_len..] ;
270+ KeyValue { key : key. to_string ( ) , value : Bytes :: new ( ) , version : record. version }
339271 } )
340272 . collect ( ) ;
341273
274+ let next_offset = offset + page_items. len ( ) ;
342275 let next_page_token = if page_items. is_empty ( ) {
343276 Some ( "" . to_string ( ) )
344277 } else {
345- page_items . last ( ) . map ( |kv| kv . key . clone ( ) )
278+ Some ( next_offset . to_string ( ) )
346279 } ;
347280
348281 Ok ( ListKeyVersionsResponse { key_versions : page_items, next_page_token, global_version } )
0 commit comments