
Commit e2bfaf5

New change with channels
1 parent f59a4a2 commit e2bfaf5

File tree

1 file changed: 103 additions, 87 deletions


pkg/oramExecutor/oramExecutor.go

Lines changed: 103 additions & 87 deletions
@@ -28,25 +28,28 @@ type Operation struct {
     Index int
 }

-type ResponseTracker struct {
-    Values    []string
-    Count     int32 // Use atomic operations
-    Completed chan struct{}
+type KVPair struct {
+    channelId string
+    Key       string
+    Value     string
+}
+
+type responseChannel struct {
+    m       *sync.RWMutex
+    channel chan KVPair
 }

 type MyOram struct {
     executor.UnimplementedExecutorServer
     o *ORAM

-    opMutex sync.Mutex
-    opCond  *sync.Cond
-    opQueue []Operation
+    batchSize int

-    trackerMutex sync.Mutex
-    trackers     map[uint64]*ResponseTracker
+    channelMap    map[string]responseChannel
+    requestNumber atomic.Int64
+    channelLock   sync.RWMutex

-    requestID uint64 // atomic
-    batchSize int
+    oramExecutorChannel chan *KVPair
 }

 type tempBlock struct {
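
Editor's note: the new types replace the per-request ResponseTracker with a per-request response channel, registered in a shared map under a string id and protected by an RWMutex. Below is a minimal, self-contained sketch of that idiom for readers unfamiliar with it; every name in it (work, replies, jobs, etc.) is illustrative, not from this repository.

package main

import (
    "fmt"
    "sync"
)

// work carries a key plus the id of the response channel to answer on.
type work struct {
    replyId string
    key     string
}

func main() {
    var (
        mu      sync.RWMutex
        replies = map[string]chan string{} // id -> per-request response channel
    )
    jobs := make(chan work, 4) // shared work channel, like oramExecutorChannel

    // Worker: look up the caller's channel by id and deliver the result.
    go func() {
        for w := range jobs {
            mu.RLock()
            ch := replies[w.replyId]
            mu.RUnlock()
            ch <- "value-for-" + w.key
        }
    }()

    // Caller: register a response channel, submit work, wait, unregister.
    reply := make(chan string, 1)
    mu.Lock()
    replies["req-1"] = reply
    mu.Unlock()

    jobs <- work{replyId: "req-1", key: "a"}
    fmt.Println(<-reply) // value-for-a

    mu.Lock()
    delete(replies, "req-1")
    mu.Unlock()
}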
@@ -63,99 +66,108 @@ func (e *MyOram) ExecuteBatch(ctx context.Context, req *executor.RequestBatchORA
         return nil, fmt.Errorf("keys and values length mismatch")
     }

-    // Generate unique request ID
-    id := atomic.AddUint64(&e.requestID, 1)
-    numOps := len(req.Keys)
+    reqNum := e.requestNumber.Add(1) // New id for this client/batch channel
+
+    recv_resp := make([]KVPair, 0, len(req.Keys)) // This will store completed key value pairs
+
+    channelId := fmt.Sprintf("%d-%d", req.RequestId, reqNum)
+    localRespChannel := make(chan KVPair, len(req.Keys))

-    // Setup response tracker
-    tracker := &ResponseTracker{
-        Values:    make([]string, numOps),
-        Count:     0,
-        Completed: make(chan struct{}),
+    e.channelLock.Lock() // Add channel to global map
+    e.channelMap[channelId] = responseChannel{
+        m:       &sync.RWMutex{},
+        channel: localRespChannel,
+    }
+    e.channelLock.Unlock()
+
+    sent := 0
+    for i, key := range req.Keys {
+        value := req.Values[i]
+        kv := &KVPair{
+            channelId: channelId,
+            Key:       key,
+            Value:     value,
+        }
+        // Block if the channel is full
+        sent++
+        e.oramExecutorChannel <- kv
     }

-    // Register tracker
-    e.trackerMutex.Lock()
-    e.trackers[id] = tracker
-    e.trackerMutex.Unlock()
-
-    // Queue operations
-    e.opMutex.Lock()
-    for i := 0; i < numOps; i++ {
-        e.opQueue = append(e.opQueue, Operation{
-            RequestID: id,
-            Key:       req.Keys[i],
-            Value:     req.Values[i],
-            Index:     i,
-        })
+    // Finished adding keys to ORAM channel
+
+    // Now wait for responses
+    for i := 0; i < len(req.Keys); i++ {
+        item := <-localRespChannel
+        recv_resp = append(recv_resp, item)
     }
-    e.opCond.Signal() // Notify batch processor
-    e.opMutex.Unlock()

-    // Wait for completion
-    <-tracker.Completed
+    close(localRespChannel)
+
+    e.channelLock.Lock()
+    delete(e.channelMap, channelId)
+    e.channelLock.Unlock()
+
+    sendKeys := make([]string, 0, len(req.Keys))
+    sendVal := make([]string, 0, len(req.Keys))
+
+    for _, v := range recv_resp {
+        sendKeys = append(sendKeys, v.Key)
+        sendVal = append(sendVal, v.Value)
+    }

     // Return response with original request ID
     return &executor.RespondBatchORAM{
         RequestId: req.RequestId,
-        Keys:      req.Keys,
-        Values:    tracker.Values,
+        Keys:      sendKeys,
+        Values:    sendVal,
     }, nil
 }

 func (e *MyOram) processBatches() {
     for {
-        e.opMutex.Lock()
-        // Wait for operations to process
-        for len(e.opQueue) == 0 {
-            e.opCond.Wait()
-        }

-        // Determine batch size
-        batchSize := e.batchSize
-        if len(e.opQueue) < batchSize {
-            e.opCond.Wait()
-        }
-        batchOps := e.opQueue[:batchSize]
-        e.opQueue = e.opQueue[batchSize:]
-        e.opMutex.Unlock()
-
-        // Prepare ORAM batch request
-        var requestList []Request
-        for _, op := range batchOps {
-            requestList = append(requestList, Request{
-                Key:   op.Key,
-                Value: op.Value,
-            })
-        }
+        if len(e.oramExecutorChannel) >= e.batchSize {
+            var requestList []Request

-        // Execute ORAM batch
-        returnValues, err := e.o.Batching(requestList, batchSize)
-        if err != nil {
-            // Handle error (e.g., log and continue)
-            fmt.Printf("ORAM batch error: %v\n", err)
-            continue
-        }
+            var chanIds []string

-        // Distribute responses to trackers
-        e.trackerMutex.Lock()
-        for i, op := range batchOps {
-            tracker, exists := e.trackers[op.RequestID]
-            if !exists {
-                continue // Tracker already removed
+            for i := 0; i < e.batchSize; i++ {
+                op := <-e.oramExecutorChannel // Read from channel
+
+                chanIds = append(chanIds, op.channelId)
+
+                requestList = append(requestList, Request{
+                    Key:   op.Key,
+                    Value: op.Value,
+                })
+            }
+            // Execute ORAM batch
+            returnValues, err := e.o.Batching(requestList, e.batchSize)
+            if err != nil {
+                // Handle error (e.g., log and continue)
+                fmt.Printf("ORAM batch error: %v\n", err)
+                continue
             }

-            // Update value atomically
-            tracker.Values[op.Index] = returnValues[i]
-            count := atomic.AddInt32(&tracker.Count, 1)
+            channelCache := make(map[string]chan KVPair, e.batchSize)
+
+            e.channelLock.RLock()
+            for _, v := range chanIds {
+
+                channelCache[v] = e.channelMap[v].channel
+
+            }
+            e.channelLock.RUnlock()

-            // Check if all responses received
-            if int(count) == len(tracker.Values) {
-                close(tracker.Completed)
-                delete(e.trackers, op.RequestID)
+            for i := 0; i < e.batchSize; i++ {
+                newKVPair := KVPair{
+                    Key:   requestList[i].Key,
+                    Value: returnValues[i],
+                }
+                responseChannel := channelCache[chanIds[i]]
+                responseChannel <- newKVPair
             }
         }
-        e.trackerMutex.Unlock()
     }
 }
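
Editor's note on the new loop's design: len(e.oramExecutorChannel) is only a snapshot of queued work, and when fewer than batchSize items are waiting, the bare for loop iterates without blocking, so this worker appears to spin rather than sleep. A common alternative is to block on the first receive and then drain without blocking. The sketch below shows that shape under the assumption that partial batches are acceptable (the commit instead waits for a full batch); nextBatch is a hypothetical helper, not code from this repository.

package main

import "fmt"

// nextBatch blocks for the first item, then drains up to batchSize more
// without blocking; it may return a partial batch (unlike the commit's
// loop, which only proceeds once a full batch is queued).
func nextBatch(ch <-chan string, batchSize int) []string {
    batch := []string{<-ch} // block until at least one item arrives
    for len(batch) < batchSize {
        select {
        case item := <-ch:
            batch = append(batch, item)
        default:
            return batch // queue drained; ship what we have
        }
    }
    return batch
}

func main() {
    ch := make(chan string, 8)
    for _, k := range []string{"a", "b", "c"} {
        ch <- k
    }
    fmt.Println(nextBatch(ch, 5)) // [a b c]
}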

@@ -187,6 +199,7 @@ func NewORAM(LogCapacity, Z, StashSize int, redisAddr string, tracefile string,
         // Load the Stashmap and Keymap into memory
         // Allow redis to update state using dump.rdb
         oram.loadSnapshotMaps()
+        fmt.Println("ORAM snapshot loaded successfully!")
     } else {
         // Clear the Redis database to ensure a fresh start
         if err := client.FlushDB(); err != nil {
@@ -247,12 +260,15 @@
     }

     myOram := &MyOram{
-        o:         oram,
-        batchSize: 60, // Set from config or constant
-        trackers:  make(map[uint64]*ResponseTracker),
-        opQueue:   make([]Operation, 0),
+        o:                   oram,
+        batchSize:           60, // Set from config or constant
+        channelMap:          make(map[string]responseChannel),
+        channelLock:         sync.RWMutex{},
+        oramExecutorChannel: make(chan *KVPair),
     }
-    myOram.opCond = sync.NewCond(&myOram.opMutex)
+
+    myOram.oramExecutorChannel = make(chan *KVPair, 100000)
+
     go myOram.processBatches() // Start batch processing

     return myOram, nil
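
Editor's note on the constructor: the unbuffered make(chan *KVPair) assigned in the struct literal is immediately superseded by the buffered make(chan *KVPair, 100000), so the first allocation is dead and could be dropped. The buffering is what lets ExecuteBatch's send (e.oramExecutorChannel <- kv) proceed without a receiver being ready, as the standalone sketch below illustrates.

package main

import "fmt"

func main() {
    // A buffered channel accepts sends with no receiver ready, up to
    // its capacity; an unbuffered channel would block on the first send.
    ch := make(chan int, 2)
    ch <- 1 // does not block: buffer has room
    ch <- 2 // does not block: buffer is now full

    fmt.Println(len(ch), cap(ch)) // 2 2

    select {
    case ch <- 3:
        fmt.Println("sent")
    default:
        fmt.Println("would block: buffer full") // this branch runs
    }
}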
