
GainSight Frontend Python API Documentation

This page documents the frontend Python scripts in gainsight/frontend/, generated with mkdocstrings. Refer to the frontend wiki for a summary of implementation details and usage instructions.


Frontend Script for Accel-Sim Backend

gain_cell_frontend.py

Module for analyzing gain cell memory behavior in GPU workloads.

Provides the GainCellFrontend class to process profiling data and compute write frequencies, retention times, refresh requirements, area, and energy for different gain cell technologies.

Contains helper functions for JSON serialization and command line execution.

GainCellFrontend

Frontend class for analyzing gain cell memory behavior in GPU workloads.

Processes profiling data and computes write frequencies, retention times, refresh requirements, area, and energy for different gain cell technologies. Use the run method to execute the full analysis pipeline and return the results as a JSON-serializable dict.

Parameters:

    profile_results_path (str): Path to the profiling results CSV file. Required.
    simulation (bool): Whether the profiling results are from simulation. Defaults to True.
    sample (bool): Whether the profiling used sampling techniques. Defaults to False.
    cluster_path (str | None): CSV path for cluster data when sampling is used. Defaults to None.
    freq_retention_dict_path (str): Path to frequency-retention JSON dict. Defaults to "simple_gc_list.json".
    area_power_dict_path (str): Path to area-power JSON dict. Defaults to "area_power.json".

Returns:

    None: Initializes the frontend instance.
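
A minimal usage sketch (the import path and output filename are assumptions based on the module layout; the profiling path is the example given in the module's own comments):

import json

from gain_cell_frontend import GainCellFrontend  # import path assumed

# Basename encodes <workload>_<date>_<time>, as parsed in __init__
frontend = GainCellFrontend(
    profile_results_path="logs/generate/generate_2025-03-03_18-20-21.sim.csv",
    simulation=True,
    sample=False,
)
results = frontend.run()  # full pipeline, returns a JSON-serializable dict
with open("frontend_results.json", "w") as f:
    json.dump(results, f, indent=2)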

Source code in frontend/gain_cell_frontend.py
class GainCellFrontend:
    """Frontend class for analyzing gain cell memory behavior in GPU workloads.

    Processes profiling data and computes write frequencies, retention times, refresh requirements,
    area, and energy for different gain cell technologies. Use the `run` method to execute
    the full analysis pipeline and return the results as a JSON-serializable dict.

    Args:
        profile_results_path (str): Path to the profiling results CSV file.
        simulation (bool): Whether the profiling results are from simulation. Defaults to True.
        sample (bool): Whether the profiling used sampling techniques. Defaults to False.
        cluster_path (str | None): CSV path for cluster data when sampling is used. Defaults to None.
        freq_retention_dict_path (str): Path to frequency-retention JSON dict. Defaults to "simple_gc_list.json".
        area_power_dict_path (str): Path to area-power JSON dict. Defaults to "area_power.json".

    Returns:
        None: Initializes the frontend instance.
    """

    def __init__(self,
                 profile_results_path: str,
                 simulation: bool = True,
                 sample: bool = False,
                 cluster_path: str | None = None,
                 freq_retention_dict_path: str = "simple_gc_list.json",
                 area_power_dict_path: str = "area_power.json") -> None:
        """Initialize the GainCellFrontend with profiling data.

        Sets up constants, loads dictionaries, and imports profile data for analysis.

        Args:
            profile_results_path (str): Path to the profiling results CSV file.
            simulation (bool): Indicates if data is from simulation. Defaults to True.
            sample (bool): Indicates if sampling was used. Defaults to False.
            cluster_path (str | None): Path to cluster CSV when sampling. Defaults to None.
            freq_retention_dict_path (str): Path to frequency-retention JSON dict. Defaults to "simple_gc_list.json".
            area_power_dict_path (str): Path to area-power JSON dict. Defaults to "area_power.json".

        Returns:
            None
        """
        # Constants
        # Lovelace GPU has a core clock of 2235 MHz
        self.GPU_FREQ = int(os.getenv('GPU_FREQ', 2235))
        # Time in ns for one cycle
        self.CYCLE_TIME = 1e9 / (self.GPU_FREQ * 1e6)

        # profile_results_path is in the form of
        # logs/generate/generate_2025-03-03_18-20-21.sim.csv
        # Get the workload name from the path
        basename = os.path.basename(profile_results_path)

        try:
            basename_split = basename.split(".")
            basename_sans_ext = ".".join(basename_split[:-2])
            basename_parts = basename_sans_ext.split("_")
            self.time = basename_parts[-1]
            self.date = basename_parts[-2]
            self.workload_name = "_".join(basename_parts[:-2])
        except Exception:
            print(
                f"Warning: Exception occurred while parsing the workload name from {basename}.")
            self.workload_name = basename
            # Get the current date and time
            now = datetime.now()
            self.date = now.strftime("%Y-%m-%d")
            self.time = now.strftime("%H-%M-%S")

        # Read in the gain cell dictionary
        self._import_gc_dict(freq_retention_dict_path)
        self.silicon_ret_16nm = 77  # from Giterman
        self.silicon_ret_5nm = 1  # from Shuhan
        self.SM_COUNT = 114 # number of SMs in the H100 GPU

        # Read in the area and power dictionary
        self.gain_cell_size = {}
        self.gain_cell_power = {}
        with open(area_power_dict_path, "r") as f:
            area_power_list = list(json.load(f).items())
            self.gain_cell_size = {
                i[0]: i[1].get("area", 0) for i in area_power_list}
            self.gain_cell_power = {
                i[0]: i[1].get("power", 0) for i in area_power_list}

        # Initialize profile result dataframes
        self.kernel_df = None
        self.l1_df = None
        self.l2_df = None
        self.cluster_df = None
        self.sample = sample
        # Read in the profile data
        self._import_profile_data(
            profile_path=profile_results_path,
            simulation=simulation,
            sample=self.sample,
            cluster_path=cluster_path
        )

        # Sampling relies on simulated cluster data: sample=True requires simulation=True
        assert not (sample and not simulation), \
            "If sample is True, simulation must also be True"

    def _import_gc_dict(self, gc_list_path: str = "simple_gc_list.json") -> None:
        """Import gain cell frequency and retention data from JSON file.

        Reads JSON file containing write frequencies and retention times and
        converts values to NumPy arrays stored in `self.freq_retention_dict`.

        Args:
            gc_list_path (str): Path to the frequency-retention JSON file. Defaults to "simple_gc_list.json".

        Returns:
            None: Populates `self.freq_retention_dict` attribute.
        """
        with open(gc_list_path, "r") as f:
            freq_retention_dict = json.load(f)

        write_freq = [freq_retention_dict[i]["write_freq"]
                      for i in range(len(freq_retention_dict))]
        hybrid_retention = [freq_retention_dict[i]["hybrid_retention"]
                            for i in range(len(freq_retention_dict))]
        oxide_retention = [freq_retention_dict[i]["oxide_retention"]
                           for i in range(len(freq_retention_dict))]

        # Convert to numpy arrays
        write_freq = np.array(write_freq)
        hybrid_retention = np.array(hybrid_retention)
        oxide_retention = np.array(oxide_retention)

        # Convert write_freq to MHz
        write_freq = write_freq / 1e6
        # Convert retention time to microseconds
        hybrid_retention = hybrid_retention * 1e6
        oxide_retention = oxide_retention * 1e6

        # Convert to Dict
        self.freq_retention_dict = {
            "write_freq": write_freq,
            "hybrid_retention": hybrid_retention,
            "oxide_retention": oxide_retention
        }

    def _import_profile_data(self, profile_path: str, simulation: bool = True,
                             sample: bool = False, cluster_path: str | None = None) -> None:
        """Import profiling data from CSV files for kernels, L1, and L2.

        Reads CSV files for kernel, L1, and L2 metrics, converts lifetimes,
        and attaches kernel counts based on sampling or default clustering.

        Args:
            profile_path (str): Path to the main profiling data CSV file.
            simulation (bool): Whether the data is from simulation. Defaults to True.
            sample (bool): Whether sampling was used. Defaults to False.
            cluster_path (str | None): Path to cluster CSV when sampling. Defaults to None.

        Returns:
            None: Initializes `self.kernel_df`, `self.l1_df`, `self.l2_df`, and `self.cluster_df`.
        """
        # Get the file name
        file_name = ".".join(os.path.basename(profile_path).split(".")[:-2])
        # Get the directory name
        dir_name = os.path.dirname(profile_path)

        kernel_dir = os.path.join(dir_name, file_name + ".sim.csv")
        l1_dir = os.path.join(dir_name, file_name + ".sim_l1.csv")
        l2_dir = os.path.join(dir_name, file_name + ".sim_l2.csv")

        self.kernel_df = pd.read_csv(kernel_dir)
        self.l1_df = pd.read_csv(l1_dir)
        self.l2_df = pd.read_csv(l2_dir)

        # Convert lifetime_ns to lifetime_us
        self.kernel_df = self.kernel_df.dropna()
        self.l1_df["lifetime_us"] = self.l1_df["lifetime_ns"] / 1e3
        self.l2_df["lifetime_us"] = self.l2_df["lifetime_ns"] / 1e3

        if sample:
            if not cluster_path:
                # Get the cluster data
                cluster_dir = os.path.join(dir_name, "traces", "clusters.csv")
                self.cluster_df = pd.read_csv(cluster_dir)
            else:
                # Get the cluster data
                self.cluster_df = pd.read_csv(cluster_path)
            assert self.cluster_df is not None, "Cluster data is None"
            self.kernel_df["Kernel Count"] = np.nan
            for i in range(len(self.kernel_df)):
                kernel_id = self.kernel_df["Kernel ID"][i]
                self.kernel_df.loc[i, "Kernel Count"] = \
                    self.cluster_df[self.cluster_df["Centroid Kernel ID"]
                                    == kernel_id]["Kernel Count"].values[0]
        else:
            self.kernel_df["Kernel Count"] = np.ones(len(self.kernel_df))
            # Dummy values for cluster_df
            self.cluster_df = pd.DataFrame({
                "Cluster ID": self.kernel_df["Kernel ID"],
                "Kernel Count": self.kernel_df["Kernel Count"],
                "Centroid Kernel ID": self.kernel_df["Kernel ID"],
                "Centroid Kernel Name": self.kernel_df["Mangled Names"]
            })
            print(self.cluster_df)

    def analyze_write_freq(self, percentile: int = 90) -> None:
        """Analyze write frequency for L1 and L2 caches across all kernels.

        Calculates maximum, percentile, and weighted average write frequencies based on kernel counts.

        Args:
            percentile (int): Percentile to compute (e.g., 90 for 90th percentile). Defaults to 90.

        Returns:
            None: Sets attributes `l1_write_freq`, `l2_write_freq`, `l1_total_writes`, `l2_total_writes`, and weighted lifetimes.
        """
        # TODO: weigh write frequency calculation by kernel count
        self.l1_total_writes = np.sum([
            self.kernel_df["L1 Write Count"][i] *
            self.kernel_df["Kernel Count"][i]
            for i in range(len(self.kernel_df))
        ])
        self.l2_total_writes = np.sum([
            self.kernel_df["L2 Write Count"][i] *
            self.kernel_df["Kernel Count"][i]
            for i in range(len(self.kernel_df))
        ])
        self.total_time = self.CYCLE_TIME * np.sum([
            self.kernel_df["Total Cycles"][i] *
            self.kernel_df["Kernel Count"][i]
            for i in range(len(self.kernel_df))
        ])
        self.l1_write_freq = {
            "max": self.kernel_df["L1 Write Frequency"].max(),
            "maxidx": self.kernel_df["L1 Write Frequency"].idxmax(),
            f"{percentile}%-tile": np.percentile(self.kernel_df["L1 Write Frequency"], percentile),
            "weighted": self.l1_total_writes / self.total_time * 1e3
        }
        self.l2_write_freq = {
            "max": self.kernel_df["L2 Write Frequency"].max(),
            "maxidx": self.kernel_df["L2 Write Frequency"].idxmax(),
            f"{percentile}%-tile": np.percentile(self.kernel_df["L2 Write Frequency"], percentile),
            "weighted": self.l2_total_writes / self.total_time * 1e3
        }
        # Count the number of lifetimes for each kernel
        l1_lifetimes_by_kernel = self.l1_df["kernel_id"].value_counts()
        l2_lifetimes_by_kernel = self.l2_df["kernel_id"].value_counts()
        print(f"L1 lifetimes by kernel: {l1_lifetimes_by_kernel}")
        print(f"L2 lifetimes by kernel: {l2_lifetimes_by_kernel}")
        # Multiply by kernel count
        # Match the entries in l1_lifetimes_by_kernel with cluster_df
        # Matching entries have the same value in l1_lifetimes_by_kernel and cluster_df["Centroid Kernel ID"]
        # Multiply by kernel count
        for kernel_id in l1_lifetimes_by_kernel.index:
            kernel_count = self.cluster_df[self.cluster_df["Centroid Kernel ID"]
                                           == kernel_id]["Kernel Count"].values[0]
            l1_lifetimes_by_kernel[kernel_id] *= kernel_count
        for kernel_id in l2_lifetimes_by_kernel.index:
            kernel_count = self.cluster_df[self.cluster_df["Centroid Kernel ID"]
                                           == kernel_id]["Kernel Count"].values[0]
            l2_lifetimes_by_kernel[kernel_id] *= kernel_count
        self.l1_lifetimes_weighted = np.sum(l1_lifetimes_by_kernel)
        self.l2_lifetimes_weighted = np.sum(l2_lifetimes_by_kernel)
        # Print the results
        print(f"L1 total writes: {self.l1_total_writes}")
        print(f"L2 total writes: {self.l2_total_writes}")
        print(f"L1 write frequency: {self.l1_write_freq}")
        print(f"L2 write frequency: {self.l2_write_freq}")

    def analyze_retention(self, percentile: int = 100) -> None:
        """Analyze retention times for different gain cell technologies.

        Determines retention times (in microseconds) for 5nm/16nm silicon, Hybrid, and Oxide
        based on computed write frequencies.

        Args:
            percentile (int): Percentile for selecting write frequency metric. Defaults to 100 (max).

        Returns:
            None: Populates `l1_gc_retention` and `l2_gc_retention` dictionaries.
        """
        if percentile >= 100:
            write_freq_key = "max"
        else:
            write_freq_key = f"{percentile}%-tile"

        # Find the entries of hybrid_retention and oxide_retention with write_freq
        # greater than the L1 and L2 write frequencies
        if self.l1_write_freq[write_freq_key] > self.freq_retention_dict["write_freq"][-1]:
            print("Warning: L1 write frequency is greater than the maximum write frequency in the retention dictionary.")
            l1_write_freq_index = len(
                self.freq_retention_dict["write_freq"]) - 1
        else:
            l1_write_freq_index = np.where(
                self.freq_retention_dict["write_freq"] >= self.l1_write_freq[write_freq_key])[0][0]

        if self.l2_write_freq[write_freq_key] > self.freq_retention_dict["write_freq"][-1]:
            print("Warning: L2 write frequency is greater than the maximum write frequency in the retention dictionary.")
            l2_write_freq_index = len(
                self.freq_retention_dict["write_freq"]) - 1
        else:
            l2_write_freq_index = np.where(
                self.freq_retention_dict["write_freq"] >= self.l2_write_freq[write_freq_key])[0][0]

        self.l1_gc_retention = {
            "5nm Silicon": self.silicon_ret_5nm,
            "16nm Silicon": self.silicon_ret_16nm,
            "Hybrid": self.freq_retention_dict["hybrid_retention"][l1_write_freq_index],
            "Oxide": self.freq_retention_dict["oxide_retention"][l1_write_freq_index],
        }
        self.l2_gc_retention = {
            "5nm Silicon": self.silicon_ret_5nm,
            "16nm Silicon": self.silicon_ret_16nm,
            "Hybrid": self.freq_retention_dict["hybrid_retention"][l2_write_freq_index],
            "Oxide": self.freq_retention_dict["oxide_retention"][l2_write_freq_index],
        }
        # TODO: What command line outputs should be printed?

    def analyze_refresh(self) -> None:
        """Analyze refresh requirements for gain cell technologies.

        Computes the total number of refresh operations needed for each device
        across all kernels for L1 and L2 caches.

        Args:
            None

        Returns:
            None: Sets `l1_refreshes` and `l2_refreshes` numpy arrays.
        """
        self.l1_refreshes = np.zeros(len(self.l1_gc_retention))
        self.l2_refreshes = np.zeros(len(self.l2_gc_retention))

        for l1_key, l2_key, i in zip(self.l1_gc_retention.keys(), self.l2_gc_retention.keys(), range(len(self.l1_refreshes))):
            # i is the index of the gain cell device under consideration
            # iterate through all kernels as recorded in the kernel_df
            for j in range(len(self.kernel_df)):
                # j is the index of the kernel under consideration
                # get all the L1 lifetime values for the kernel
                l1_lifetime = self.l1_df[self.l1_df["kernel_id"] ==
                                         self.kernel_df["Kernel ID"][j]]["lifetime_us"].to_numpy()
                # get all the L2 lifetime values for the kernel
                l2_lifetime = self.l2_df[self.l2_df["kernel_id"] ==
                                         self.kernel_df["Kernel ID"][j]]["lifetime_us"].to_numpy()
                # get the L1 refreshes for the kernel
                l1_refreshes = np.sum(
                    np.floor(l1_lifetime / self.l1_gc_retention[l1_key]))
                # get the L2 refreshes for the kernel
                l2_refreshes = np.sum(
                    np.floor(l2_lifetime / self.l2_gc_retention[l2_key]))
                # update the L1 and L2 refreshes for the gain cell device
                self.l1_refreshes[i] += l1_refreshes * \
                    self.kernel_df["Kernel Count"][j] * self.SM_COUNT
                self.l2_refreshes[i] += l2_refreshes * \
                    self.kernel_df["Kernel Count"][j]

        # print the results
        print(
            f"L1 refreshes for each gain cell device for a total of {self.l1_lifetimes_weighted} lifetimes:")
        for cell, refreshes in zip(self.l1_gc_retention.keys(), self.l1_refreshes):
            print(
                f"\t{cell}: {refreshes}, or {refreshes / self.SM_COUNT / self.l1_lifetimes_weighted:.2%} of total")
        print(
            f"L2 refreshes for each gain cell device for a total of {self.l2_lifetimes_weighted} lifetimes:")
        for cell, refreshes in zip(self.l2_gc_retention.keys(), self.l2_refreshes):
            print(
                f"\t{cell}: {refreshes}, or {refreshes / self.l2_lifetimes_weighted:.2%} of total")

    def analyze_area(self, block_size: int = 32, area_efficiency: float = 0.6) -> None:
        """Analyze area requirements for different gain cell technologies.

        Computes cache area based on unique addresses, block size, and device area factors.

        Args:
            block_size (int): Block size in bytes. Defaults to 32.
            area_efficiency (float): Area efficiency factor (currently unused). Defaults to 0.6.

        Returns:
            None: Sets `l1_area` and `l2_area` numpy arrays.
        """
        # Get the number of unique addresses for L1 and L2 caches for each kernel
        l1_unique_addresses = self.kernel_df["L1 Unique Addresses"].to_numpy()
        l2_unique_addresses = self.kernel_df["L2 Unique Addresses"].to_numpy()

        # Number of bits for L1 and L2 caches
        l1_bits = np.max(l1_unique_addresses) * block_size * 8
        l2_bits = np.max(l2_unique_addresses) * block_size * 8
        # Round up to the next power of 2
        l1_rounded = (2 ** np.ceil(np.log2(l1_bits)))
        l2_rounded = (2 ** np.ceil(np.log2(l2_bits)))
        # Print out the results in kilobytes
        print(f"L1 cache size: {l1_rounded / 1024 / 8:.1f} KB")
        print(f"L2 cache size: {l2_rounded / 1024 / 8:.1f} KB")

        # Calculate the area for each gain cell device
        self.l1_area = np.zeros(len(self.gain_cell_size))
        self.l2_area = np.zeros(len(self.gain_cell_size))

        for i, cell in enumerate(self.gain_cell_size.keys()):
            self.l1_area[i] = l1_rounded * self.gain_cell_size[cell]
            self.l2_area[i] = l2_rounded * self.gain_cell_size[cell]

        # Print out the results in um^2
        print(f"L1 cache area for each gain cell device:")
        for cell, area in zip(self.gain_cell_size.keys(), self.l1_area):
            print(f"\t{cell}: {area:.2f} um^2")
        print(f"L2 cache area for each gain cell device:")
        for cell, area in zip(self.gain_cell_size.keys(), self.l2_area):
            print(f"\t{cell}: {area:.2f} um^2")

    def analyze_energy(self, block_size: int = 32) -> None:
        """Analyze energy requirements for different gain cell technologies.

        Calculates energy (uJ) for SRAM, silicon, and hybrid designs including refresh overheads.

        Args:
            block_size (int): Block size in bytes. Defaults to 32.

        Returns:
            None: Sets `l1_power` and `l2_power` dictionaries.
        """
        # Get the total number of reads and writes for L1 and L2 caches
        l1_total_reads = np.sum([
            self.kernel_df["L1 Read Count"][i] *
            self.kernel_df["Kernel Count"][i]
            for i in range(len(self.kernel_df))
        ])
        l2_total_reads = np.sum([
            self.kernel_df["L2 Read Count"][i] *
            self.kernel_df["Kernel Count"][i]
            for i in range(len(self.kernel_df))
        ])
        l1_total_writes = np.sum([
            self.kernel_df["L1 Write Count"][i] *
            self.kernel_df["Kernel Count"][i]
            for i in range(len(self.kernel_df))
        ])
        l2_total_writes = np.sum([
            self.kernel_df["L2 Write Count"][i] *
            self.kernel_df["Kernel Count"][i]
            for i in range(len(self.kernel_df))
        ])

        # Get the number of refreshes needed for L1 and L2 caches
        l1_refreshes_dict, l2_refreshes_dict = {}, {}
        for cell, refreshes in zip(self.l1_gc_retention.keys(), self.l1_refreshes):
            l1_refreshes_dict[cell] = refreshes
        for cell, refreshes in zip(self.l2_gc_retention.keys(), self.l2_refreshes):
            l2_refreshes_dict[cell] = refreshes

        self.l1_power = {
            "sram": 0.0,
            "silicon": 0.0,
            "hybrid": 0.0,
        }

        self.l2_power = {
            "sram": 0.0,
            "silicon": 0.0,
            "hybrid": 0.0,
        }

        # Calculate SRAM power
        self.l1_power["sram"] = (l1_total_reads + l1_total_writes) * \
            self.gain_cell_power["sram"] * block_size * 8
        self.l2_power["sram"] = (l2_total_reads + l2_total_writes) * \
            self.gain_cell_power["sram"] * block_size * 8

        # Calculate silicon power
        self.l1_power["silicon"] = \
            (l1_total_reads + l1_total_writes +
                2 * l1_refreshes_dict["5nm Silicon"]) * \
            self.gain_cell_power["silicon"] * block_size * 8
        self.l2_power["silicon"] = \
            (l2_total_reads + l2_total_writes +
                2 * l2_refreshes_dict["5nm Silicon"]) * \
            self.gain_cell_power["silicon"] * block_size * 8

        # Calculate hybrid power
        self.l1_power["hybrid"] = \
            (l1_total_reads + l1_total_writes +
                2 * l1_refreshes_dict["Hybrid"]) * \
            self.gain_cell_power["hybrid"] * block_size * 8
        self.l2_power["hybrid"] = \
            (l2_total_reads + l2_total_writes +
                2 * l2_refreshes_dict["Hybrid"]) * \
            self.gain_cell_power["hybrid"] * block_size * 8

        # Print out the results in uJ
        print(f"L1 cache energy for each gain cell device:")
        for cell, power in self.l1_power.items():
            print(f"\t{cell}: {power:.2f} uJ")
        print(f"L2 cache energy for each gain cell device:")
        for cell, power in self.l2_power.items():
            print(f"\t{cell}: {power:.2f} uJ")

    def __dict__(self) -> dict:
        """Convert analysis results into a JSON-serializable dictionary.

        Gathers computed metrics for write frequency, retention, refresh, area, and energy.

        Args:
            None

        Returns:
            dict: Dictionary containing all analysis results ready for JSON serialization.
        """
        l1_refreshes_dict, l2_refreshes_dict = {}, {}
        for cell, refreshes in zip(self.l1_gc_retention.keys(), self.l1_refreshes):
            l1_refreshes_dict[cell] = refreshes
        for cell, refreshes in zip(self.l2_gc_retention.keys(), self.l2_refreshes):
            l2_refreshes_dict[cell] = refreshes
        l1_area_dict, l2_area_dict = {}, {}
        for cell, area in zip(self.gain_cell_size.keys(), self.l1_area):
            l1_area_dict[cell] = area
        for cell, area in zip(self.gain_cell_size.keys(), self.l2_area):
            l2_area_dict[cell] = area
        result = {
            "Name": self.workload_name,
            "Date": self.date,
            "Time": self.time,
            "L1 Lifetime Count": self.l1_lifetimes_weighted,
            "L2 Lifetime Count": self.l2_lifetimes_weighted,
            "L1 Write Frequency": self.l1_write_freq,
            "L2 Write Frequency": self.l2_write_freq,
            "L1 Refreshes": l1_refreshes_dict,
            "L2 Refreshes": l2_refreshes_dict,
            "L1 Area": l1_area_dict,
            "L2 Area": l2_area_dict,
            "L1 Energy": self.l1_power,
            "L2 Energy": self.l2_power,
        }
        return _convert_to_json_serializable(result)

    def run(self) -> dict:
        """Run the full analysis pipeline.

        Executes methods for write frequency, retention, refresh, area, and energy analysis sequentially.

        Args:
            None

        Returns:
            dict: JSON-serializable dictionary of complete analysis results.
        """
        self.analyze_write_freq()
        self.analyze_retention(percentile=90)
        self.analyze_refresh()
        self.analyze_area()
        self.analyze_energy()
        return self.__dict__()

__dict__()

Convert analysis results into a JSON-serializable dictionary.

Gathers computed metrics for write frequency, retention, refresh, area, and energy.

Returns:

    dict: Dictionary containing all analysis results ready for JSON serialization.

Source code in frontend/gain_cell_frontend.py
def __dict__(self) -> dict:
    """Convert analysis results into a JSON-serializable dictionary.

    Gathers computed metrics for write frequency, retention, refresh, area, and energy.

    Args:
        None

    Returns:
        dict: Dictionary containing all analysis results ready for JSON serialization.
    """
    l1_refreshes_dict, l2_refreshes_dict = {}, {}
    for cell, refreshes in zip(self.l1_gc_retention.keys(), self.l1_refreshes):
        l1_refreshes_dict[cell] = refreshes
    for cell, refreshes in zip(self.l2_gc_retention.keys(), self.l2_refreshes):
        l2_refreshes_dict[cell] = refreshes
    l1_area_dict, l2_area_dict = {}, {}
    for cell, area in zip(self.gain_cell_size.keys(), self.l1_area):
        l1_area_dict[cell] = area
    for cell, area in zip(self.gain_cell_size.keys(), self.l2_area):
        l2_area_dict[cell] = area
    result = {
        "Name": self.workload_name,
        "Date": self.date,
        "Time": self.time,
        "L1 Lifetime Count": self.l1_lifetimes_weighted,
        "L2 Lifetime Count": self.l2_lifetimes_weighted,
        "L1 Write Frequency": self.l1_write_freq,
        "L2 Write Frequency": self.l2_write_freq,
        "L1 Refreshes": l1_refreshes_dict,
        "L2 Refreshes": l2_refreshes_dict,
        "L1 Area": l1_area_dict,
        "L2 Area": l2_area_dict,
        "L1 Energy": self.l1_power,
        "L2 Energy": self.l2_power,
    }
    return _convert_to_json_serializable(result)

__init__(profile_results_path, simulation=True, sample=False, cluster_path=None, freq_retention_dict_path='simple_gc_list.json', area_power_dict_path='area_power.json')

Initialize the GainCellFrontend with profiling data.

Sets up constants, loads dictionaries, and imports profile data for analysis.

Parameters:

    profile_results_path (str): Path to the profiling results CSV file. Required.
    simulation (bool): Indicates if data is from simulation. Defaults to True.
    sample (bool): Indicates if sampling was used. Defaults to False.
    cluster_path (str | None): Path to cluster CSV when sampling. Defaults to None.
    freq_retention_dict_path (str): Path to frequency-retention JSON dict. Defaults to "simple_gc_list.json".
    area_power_dict_path (str): Path to area-power JSON dict. Defaults to "area_power.json".

Returns:

    None
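
As a worked check of the constants set here: with the default GPU_FREQ of 2235 MHz, one cycle takes 1e9 / (2235 * 1e6) ≈ 0.447 ns. A minimal sketch of the same arithmetic:

import os

GPU_FREQ = int(os.getenv('GPU_FREQ', 2235))  # MHz; Lovelace default
CYCLE_TIME = 1e9 / (GPU_FREQ * 1e6)          # ns per cycle
print(f"{CYCLE_TIME:.3f} ns/cycle")          # 0.447 ns/cycle at 2235 MHz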

Source code in frontend/gain_cell_frontend.py
def __init__(self,
             profile_results_path: str,
             simulation: bool = True,
             sample: bool = False,
             cluster_path: str | None = None,
             freq_retention_dict_path: str = "simple_gc_list.json",
             area_power_dict_path: str = "area_power.json") -> None:
    """Initialize the GainCellFrontend with profiling data.

    Sets up constants, loads dictionaries, and imports profile data for analysis.

    Args:
        profile_results_path (str): Path to the profiling results CSV file.
        simulation (bool): Indicates if data is from simulation. Defaults to True.
        sample (bool): Indicates if sampling was used. Defaults to False.
        cluster_path (str | None): Path to cluster CSV when sampling. Defaults to None.
        freq_retention_dict_path (str): Path to frequency-retention JSON dict. Defaults to "simple_gc_list.json".
        area_power_dict_path (str): Path to area-power JSON dict. Defaults to "area_power.json".

    Returns:
        None
    """
    # Constants
    # Lovelace GPU has a core clock of 2235 MHz
    self.GPU_FREQ = int(os.getenv('GPU_FREQ', 2235))
    # Time in ns for one cycle
    self.CYCLE_TIME = 1e9 / (self.GPU_FREQ * 1e6)

    # profile_results_path is in the form of
    # logs/generate/generate_2025-03-03_18-20-21.sim.csv
    # Get the workload name from the path
    basename = os.path.basename(profile_results_path)

    try:
        basename_split = basename.split(".")
        basename_sans_ext = ".".join(basename_split[:-2])
        basename_parts = basename_sans_ext.split("_")
        self.time = basename_parts[-1]
        self.date = basename_parts[-2]
        self.workload_name = "_".join(basename_parts[:-2])
    except Exception:
        print(
            f"Warning: Exception occurred while parsing the workload name from {basename}.")
        self.workload_name = basename
        # Get the current date and time
        now = datetime.now()
        self.date = now.strftime("%Y-%m-%d")
        self.time = now.strftime("%H-%M-%S")

    # Read in the gain cell dictionary
    self._import_gc_dict(freq_retention_dict_path)
    self.silicon_ret_16nm = 77  # from Giterman
    self.silicon_ret_5nm = 1  # from Shuhan
    self.SM_COUNT = 114 # number of SMs in the H100 GPU

    # Read in the area and power dictionary
    self.gain_cell_size = {}
    self.gain_cell_power = {}
    with open(area_power_dict_path, "r") as f:
        area_power_list = list(json.load(f).items())
        self.gain_cell_size = {
            i[0]: i[1].get("area", 0) for i in area_power_list}
        self.gain_cell_power = {
            i[0]: i[1].get("power", 0) for i in area_power_list}

    # Initialize profile result dataframes
    self.kernel_df = None
    self.l1_df = None
    self.l2_df = None
    self.cluster_df = None
    self.sample = sample
    # Read in the profile data
    self._import_profile_data(
        profile_path=profile_results_path,
        simulation=simulation,
        sample=self.sample,
        cluster_path=cluster_path
    )

    # Sampling relies on simulated cluster data: sample=True requires simulation=True
    assert not (sample and not simulation), \
        "If sample is True, simulation must also be True"

analyze_area(block_size=32, area_efficiency=0.6)

Analyze area requirements for different gain cell technologies.

Computes cache area based on unique addresses, block size, and device area factors.

Parameters:

    block_size (int): Block size in bytes. Defaults to 32.
    area_efficiency (float): Area efficiency factor (currently unused). Defaults to 0.6.

Returns:

    None: Sets l1_area and l2_area numpy arrays.
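
A standalone sketch of the sizing step: the maximum unique-address count times the block size gives raw bits, which are rounded up to the next power of two (the address count below is hypothetical):

import numpy as np

block_size = 32                                   # bytes per block
l1_unique_addresses = 3000                        # hypothetical max over kernels
l1_bits = l1_unique_addresses * block_size * 8    # 768,000 bits
l1_rounded = 2 ** np.ceil(np.log2(l1_bits))       # next power of two: 1,048,576 bits
print(f"L1 cache size: {l1_rounded / 1024 / 8:.1f} KB")  # 128.0 KB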

Source code in frontend/gain_cell_frontend.py
def analyze_area(self, block_size: int = 32, area_efficiency: float = 0.6) -> None:
    """Analyze area requirements for different gain cell technologies.

    Computes cache area based on unique addresses, block size, and device area factors.

    Args:
        block_size (int): Block size in bytes. Defaults to 32.
        area_efficiency (float): Area efficiency factor. Defaults to 0.6.

    Returns:
        None: Sets `l1_area` and `l2_area` numpy arrays.
    """
    # Get the number of unique addresses for L1 and L2 caches for each kernel
    l1_unique_addresses = self.kernel_df["L1 Unique Addresses"].to_numpy()
    l2_unique_addresses = self.kernel_df["L2 Unique Addresses"].to_numpy()

    # Number of bits for L1 and L2 caches
    l1_bits = np.max(l1_unique_addresses) * block_size * 8
    l2_bits = np.max(l2_unique_addresses) * block_size * 8
    # Round up to the next power of 2
    l1_rounded = (2 ** np.ceil(np.log2(l1_bits)))
    l2_rounded = (2 ** np.ceil(np.log2(l2_bits)))
    # Print out the results in kilobytes
    print(f"L1 cache size: {l1_rounded / 1024 / 8:.1f} KB")
    print(f"L2 cache size: {l2_rounded / 1024 / 8:.1f} KB")

    # Calculate the area for each gain cell device
    self.l1_area = np.zeros(len(self.gain_cell_size))
    self.l2_area = np.zeros(len(self.gain_cell_size))

    for i, cell in enumerate(self.gain_cell_size.keys()):
        self.l1_area[i] = l1_rounded * self.gain_cell_size[cell]
        self.l2_area[i] = l2_rounded * self.gain_cell_size[cell]

    # Print out the results in um^2
    print(f"L1 cache area for each gain cell device:")
    for cell, area in zip(self.gain_cell_size.keys(), self.l1_area):
        print(f"\t{cell}: {area:.2f} um^2")
    print(f"L2 cache area for each gain cell device:")
    for cell, area in zip(self.gain_cell_size.keys(), self.l2_area):
        print(f"\t{cell}: {area:.2f} um^2")

analyze_energy(block_size=32)

Analyze energy requirements for different gain cell technologies.

Calculates energy (uJ) for SRAM, silicon, and hybrid designs including refresh overheads.

Parameters:

    block_size (int): Block size in bytes. Defaults to 32.

Returns:

    None: Sets l1_power and l2_power dictionaries.
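
The per-technology model reduces to (reads + writes + 2 * refreshes) * per-bit access energy * bits per block; the factor of 2 on refreshes presumably counts each refresh as a read plus a write-back. A toy calculation (all numbers hypothetical; real per-bit energies come from area_power.json):

block_size = 32                                  # bytes
reads, writes, refreshes = 1_000_000, 500_000, 20_000
per_bit_energy = 2.5e-9                          # hypothetical uJ per bit access

energy = (reads + writes + 2 * refreshes) * per_bit_energy * block_size * 8
print(f"{energy:.2f} uJ")                        # 0.99 uJ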

Source code in frontend/gain_cell_frontend.py
def analyze_energy(self, block_size: int = 32) -> None:
    """Analyze energy requirements for different gain cell technologies.

    Calculates energy (uJ) for SRAM, silicon, and hybrid designs including refresh overheads.

    Args:
        block_size (int): Block size in bytes. Defaults to 32.

    Returns:
        None: Sets `l1_power` and `l2_power` dictionaries.
    """
    # Get the total number of reads and writes for L1 and L2 caches
    l1_total_reads = np.sum([
        self.kernel_df["L1 Read Count"][i] *
        self.kernel_df["Kernel Count"][i]
        for i in range(len(self.kernel_df))
    ])
    l2_total_reads = np.sum([
        self.kernel_df["L2 Read Count"][i] *
        self.kernel_df["Kernel Count"][i]
        for i in range(len(self.kernel_df))
    ])
    l1_total_writes = np.sum([
        self.kernel_df["L1 Write Count"][i] *
        self.kernel_df["Kernel Count"][i]
        for i in range(len(self.kernel_df))
    ])
    l2_total_writes = np.sum([
        self.kernel_df["L2 Write Count"][i] *
        self.kernel_df["Kernel Count"][i]
        for i in range(len(self.kernel_df))
    ])

    # Get the number of refreshes needed for L1 and L2 caches
    l1_refreshes_dict, l2_refreshes_dict = {}, {}
    for cell, refreshes in zip(self.l1_gc_retention.keys(), self.l1_refreshes):
        l1_refreshes_dict[cell] = refreshes
    for cell, refreshes in zip(self.l2_gc_retention.keys(), self.l2_refreshes):
        l2_refreshes_dict[cell] = refreshes

    self.l1_power = {
        "sram": 0.0,
        "silicon": 0.0,
        "hybrid": 0.0,
    }

    self.l2_power = {
        "sram": 0.0,
        "silicon": 0.0,
        "hybrid": 0.0,
    }

    # Calculate SRAM power
    self.l1_power["sram"] = (l1_total_reads + l1_total_writes) * \
        self.gain_cell_power["sram"] * block_size * 8
    self.l2_power["sram"] = (l2_total_reads + l2_total_writes) * \
        self.gain_cell_power["sram"] * block_size * 8

    # Calculate silicon power
    self.l1_power["silicon"] = \
        (l1_total_reads + l1_total_writes +
            2 * l1_refreshes_dict["5nm Silicon"]) * \
        self.gain_cell_power["silicon"] * block_size * 8
    self.l2_power["silicon"] = \
        (l2_total_reads + l2_total_writes +
            2 * l2_refreshes_dict["5nm Silicon"]) * \
        self.gain_cell_power["silicon"] * block_size * 8

    # Calculate hybrid power
    self.l1_power["hybrid"] = \
        (l1_total_reads + l1_total_writes +
            2 * l1_refreshes_dict["Hybrid"]) * \
        self.gain_cell_power["hybrid"] * block_size * 8
    self.l2_power["hybrid"] = \
        (l2_total_reads + l2_total_writes +
            2 * l2_refreshes_dict["Hybrid"]) * \
        self.gain_cell_power["hybrid"] * block_size * 8

    # Print out the results in uJ
    print(f"L1 cache energy for each gain cell device:")
    for cell, power in self.l1_power.items():
        print(f"\t{cell}: {power:.2f} uJ")
    print(f"L2 cache energy for each gain cell device:")
    for cell, power in self.l2_power.items():
        print(f"\t{cell}: {power:.2f} uJ")

analyze_refresh()

Analyze refresh requirements for gain cell technologies.

Computes the total number of refresh operations needed for each device across all kernels for L1 and L2 caches.

Returns:

    None: Sets l1_refreshes and l2_refreshes numpy arrays.
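
The core quantity is floor(lifetime / retention) per cache-line lifetime, summed over all lifetimes; in the method itself, L1 totals are further scaled by the kernel count and SM_COUNT (114), and L2 totals by the kernel count alone. A toy example with hypothetical numbers:

import numpy as np

retention_us = 10.0                          # hypothetical retention time
lifetimes_us = np.array([3.0, 25.0, 47.0])   # hypothetical line lifetimes
refreshes = np.sum(np.floor(lifetimes_us / retention_us))
print(refreshes)                             # 0 + 2 + 4 = 6.0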

Source code in frontend/gain_cell_frontend.py
def analyze_refresh(self) -> None:
    """Analyze refresh requirements for gain cell technologies.

    Computes the total number of refresh operations needed for each device
    across all kernels for L1 and L2 caches.

    Args:
        None

    Returns:
        None: Sets `l1_refreshes` and `l2_refreshes` numpy arrays.
    """
    self.l1_refreshes = np.zeros(len(self.l1_gc_retention))
    self.l2_refreshes = np.zeros(len(self.l2_gc_retention))

    for l1_key, l2_key, i in zip(self.l1_gc_retention.keys(), self.l2_gc_retention.keys(), range(len(self.l1_refreshes))):
        # i is the index of the gain cell device under consideration
        # iterate through all kernels as recorded in the kernel_df
        for j in range(len(self.kernel_df)):
            # j is the index of the kernel under consideration
            # get all the L1 lifetime values for the kernel
            l1_lifetime = self.l1_df[self.l1_df["kernel_id"] ==
                                     self.kernel_df["Kernel ID"][j]]["lifetime_us"].to_numpy()
            # get all the L2 lifetime values for the kernel
            l2_lifetime = self.l2_df[self.l2_df["kernel_id"] ==
                                     self.kernel_df["Kernel ID"][j]]["lifetime_us"].to_numpy()
            # get the L1 refreshes for the kernel
            l1_refreshes = np.sum(
                np.floor(l1_lifetime / self.l1_gc_retention[l1_key]))
            # get the L2 refreshes for the kernel
            l2_refreshes = np.sum(
                np.floor(l2_lifetime / self.l2_gc_retention[l2_key]))
            # update the L1 and L2 refreshes for the gain cell device
            self.l1_refreshes[i] += l1_refreshes * \
                self.kernel_df["Kernel Count"][j] * self.SM_COUNT
            self.l2_refreshes[i] += l2_refreshes * \
                self.kernel_df["Kernel Count"][j]

    # print the results
    print(
        f"L1 refreshes for each gain cell device for a total of {self.l1_lifetimes_weighted} lifetimes:")
    for cell, refreshes in zip(self.l1_gc_retention.keys(), self.l1_refreshes):
        print(
            f"\t{cell}: {refreshes}, or {refreshes / self.SM_COUNT / self.l1_lifetimes_weighted:.2%} of total")
    print(
        f"L2 refreshes for each gain cell device for a total of {self.l2_lifetimes_weighted} lifetimes:")
    for cell, refreshes in zip(self.l2_gc_retention.keys(), self.l2_refreshes):
        print(
            f"\t{cell}: {refreshes}, or {refreshes / self.l2_lifetimes_weighted:.2%} of total")

analyze_retention(percentile=100)

Analyze retention times for different gain cell technologies.

Determines retention times (in microseconds) for 5nm/16nm silicon, Hybrid, and Oxide based on computed write frequencies.

Parameters:

    percentile (int): Percentile for selecting write frequency metric. Defaults to 100 (max).

Returns:

    None: Populates l1_gc_retention and l2_gc_retention dictionaries.
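
The lookup selects the first dictionary entry whose write frequency is at least the workload's; if the workload exceeds the table maximum, the method warns and falls back to the last entry. A minimal sketch with hypothetical table values:

import numpy as np

write_freq_table = np.array([100.0, 200.0, 400.0, 800.0])  # MHz, hypothetical
hybrid_retention = np.array([50.0, 20.0, 8.0, 3.0])        # us, hypothetical

target = 250.0  # MHz
idx = np.where(write_freq_table >= target)[0][0]            # first index >= target: 2
print(hybrid_retention[idx])                                # 8.0 us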

Source code in frontend/gain_cell_frontend.py
def analyze_retention(self, percentile: int = 100) -> None:
    """Analyze retention times for different gain cell technologies.

    Determines retention times (in microseconds) for 5nm/16nm silicon, Hybrid, and Oxide
    based on computed write frequencies.

    Args:
        percentile (int): Percentile for selecting write frequency metric. Defaults to 100 (max).

    Returns:
        None: Populates `l1_gc_retention` and `l2_gc_retention` dictionaries.
    """
    if percentile >= 100:
        write_freq_key = "max"
    else:
        write_freq_key = f"{percentile}%-tile"

    # Find the entries of hybrid_retention and oxide_retention with write_freq
    # greater than the L1 and L2 write frequencies
    if self.l1_write_freq[write_freq_key] > self.freq_retention_dict["write_freq"][-1]:
        print("Warning: L1 write frequency is greater than the maximum write frequency in the retention dictionary.")
        l1_write_freq_index = len(
            self.freq_retention_dict["write_freq"]) - 1
    else:
        l1_write_freq_index = np.where(
            self.freq_retention_dict["write_freq"] >= self.l1_write_freq[write_freq_key])[0][0]

    if self.l2_write_freq[write_freq_key] > self.freq_retention_dict["write_freq"][-1]:
        print("Warning: L2 write frequency is greater than the maximum write frequency in the retention dictionary.")
        l2_write_freq_index = len(
            self.freq_retention_dict["write_freq"]) - 1
    else:
        l2_write_freq_index = np.where(
            self.freq_retention_dict["write_freq"] >= self.l2_write_freq[write_freq_key])[0][0]

    self.l1_gc_retention = {
        "5nm Silicon": self.silicon_ret_5nm,
        "16nm Silicon": self.silicon_ret_16nm,
        "Hybrid": self.freq_retention_dict["hybrid_retention"][l1_write_freq_index],
        "Oxide": self.freq_retention_dict["oxide_retention"][l1_write_freq_index],
    }
    self.l2_gc_retention = {
        "5nm Silicon": self.silicon_ret_5nm,
        "16nm Silicon": self.silicon_ret_16nm,
        "Hybrid": self.freq_retention_dict["hybrid_retention"][l2_write_freq_index],
        "Oxide": self.freq_retention_dict["oxide_retention"][l2_write_freq_index],
    }

analyze_write_freq(percentile=90)

Analyze write frequency for L1 and L2 caches across all kernels.

Calculates maximum, percentile, and weighted average write frequencies based on kernel counts.

Parameters:

    percentile (int): Percentile to compute (e.g., 90 for the 90th percentile). Defaults to 90.

Returns:

    None: Sets attributes l1_write_freq, l2_write_freq, l1_total_writes, l2_total_writes, and weighted lifetimes.
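
The "weighted" entry is total writes divided by total elapsed time; with total_time in nanoseconds, multiplying by 1e3 gives writes per microsecond, i.e. MHz. A toy calculation with hypothetical per-kernel counts:

import numpy as np

CYCLE_TIME = 1e9 / (2235 * 1e6)                  # ns per cycle at 2235 MHz
write_counts = np.array([1_000_000, 250_000])    # hypothetical per-kernel writes
cycles = np.array([2_000_000, 500_000])          # hypothetical per-kernel cycles
kernel_counts = np.array([3, 1])                 # cluster weights

total_writes = np.sum(write_counts * kernel_counts)
total_time_ns = CYCLE_TIME * np.sum(cycles * kernel_counts)
print(f"{total_writes / total_time_ns * 1e3:.1f} MHz")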

Source code in frontend/gain_cell_frontend.py
def analyze_write_freq(self, percentile: int = 90) -> None:
    """Analyze write frequency for L1 and L2 caches across all kernels.

    Calculates maximum, percentile, and weighted average write frequencies based on kernel counts.

    Args:
        percentile (int): Percentile to compute (e.g., 90 for 90th percentile). Defaults to 90.

    Returns:
        None: Sets attributes `l1_write_freq`, `l2_write_freq`, `l1_total_writes`, `l2_total_writes`, and weighted lifetimes.
    """
    # TODO: weigh write frequency calculation by kernel count
    self.l1_total_writes = np.sum([
        self.kernel_df["L1 Write Count"][i] *
        self.kernel_df["Kernel Count"][i]
        for i in range(len(self.kernel_df))
    ])
    self.l2_total_writes = np.sum([
        self.kernel_df["L2 Write Count"][i] *
        self.kernel_df["Kernel Count"][i]
        for i in range(len(self.kernel_df))
    ])
    self.total_time = self.CYCLE_TIME * np.sum([
        self.kernel_df["Total Cycles"][i] *
        self.kernel_df["Kernel Count"][i]
        for i in range(len(self.kernel_df))
    ])
    self.l1_write_freq = {
        "max": self.kernel_df["L1 Write Frequency"].max(),
        "maxidx": self.kernel_df["L1 Write Frequency"].idxmax(),
        f"{percentile}%-tile": np.percentile(self.kernel_df["L1 Write Frequency"], percentile),
        "weighted": self.l1_total_writes / self.total_time * 1e3
    }
    self.l2_write_freq = {
        "max": self.kernel_df["L2 Write Frequency"].max(),
        "maxidx": self.kernel_df["L2 Write Frequency"].idxmax(),
        f"{percentile}%-tile": np.percentile(self.kernel_df["L2 Write Frequency"], percentile),
        "weighted": self.l2_total_writes / self.total_time * 1e3
    }
    # Count the number of lifetimes for each kernel
    l1_lifetimes_by_kernel = self.l1_df["kernel_id"].value_counts()
    l2_lifetimes_by_kernel = self.l2_df["kernel_id"].value_counts()
    print(f"L1 lifetimes by kernel: {l1_lifetimes_by_kernel}")
    print(f"L2 lifetimes by kernel: {l2_lifetimes_by_kernel}")
    # Multiply by kernel count
    # Match the entries in l1_lifetimes_by_kernel with cluster_df
    # Matching entries have the same value in l1_lifetimes_by_kernel and cluster_df["Centroid Kernel ID"]
    # Multiply by kernel count
    for kernel_id in l1_lifetimes_by_kernel.index:
        kernel_count = self.cluster_df[self.cluster_df["Centroid Kernel ID"]
                                       == kernel_id]["Kernel Count"].values[0]
        l1_lifetimes_by_kernel[kernel_id] *= kernel_count
    for kernel_id in l2_lifetimes_by_kernel.index:
        kernel_count = self.cluster_df[self.cluster_df["Centroid Kernel ID"]
                                       == kernel_id]["Kernel Count"].values[0]
        l2_lifetimes_by_kernel[kernel_id] *= kernel_count
    self.l1_lifetimes_weighted = np.sum(l1_lifetimes_by_kernel)
    self.l2_lifetimes_weighted = np.sum(l2_lifetimes_by_kernel)
    # Print the results
    print(f"L1 total writes: {self.l1_total_writes}")
    print(f"L2 total writes: {self.l2_total_writes}")
    print(f"L1 write frequency: {self.l1_write_freq}")
    print(f"L2 write frequency: {self.l2_write_freq}")

run()

Run the full analysis pipeline.

Executes methods for write frequency, retention, refresh, area, and energy analysis sequentially.

Returns:

    dict: JSON-serializable dictionary of complete analysis results.
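
run() simply chains the analysis steps in dependency order; an equivalent manual sequence (reusing the frontend instance from the earlier sketch) looks like:

frontend.analyze_write_freq(percentile=90)
frontend.analyze_retention(percentile=90)  # needs write frequencies
frontend.analyze_refresh()                 # needs retention times
frontend.analyze_area()
frontend.analyze_energy()                  # needs refresh counts
results = frontend.__dict__()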

Source code in frontend/gain_cell_frontend.py
def run(self) -> dict:
    """Run the full analysis pipeline.

    Executes methods for write frequency, retention, refresh, area, and energy analysis sequentially.

    Args:
        None

    Returns:
        dict: JSON-serializable dictionary of complete analysis results.
    """
    self.analyze_write_freq()
    self.analyze_retention(percentile=90)
    self.analyze_refresh()
    self.analyze_area()
    self.analyze_energy()
    return self.__dict__()

Frontend Script for SCALE-Sim Backend

convert_to_json_serializable(obj)

Convert NumPy types to native Python types for JSON serialization.

Handles numpy integers, floats, arrays, dicts, and lists by converting them to native Python integers, floats, lists, and dicts.

Parameters:

    obj (object): The object to convert (list, dict, or NumPy types). Required.

Returns:

    object: JSON-serializable version of the input object.
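
For example, a nested result containing NumPy scalars and arrays round-trips through json.dumps after conversion (import path assumed):

import json
import numpy as np

from scale_sim_frontend import convert_to_json_serializable

result = {"refreshes": np.int64(42), "area": np.array([1.5, 2.5])}
print(json.dumps(convert_to_json_serializable(result)))
# {"refreshes": 42, "area": [1.5, 2.5]}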

Source code in frontend/scale_sim_frontend.py
def convert_to_json_serializable(obj: object) -> object:
    """Convert NumPy types to native Python types for JSON serialization.

    Handles numpy integers, floats, arrays, dicts, and lists by converting them
    to native Python integers, floats, lists, and dicts.

    Args:
        obj (object): The object to convert (list, dict, numpy types).

    Returns:
        object: JSON-serializable version of the input object.
    """
    if isinstance(obj, (np.integer, np.int64)):
        return int(obj)
    elif isinstance(obj, (np.floating, np.float64)):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, dict):
        return {k: convert_to_json_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_json_serializable(i) for i in obj]
    else:
        return obj

get_area_power(area_power_dict_path=None)

Load area and power dictionaries from JSON file.

Parameters:

    area_power_dict_path (str | None): Path to area-power JSON dict. Defaults to None (uses the bundled area_power.json).

Returns:

    tuple: Two dictionaries (gain_cell_size, gain_cell_power) mapping device names to area and power values.
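
The expected shape of area_power.json, inferred from the loader (device names match those used elsewhere in the frontend; the numbers are hypothetical placeholders):

from scale_sim_frontend import get_area_power  # import path assumed

# area_power.json maps device name -> {"area": ..., "power": ...};
# keys missing from an entry fall back to 0 via .get():
# {
#   "sram":    {"area": 0.021, "power": 3.0e-9},
#   "silicon": {"area": 0.012, "power": 2.0e-9},
#   "hybrid":  {"area": 0.010, "power": 1.5e-9}
# }
gain_cell_size, gain_cell_power = get_area_power()  # None -> bundled area_power.json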

Source code in frontend/scale_sim_frontend.py
def get_area_power(area_power_dict_path: str | None = None) -> tuple:
    """Load area and power dictionaries from JSON file.

    Args:
        area_power_dict_path (str | None): Path to area-power JSON dict. Defaults to None.

    Returns:
        tuple: Two dictionaries (gain_cell_size, gain_cell_power) mapping device names to area and power values.
    """
    if area_power_dict_path is None:
        area_power_dict_path = os.path.join(
            os.path.dirname(__file__), "area_power.json")
    with open(area_power_dict_path, "r") as f:
        area_power_list = list(json.load(f).items())
        gain_cell_size = {
            i[0]: i[1].get("area", 0) for i in area_power_list}
        gain_cell_power = {
            i[0]: i[1].get("power", 0) for i in area_power_list}
        return gain_cell_size, gain_cell_power

get_freq_retention(gc_list_path=None)

Load and convert gain cell frequency retention data.

Reads JSON gain cell list and converts to NumPy arrays for write frequencies and retention times.

Parameters:

    gc_list_path (str | None): Path to gain cell frequency retention JSON. Defaults to None (uses the bundled simple_gc_list.json).

Returns:

    dict: Dictionary with keys 'write_freq', 'hybrid_retention', 'oxide_retention' containing NumPy arrays.
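
simple_gc_list.json is a JSON array whose entries carry raw SI units (Hz and seconds), which the loader converts to MHz and microseconds. A hypothetical two-entry example (import path assumed):

from scale_sim_frontend import get_freq_retention

# Hypothetical file contents:
# [
#   {"write_freq": 1.0e8, "hybrid_retention": 5.0e-5, "oxide_retention": 2.0e-4},
#   {"write_freq": 4.0e8, "hybrid_retention": 8.0e-6, "oxide_retention": 6.0e-5}
# ]
freq_ret = get_freq_retention("simple_gc_list.json")
print(freq_ret["write_freq"])        # [100. 400.]  (Hz -> MHz)
print(freq_ret["hybrid_retention"])  # [50.  8.]    (s -> us)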

Source code in frontend/scale_sim_frontend.py
def get_freq_retention(gc_list_path: str | None = None) -> dict:
    """Load and convert gain cell frequency retention data.

    Reads JSON gain cell list and converts to NumPy arrays for write frequencies and retention times.

    Args:
        gc_list_path (str | None): Path to gain cell frequency retention JSON. Defaults to None.

    Returns:
        dict: Dictionary with keys 'write_freq', 'hybrid_retention', 'oxide_retention' containing numpy arrays.
    """
    if gc_list_path is None:
        gc_list_path = os.path.join(
            os.path.dirname(__file__), "simple_gc_list.json")

    with open(gc_list_path, "r") as f:
        freq_retention_list = json.load(f)

    # The JSON file is a list of operating points; collect each field
    write_freq = [gc["write_freq"] for gc in freq_retention_list]
    hybrid_retention = [gc["hybrid_retention"] for gc in freq_retention_list]
    oxide_retention = [gc["oxide_retention"] for gc in freq_retention_list]

    # Convert to numpy arrays
    write_freq = np.array(write_freq)
    hybrid_retention = np.array(hybrid_retention)
    oxide_retention = np.array(oxide_retention)

    # Convert write_freq to MHz
    write_freq = write_freq / 1e6
    # Convert retention time to microseconds
    hybrid_retention = hybrid_retention * 1e6
    oxide_retention = oxide_retention * 1e6

    # Convert to Dict
    return {
        "write_freq": write_freq,
        "hybrid_retention": hybrid_retention,
        "oxide_retention": oxide_retention
    }
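
A minimal sketch of the expected `simple_gc_list.json` layout, a JSON array with one object per operating point; the numbers are illustrative placeholders (write frequency in Hz, retention in seconds):

```python
# Illustrative simple_gc_list.json contents (placeholder values):
# [
#     {"write_freq": 1.0e9, "hybrid_retention": 1.0e-4, "oxide_retention": 1.0e-3},
#     {"write_freq": 2.0e9, "hybrid_retention": 5.0e-5, "oxide_retention": 4.0e-4}
# ]
fr = get_freq_retention()
print(fr["write_freq"])        # in MHz, e.g. [1000. 2000.]
print(fr["hybrid_retention"])  # in microseconds, e.g. [100.  50.]
```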

get_full_path(workload_name, workload_size, dataflow)

Construct full path to Scale-Sim log directory for a given workload.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| workload_name | str | Name of the workload. | required |
| workload_size | str | Size identifier of the workload. | required |
| dataflow | str | Dataflow type string. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| str | str | Full filesystem path to the workload's Scale-Sim logs. |

Source code in frontend/scale_sim_frontend.py
def get_full_path(workload_name: str, workload_size: str, dataflow: str) -> str:
    """Construct full path to Scale-Sim log directory for a given workload.

    Args:
        workload_name (str): Name of the workload.
        workload_size (str): Size identifier of the workload.
        dataflow (str): Dataflow type string.

    Returns:
        str: Full filesystem path to the workload's Scale-Sim logs.
    """
    # Joins the module-level scale_sim_log_dir with "<name>_<size>_<dataflow>"
    return os.path.join(scale_sim_log_dir, f"{workload_name}_{workload_size}_{dataflow}")
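
For example (the workload identifiers below are hypothetical, and `scale_sim_log_dir` must already be configured):

```python
# Resolves to "<scale_sim_log_dir>/resnet18_small_ws"
log_path = get_full_path("resnet18", "small", "ws")
```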

process_workload(full_path)

Process a Scale-Sim workload directory and generate frontend JSON summary.

Loads aggregate and detail CSVs for each layer, computes write frequencies, refresh counts, area, and energy for different gain cell devices, and dumps the results to a JSON file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| full_path | str | Filesystem path to the workload logs directory. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| dict | dict | JSON-serializable dictionary summarizing the workload results. |

Source code in frontend/scale_sim_frontend.py
def process_workload(full_path: str) -> dict:
    """Process a Scale-Sim workload directory and generate frontend JSON summary.

    Loads aggregate and detail CSVs for each layer, computes write frequencies,
    refresh counts, area, and energy for different gain cell devices, and dumps
    the results to a JSON file.

    Args:
        full_path (str): Filesystem path to the workload logs directory.

    Returns:
        dict: JSON-serializable dictionary summarizing the workload results.
    """
    freq_retention_dict = get_freq_retention()
    gain_cell_size, gain_cell_power = get_area_power()

    # Cell 2: import data
    workload_name = os.path.basename(full_path)
    workload_name_split = workload_name.split("_")
    dataflow = workload_name_split[-1]
    workload_size = workload_name_split[-2]
    workload_name = "_".join(workload_name_split[:-2])
    print(f"Processing {workload_name} with size {workload_size} and dataflow {dataflow}")
    layers = [d for d in os.listdir(full_path) if os.path.isdir(
        os.path.join(full_path, d))]
    layers.sort()

    print(f"Loading data from {len(layers)} layers")
    aggregate_data = {
        layer: pd.read_csv(os.path.join(
            full_path, layer, f"{layer}_aggregate_data.csv"))
        for layer in layers
    }
    detail_data = {
        layer: pd.read_csv(os.path.join(full_path, layer, f"{layer}_lifetime_data.csv"),
                           header=None,
                           names=["subdivision", "address", "lifetime_cycles"])
        for layer in layers
    }
    print(f"Loaded data from {len(layers)} layers")

    # Cell 3: process data
    # Add the layer name to both aggregate and detail dataframes as a new column
    # TODO: frequency adjustments; a 1 GHz clock is assumed for now
    # All lifetimes assumed to be in nanoseconds
    # All read and write frequencies assumed to be in MHz

    # This new column will be the first column in the dataframe
    for layer in layers:
        # Drop the rows in detail_data where the lifetime is not positive
        detail_data[layer] = detail_data[layer][detail_data[layer]['lifetime_cycles'] > 0]
        # Add the layer name as a new column
        aggregate_data[layer]['layer'] = layer
        detail_data[layer]['layer'] = layer
        # Concatenate the layer column with the subdivision column and call it 'kernel_id'
        aggregate_data[layer]['kernel_id'] = aggregate_data[layer]['layer'] + \
            "_" + aggregate_data[layer]['subdivision']
        detail_data[layer]['kernel_id'] = detail_data[layer]['layer'] + \
            "_" + detail_data[layer]['subdivision']
        # Move the 'kernel_id' column to the first position
        # and the 'layer' column to the second position
        aggregate_data[layer] = \
            aggregate_data[layer][['kernel_id', 'layer'] +
            [col for col in aggregate_data[layer].columns \
                if col not in ['kernel_id', 'layer']]]
        detail_data[layer] = \
            detail_data[layer][['kernel_id', 'layer'] +
            [col for col in detail_data[layer].columns \
                if col not in ['kernel_id', 'layer']]]
        detail_data[layer]['lifetime_ns'] = detail_data[layer]['lifetime_cycles']
    print(f"Processed data from {len(layers)} layers")

    # Cell 4
    # Concatenate all aggregate dataframes into one
    # Put layer_name as the first column
    aggregate_data_combined = pd.concat(aggregate_data.values(), ignore_index=True)
    detail_data_combined = pd.concat(detail_data.values(), ignore_index=True)
    # Drop NaN values in any column
    aggregate_data_combined = aggregate_data_combined.dropna(how='any')
    detail_data_combined = detail_data_combined.dropna(how='any')
    # Reset Indices
    aggregate_data_combined = aggregate_data_combined.reset_index(drop=True)
    detail_data_combined = detail_data_combined.reset_index(drop=True)
    aggregate_data_csv_path = os.path.join(
        scale_sim_output_dir, f"{workload_name}_{workload_size}.sim_{dataflow}_aggregate.csv")
    aggregate_data_combined.to_csv(aggregate_data_csv_path, index=False)
    # The detail data can be huge, so keep only every 20th row and write the
    # sample out in parallel with dask. Note that detail_data_combined is
    # reassigned here, so the refresh analysis below also runs on this
    # 1-in-20 sample (as does the "Lifetime Count" in the result).
    import dask.dataframe as dd
    detail_data_combined = detail_data_combined[detail_data_combined.index % 20 == 0]
    dask_df = dd.from_pandas(detail_data_combined, npartitions=os.cpu_count())
    dask_df.to_csv(os.path.join(
        scale_sim_output_dir, f"{workload_name}_{workload_size}.sim_{dataflow}_detail_*.csv"),
        index=False)
    print(f"Wrote concatenated data to {scale_sim_output_dir}")

    # Cell 7
    # Maximum write frequency overall and per subdivision
    write_freq_max_dict = {
        "all": aggregate_data_combined["write freq"].max(),
        "ifmap": aggregate_data_combined[aggregate_data_combined["subdivision"] == "ifmap"]["write freq"].max(),
        "ofmap": aggregate_data_combined[aggregate_data_combined["subdivision"] == "ofmap"]["write freq"].max(),
        "filter": aggregate_data_combined[aggregate_data_combined["subdivision"] == "filter"]["write freq"].max(),
    }
    refresh_dict = {
        "all": {},
        "ifmap": {},
        "ofmap": {},
        "filter": {},
    }
    for key, value in write_freq_max_dict.items():
        # Pick the first operating point whose write frequency covers this
        # subdivision's maximum; saturate at the fastest point available
        if value > freq_retention_dict["write_freq"].max():
            write_freq_index = len(freq_retention_dict["write_freq"]) - 1
        else:
            write_freq_index = np.where(
                freq_retention_dict["write_freq"] >= value)[0][0]
        # Retention times are in microseconds (converted in get_freq_retention);
        # silicon is fixed at 1 us here
        gc_retention = {
            "silicon": 1,
            "hybrid": freq_retention_dict["hybrid_retention"][write_freq_index],
            "oxide": freq_retention_dict["oxide_retention"][write_freq_index],
        }

        # Cell 8
        # Analyze refresh: a value must be refreshed once per retention period
        # that elapses during its lifetime, i.e. floor(lifetime / retention).
        # Example: a 250 us lifetime with 100 us retention costs 2 refreshes.
        refreshes = {
            "silicon": 0,
            "hybrid": 0,
            "oxide": 0,
        }
        for device in gc_retention:
            # Lifetimes arrive in cycles; / 1e3 converts to microseconds at
            # the assumed 1 GHz clock
            if key == "all":
                lifetime = detail_data_combined["lifetime_cycles"].to_numpy() / 1e3
            else:
                lifetime = detail_data_combined[detail_data_combined["subdivision"] == key]["lifetime_cycles"].to_numpy() / 1e3
            refreshes[device] += np.sum(np.floor(lifetime / gc_retention[device]))
        refresh_dict[key] = refreshes
    print("Finished calculating refreshes")

    # Cell 9
    # Analyze area
    try:
        # Buffer size in bits (8 bits per unique address), rounded up to the
        # next power of two, e.g. 3000 addresses -> 24000 bits -> 32768
        bit_size_dict = {
            "all": np.max(aggregate_data_combined["unique addresses"].to_numpy()) * 8,
            "ifmap": np.max(aggregate_data_combined[aggregate_data_combined["subdivision"] == "ifmap"]["unique addresses"].to_numpy()) * 8,
            "ofmap": np.max(aggregate_data_combined[aggregate_data_combined["subdivision"] == "ofmap"]["unique addresses"].to_numpy()) * 8,
            "filter": np.max(aggregate_data_combined[aggregate_data_combined["subdivision"] == "filter"]["unique addresses"].to_numpy()) * 8,
        }
        for key, value in bit_size_dict.items():
            bit_size_dict[key] = 2 ** np.ceil(np.log2(value))
        # Give each subdivision its own array: sharing a single np.zeros
        # object would alias all four entries to the same buffer
        area_dict = {
            key: np.zeros(len(gain_cell_size)) for key in bit_size_dict
        }

        for key, value in bit_size_dict.items():
            for i, cell in enumerate(gain_cell_size.keys()):
                area_dict[key][i] = gain_cell_size[cell] * value

        # Relabel the per-device array entries as dicts keyed by device name
        area_keys = ["sram", "silicon", "hybrid",
                     "planar_oxide", "stacked_oxide"]
        area_dict = {
            subdivision: {
                key: area_dict[subdivision][i]
                for i, key in enumerate(area_keys)
            }
            for subdivision in area_dict
        }
        print("Finished calculating area")
    except Exception as e:
        print(f"Error calculating area: {e}")
        # The result below reads area_dict, so clear that on failure
        area_dict = None

    # Cell 10
    # Analyze energy
    try:
        write_dict = {
            "all": np.sum(aggregate_data_combined["num writes"].to_numpy()),
            "ifmap": np.sum(aggregate_data_combined[aggregate_data_combined["subdivision"] == "ifmap"]["num writes"].to_numpy()),
            "ofmap": np.sum(aggregate_data_combined[aggregate_data_combined["subdivision"] == "ofmap"]["num writes"].to_numpy()),
            "filter": np.sum(aggregate_data_combined[aggregate_data_combined["subdivision"] == "filter"]["num writes"].to_numpy()),
        }
        read_dict = {
            "all": np.sum(aggregate_data_combined["num reads"].to_numpy()),
            "ifmap": np.sum(aggregate_data_combined[aggregate_data_combined["subdivision"] == "ifmap"]["num reads"].to_numpy()),
            "ofmap": np.sum(aggregate_data_combined[aggregate_data_combined["subdivision"] == "ofmap"]["num reads"].to_numpy()),
            "filter": np.sum(aggregate_data_combined[aggregate_data_combined["subdivision"] == "filter"]["num reads"].to_numpy()),
        }

        energy_dict = {}
        for key, total_writes in write_dict.items():
            total_reads = read_dict[key]
            # Use this subdivision's refresh counts rather than the leftover
            # `refreshes` from the last iteration of the Cell 8 loop
            refreshes = refresh_dict[key]
            # The factor of 8 matches the 8 bits stored per address; refreshes
            # add extra accesses for the gain cell devices
            energy = {}
            energy["sram"] = (total_writes + total_reads) * gain_cell_power["sram"] * 8
            energy["silicon"] = \
                (total_writes + total_reads + refreshes["silicon"]) * \
                gain_cell_power["silicon"] * 8
            energy["hybrid"] = \
                (total_writes + total_reads + refreshes["hybrid"]) * \
                gain_cell_power["hybrid"] * 8
            energy_dict[key] = energy

        print("Finished calculating energy")
    except Exception as e:
        print(f"Error calculating energy: {e}")
        # The result below reads energy_dict, so clear that on failure
        energy_dict = None

    result = convert_to_json_serializable({
        "Name": "_".join([workload_name, workload_size, dataflow]),
        "Specification": {
            "Workload": workload_name,
            "Size": workload_size,
            "Dataflow": dataflow,
        },
        "Lifetime Count": detail_data_combined["lifetime_cycles"].count(),
        "Write Frequency": write_freq_max_dict,
        "Refresh Count": refresh_dict,
        "Area": area_dict,
        "Energy": energy_dict,
    })

    # Dump to JSON
    output_path = os.path.join(
        scale_sim_output_dir, f"{workload_name}_{workload_size}.frontend_{dataflow}.json")
    with open(output_path, "w") as f:
        json.dump(result, f, indent=4)
    print(f"Output written to {output_path}")
    return result
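
A typical end-to-end invocation, with hypothetical workload identifiers (`scale_sim_log_dir` and `scale_sim_output_dir` must already point at valid directories):

```python
# Hypothetical workload; resolves to "<scale_sim_log_dir>/alexnet_small_os"
full_path = get_full_path("alexnet", "small", "os")
summary = process_workload(full_path)

# Per-device refresh counts for the whole workload, e.g.
# {"silicon": ..., "hybrid": ..., "oxide": ...}
print(summary["Refresh Count"]["all"])
```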