This repository was archived by the owner on Dec 17, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcache_simulator.py
More file actions
executable file
·172 lines (128 loc) · 7.14 KB
/
cache_simulator.py
File metadata and controls
executable file
·172 lines (128 loc) · 7.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from common import eprint
from cache import LRUCache, LFUCache, RRCache, DirectCache
# My Cache Simulator which controls all the caches in the hierarchy
class CacheSimulator():
    """Simulates a multi-level cache hierarchy described by a JSON config.

    The hierarchy order is implicit in the order of the "caches" list:
    index 0 is treated as the L1 cache, later entries as lower levels.
    """

    # Initialises cache hierarchy
    def __init__(self, cache_hierarchy_details, expected_mem_addr_bit_len):
        """Build the cache hierarchy from its JSON description.

        Args:
            cache_hierarchy_details: dict with a "caches" list; each entry
                describes one cache (name, size, line_size, kind and, for
                associative caches, an optional replacement_policy).
            expected_mem_addr_bit_len: bit width of the memory addresses
                that will later be fed to step().

        Raises:
            ValueError: if a cache object could not be created.
        """
        # Create cache hierarchy from JSON description
        caches_details = cache_hierarchy_details["caches"]
        allowed_replacement_policies = ["rr", "lru", "lfu"]
        self.expected_mem_addr_bit_len = expected_mem_addr_bit_len
        self.cache_list = []
        # Create each cache -> list order implicitly encodes hierarchy
        for cache_details in caches_details:
            replacement_policy = self._check_replacement_policy(cache_details, allowed_replacement_policies)
            new_cache = self._cache_factory_method(replacement_policy, cache_details, expected_mem_addr_bit_len)
            if new_cache is None:
                # BUGFIX: a bare `return` here previously left a half-built
                # object (no main_memory_accesses attribute) that would crash
                # later; fail loudly at construction time instead.
                eprint("[Error] Cache Simulation Aborted")
                raise ValueError("could not create cache object")
            self.cache_list.append(new_cache)
        # Performance metric
        self.main_memory_accesses = 0

    # Handles any issues with the replacement policy such as it being omitted or not in the allowed set
    def _check_replacement_policy(self, cache_details, allowed_replacement_policies):
        """Return the validated replacement policy for one cache description.

        Direct-mapped caches have no replacement policy and yield None.
        Associative caches fall back to round robin ("rr") when the policy
        is missing or not in the allowed set.
        """
        replacement_policy = None
        if cache_details["kind"] != "direct":
            # Associative cache replacement policy
            if "replacement_policy" in cache_details:
                replacement_policy = cache_details["replacement_policy"]
                # Check replacement policy is supported
                if replacement_policy not in allowed_replacement_policies:
                    eprint("[Error] Invalid replacement policy - using Round Robin")
                    replacement_policy = "rr"
            else:
                # default to rr
                replacement_policy = "rr"
        return replacement_policy

    # Create cache instance dependent on the caching strategy and replacement policy being used
    def _cache_factory_method(self, replacement_policy, cache_details, expected_mem_addr_bit_len):
        """Instantiate the cache object matching the replacement policy.

        A policy of None means a direct-mapped cache.  Returns None only in
        the defensive (normally unreachable) case of an unknown policy.
        """
        # Least recently used cache
        if replacement_policy == "lru":
            return LRUCache(
                name = cache_details["name"],
                size = cache_details["size"],
                line_size = cache_details["line_size"],
                kind = cache_details["kind"],
                mem_addr_bit_length=expected_mem_addr_bit_len,
            )
        # Least Frequently used
        elif replacement_policy == "lfu":
            return LFUCache(
                name = cache_details["name"],
                size = cache_details["size"],
                line_size = cache_details["line_size"],
                kind = cache_details["kind"],
                mem_addr_bit_length=expected_mem_addr_bit_len,
            )
        elif replacement_policy == "rr": # Round Robin
            return RRCache(
                name = cache_details["name"],
                size = cache_details["size"],
                line_size = cache_details["line_size"],
                kind = cache_details["kind"],
                mem_addr_bit_length=expected_mem_addr_bit_len,
            )
        elif replacement_policy is None:
            return DirectCache( # direct mapped cache
                name = cache_details["name"],
                size = cache_details["size"],
                line_size = cache_details["line_size"],
                kind = cache_details["kind"],
                mem_addr_bit_length=expected_mem_addr_bit_len,
            )
        else:
            eprint("[Error] Issue creating cache object - aborting")
            return None

    """
    I have decided that memory operations which cross one or more cache line boundaries should be split into multiple
    memory accesses which each fit within the cache line of the L1 cache. By doing this, larger caches (l2/3) may contain two accesses
    in the same cache line, leading to improved recovery times from misses.
    """
    def _split_mem_operation(self, mem_addr_int, mem_op_size, cache, mem_operations):
        """Recursively split (mem_addr_int, mem_op_size) at cache-line
        boundaries of `cache`, appending (addr, size) tuples to
        `mem_operations` so each tuple fits in a single cache line.
        """
        # Extract offset value from memory address (stored as an integer) through bitwise operations
        offset_value = int(mem_addr_int & ((1 << cache.offset_length) - 1))
        if cache.line_size < offset_value + mem_op_size: # crossing cache line boundary
            # Extract index and tag values from memory address
            index = int((mem_addr_int >> cache.offset_length) & ((1 << cache.index_length) - 1))
            tag = int((mem_addr_int >> (cache.offset_length + cache.index_length)) & ((1 << cache.tag_length) - 1))
            # Create mem operation using remainder of first cache line
            new_mem_op_size = cache.line_size - offset_value
            mem_operations.append((mem_addr_int, new_mem_op_size))
            # Increase the 'index' and/or 'tag' of memory address to determine the next address for storing remaining data
            if index + 1 == cache.number_of_sets: # If already in final cache set, need to increase tag
                tag = tag + 1
                index = 0
                # check that tag doesn't need to be rolled over to 0, e.g. 1111 -> 0000
                # BUGFIX: was `tag > 2**cache.tag_length`, which missed the
                # exact overflow value (tag == 2**tag_length) and left a stray
                # bit above the tag field in the reconstructed address.
                if tag >= 2**cache.tag_length:
                    tag = 0
            else: # 001 -> 010
                index = index + 1
            # New mem address containing updated tag and index
            new_mem_addr_int = (tag << cache.index_length) | index
            new_mem_addr_int = new_mem_addr_int << cache.offset_length # zero the offset
            # Repeat splitting of memory operation using new cache line and reduced mem_op_size
            self._split_mem_operation(new_mem_addr_int, mem_op_size - new_mem_op_size, cache, mem_operations)
        else:
            # memory operation fits in a single cache line
            mem_operations.append((mem_addr_int, mem_op_size))

    # Resets simulator state for processing trace file
    def reset_simulation(self):
        """Clear the main-memory counter and reset every cache in the list."""
        self.main_memory_accesses = 0
        for cache in self.cache_list:
            cache.reset_simulation()

    # Step through trace file
    def step(self, mem_addr, mem_op_size):
        """Process one trace entry: a hex address string and an access size.

        The access is split against the L1 cache's line size, then each
        resulting operation walks down the hierarchy until it hits; a full
        miss increments main_memory_accesses.
        """
        # Assuming L1 cache is smaller than lower caches (as it normally is) - split memory operation if it crosses cache boundaries
        mem_operations = []
        mem_addr_int = int(mem_addr, 16)
        self._split_mem_operation(mem_addr_int, mem_op_size, self.cache_list[0], mem_operations)
        # for each memory operation
        for (op_addr, op_size) in mem_operations:
            # BUGFIX: initialise before the inner loop so an empty cache_list
            # cannot raise NameError below (and each op starts from a miss).
            is_hit = False
            # check caches
            for cache in self.cache_list:
                is_hit = cache.step(op_addr, op_size)
                if is_hit:
                    break # No need to check lower caches
            # Memory block could not be found -> has been added to caches during search (memory hierarchy basics)
            if not is_hit:
                self.main_memory_accesses = self.main_memory_accesses + 1