@@ -375,109 +375,6 @@ impl NodeStore {
375375 let _: ( ) = pipe. query_async ( & mut con) . await ?;
376376 Ok ( ( ) )
377377 }
378- /// Count how many nodes are already in hash format
379- pub async fn count_non_hash_format_nodes ( & self ) -> Result < usize > {
380- let mut con = self . redis . client . get_multiplexed_async_connection ( ) . await ?;
381- let addresses: Vec < String > = con. smembers ( ORCHESTRATOR_NODE_INDEX ) . await ?;
382-
383- let mut non_hash_count = 0 ;
384-
385- // Process in batches for better performance
386- const BATCH_SIZE : usize = 100 ;
387-
388- for chunk in addresses. chunks ( BATCH_SIZE ) {
389- // Check types of all keys in this batch
390- let mut type_pipe = redis:: pipe ( ) ;
391- let keys: Vec < String > = chunk
392- . iter ( )
393- . map ( |addr| format ! ( "{}:{}" , ORCHESTRATOR_BASE_KEY , addr) )
394- . collect ( ) ;
395-
396- for key in & keys {
397- type_pipe. cmd ( "TYPE" ) . arg ( key) ;
398- }
399-
400- let types: Vec < String > = type_pipe. query_async ( & mut con) . await ?;
401-
402- // Count non-hash format keys (including "string" and "none" types)
403- for key_type in types. iter ( ) {
404- if key_type != "hash" && key_type != "none" {
405- non_hash_count += 1 ;
406- }
407- }
408- }
409-
410- Ok ( non_hash_count)
411- }
412-
413- /// One-time migration from JSON to hash format
414- /// Run this once to convert all existing nodes
415- pub async fn migrate_json_to_hash ( & self ) -> Result < ( usize , usize ) > {
416- let mut con = self . redis . client . get_multiplexed_async_connection ( ) . await ?;
417- let addresses: Vec < String > = con. smembers ( ORCHESTRATOR_NODE_INDEX ) . await ?;
418-
419- let mut migrated = 0 ;
420-
421- // Get count of nodes not yet in hash format
422- let non_hash_count = self . count_non_hash_format_nodes ( ) . await ?;
423-
424- // Process in batches for better performance
425- const BATCH_SIZE : usize = 100 ;
426-
427- for chunk in addresses. chunks ( BATCH_SIZE ) {
428- // First, check types of all keys in this batch
429- let mut type_pipe = redis:: pipe ( ) ;
430- let keys: Vec < String > = chunk
431- . iter ( )
432- . map ( |addr| format ! ( "{}:{}" , ORCHESTRATOR_BASE_KEY , addr) )
433- . collect ( ) ;
434-
435- for key in & keys {
436- type_pipe. cmd ( "TYPE" ) . arg ( key) ;
437- }
438-
439- let types: Vec < String > = type_pipe. query_async ( & mut con) . await ?;
440-
441- // Build migration pipeline for string keys
442- let mut migration_pipe = redis:: pipe ( ) ;
443- migration_pipe. atomic ( ) ;
444- let mut batch_migrated = 0 ;
445-
446- for ( i, key_type) in types. iter ( ) . enumerate ( ) {
447- if key_type == "string" {
448- let key = & keys[ i] ;
449-
450- // Get the JSON value
451- let value: Option < String > = con. get ( key) . await ?;
452-
453- if let Some ( json_str) = value {
454- if let Ok ( node) = serde_json:: from_str :: < OrchestratorNode > ( & json_str) {
455- let fields = Self :: node_to_hash_fields ( & node) ?;
456-
457- // Delete old string key and create new hash atomically
458- migration_pipe. del ( key) ;
459- migration_pipe. hset_multiple ( key, & fields) ;
460-
461- batch_migrated += 1 ;
462- }
463- }
464- }
465- }
466-
467- // Execute migration for this batch
468- if batch_migrated > 0 {
469- let _: ( ) = migration_pipe. query_async ( & mut con) . await ?;
470- migrated += batch_migrated;
471- }
472- }
473-
474- let already_hash = addresses. len ( ) - non_hash_count;
475- info ! (
476- "Migration complete: {} nodes migrated, {} already in hash format" ,
477- migrated, already_hash
478- ) ;
479- Ok ( ( migrated, already_hash) )
480- }
481378}
482379
483380#[ cfg( test) ]
0 commit comments