slurm-usage-history

slurm-usage-history: Dashboard to display the usage history of a Slurm scheduler.

app

account_formatter

AccountFormatter

Configurable formatter for cluster account names.

Source code in src/slurm_usage_history/app/account_formatter.py
class AccountFormatter:
    """Configurable formatter for cluster account names."""

    def __init__(self, max_segments=3, separator="-"):
        self.max_segments = max_segments
        self.separator = separator
        self.enabled = True

    def format_account(self, account):
        """Format account name by keeping first N segments."""
        if not self.enabled or not isinstance(account, str):
            return account

        # Clear cache if needed
        self._format_account_cached.cache_clear()

        return self._format_account_cached(account)

    @lru_cache(maxsize=1000)
    def _format_account_cached(self, account):
        """Cached implementation of account formatting."""
        if self.max_segments == 0:  # Keep full account name
            return account

        segments = account.split(self.separator)
        return self.separator.join(segments[: self.max_segments])
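
A minimal usage sketch of the formatter (assuming the package is importable as slurm_usage_history; the account name is the illustrative one used elsewhere on this page):

# Hedged sketch: the import path and account name are illustrative, not part of the rendered source.
from slurm_usage_history.app.account_formatter import AccountFormatter

fmt = AccountFormatter(max_segments=2, separator="-")
print(fmt.format_account("physics-theory-quantum-project"))  # physics-theory

fmt.max_segments = 0        # 0 keeps the full account name
print(fmt.format_account("physics-theory-quantum-project"))  # unchanged

fmt.enabled = False         # when disabled, inputs are returned untouched
print(fmt.format_account("physics-theory-quantum-project"))
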
format_account(account)

Format account name by keeping first N segments.

Source code in src/slurm_usage_history/app/account_formatter.py
def format_account(self, account):
    """Format account name by keeping first N segments."""
    if not self.enabled or not isinstance(account, str):
        return account

    # Clear cache if needed
    self._format_account_cached.cache_clear()

    return self._format_account_cached(account)

app

create_dash_app(args, server=True, url_base_pathname='/')

Create a Dash app that visualizes data from the specified Parquet files.

Source code in src/slurm_usage_history/app/app.py
def create_dash_app(args, server=True, url_base_pathname="/"):
    """
    Create a Dash app that visualizes data from the specified Parquet files.
    """

    load_dotenv()

    # Initialize the Dash app
    app = dash.Dash(
        __name__,
        external_stylesheets=[
            dbc.themes.BOOTSTRAP,
            '/assets/custom-styles.css',
        ],
        assets_folder='src/slurm_usage_history/assets',
        server=server,
        url_base_pathname=url_base_pathname,
    )

    cache = diskcache.Cache("./cache")
    DiskcacheManager(cache)

    # Create DataStore instance
    datastore = DataStore(directory=args.data_path)
    datastore.load_data()
    datastore.start_auto_refresh(interval=60)

    # Layout of the app
    app.layout = layout
    #add_callbacks(app, datastore, cache, background_callback_manager)
    add_callbacks(app, datastore, None, None)

    app.title = "Slurm Usage History Dashboard"

    server = app.server
    server.secret_key = os.getenv('FLASK_SECRET_KEY')

    return app
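
A minimal sketch of building the app without the CLI wrapper; argparse.Namespace stands in for the parsed arguments, and the data path must follow the <data-path>/<hostname>/weekly-data/*.parquet layout expected by the datastore:

# Hedged sketch: "data" matches the CLI default in parse_arguments; adjust to your layout.
from argparse import Namespace
from slurm_usage_history.app.app import create_dash_app

args = Namespace(data_path="data")                 # the only attribute create_dash_app reads
app = create_dash_app(args, server=True, url_base_pathname="/")
app.run_server(debug=False, port=8050)             # same call main() makes
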

datastore

PandasDataStore

DataStore implementation using Pandas with enhanced filtering capabilities.

Implemented as a Singleton to ensure only one instance exists throughout the application lifecycle.

Source code in src/slurm_usage_history/app/datastore.py
class PandasDataStore(metaclass=Singleton):
    """DataStore implementation using Pandas with enhanced filtering capabilities.

    Implemented as a Singleton to ensure only one instance exists throughout
    the application lifecycle.
    """

    def __init__(
        self,
        directory: str | Path | None = None,
        auto_refresh_interval: int = 600,
        account_formatter: Any | None = None
    ):
        """Initialize the PandasDataStore.

        Args:
            directory: Path to the data directory. Defaults to current working directory if None.
            auto_refresh_interval: Refresh interval in seconds. Defaults to 600 seconds (10 minutes).
            account_formatter: Formatter for account names. Defaults to None.
        """
        self.directory = Path(directory).expanduser() if directory else Path.cwd()
        self.hosts: dict[str, dict[str, Any]] = {}
        self.auto_refresh_interval = auto_refresh_interval
        self._refresh_thread: threading.Thread | None = None
        self._stop_refresh_flag: threading.Event = threading.Event()
        self._file_timestamps: dict[str, dict[Path, float]] = {}

        # Import the account formatter if not provided
        if account_formatter is None:
            try:
                from .account_formatter import formatter as default_formatter
                self.account_formatter = default_formatter
            except ImportError:
                self.account_formatter = None
        else:
            self.account_formatter = account_formatter

        self._initialize_hosts()

    def _initialize_hosts(self) -> None:
        """Populate the hosts dictionary with subdirectories.

        Scans the specified directory for subdirectories and initializes
        data structures for each detected host.
        """
        for entry in self.directory.iterdir():
            if entry.is_dir():
                self.hosts[entry.name] = {
                    "max_date": None,
                    "min_date": None,
                    "data": None,
                    "partitions": None,
                    "accounts": None,
                    "users": None,
                    "qos": None,
                    "states": None,
                }

    def get_hostnames(self) -> list[str]:
        """Retrieve the list of hostnames.

        Returns:
            List of available host names found in the data directory.
        """
        return list(self.hosts.keys())

    def get_min_max_dates(self, hostname: str) -> tuple[str | None, str | None]:
        """Get minimum and maximum dates for the specified hostname.

        Args:
            hostname: The cluster hostname.

        Returns:
            A tuple containing (min_date, max_date) for the specified hostname.
        """
        min_date = self.hosts[hostname]["min_date"]
        max_date = self.hosts[hostname]["max_date"]
        return min_date, max_date

    def get_partitions(self, hostname: str) -> list[str]:
        """Get available partitions for the specified hostname.

        Args:
            hostname: The cluster hostname.

        Returns:
            List of available partitions for the specified hostname.
        """
        return self.hosts[hostname]["partitions"] or []

    def get_accounts(self, hostname: str) -> list[str]:
        """Get available accounts for the specified hostname.

        Args:
            hostname: The cluster hostname.

        Returns:
            List of available accounts for the specified hostname.
        """
        return self.hosts[hostname]["accounts"] or []

    def get_users(self, hostname: str) -> list[str]:
        """Get available users for the specified hostname.

        Args:
            hostname: The cluster hostname.

        Returns:
            List of available users for the specified hostname.
        """
        return self.hosts[hostname]["users"] or []

    def get_qos(self, hostname: str) -> list[str]:
        """Get available QOS options for the specified hostname.

        Args:
            hostname: The cluster hostname.

        Returns:
            List of available QOS options for the specified hostname.
        """
        return self.hosts[hostname]["qos"] or []

    def get_states(self, hostname: str) -> list[str]:
        """Get available states for the specified hostname.

        Args:
            hostname: The cluster hostname.

        Returns:
            List of available states for the specified hostname.
        """
        return self.hosts[hostname]["states"] or []

    def start_auto_refresh(self, interval: int | None = None) -> None:
        """Start the background thread for automatic data refresh.

        Args:
            interval: Optional refresh interval in seconds. If provided, overrides
                     the interval set during initialization.

        Raises:
            ValueError: If the provided interval is not a positive integer.
        """
        if interval is not None:
            if not isinstance(interval, int) or interval <= 0:
                msg = "Refresh interval must be a positive integer"
                raise ValueError(msg)
            self.auto_refresh_interval = interval

        if self._refresh_thread is not None and self._refresh_thread.is_alive():
            print("Auto-refresh is already running")
            return

        self._stop_refresh_flag.clear()
        self._refresh_thread = threading.Thread(
            target=self._auto_refresh_worker,
            daemon=True,
            name="DataStore-AutoRefresh"
        )
        self._refresh_thread.start()
        print(f"Started auto-refresh thread (every {self.auto_refresh_interval} seconds)")

    def stop_auto_refresh(self) -> None:
        """Stop the background thread for automatic data refresh.

        Signals the auto-refresh thread to stop and waits for its termination.
        """
        if self._refresh_thread is None or not self._refresh_thread.is_alive():
            print("No auto-refresh thread is running")
            return

        print("Stopping auto-refresh thread...")
        self._stop_refresh_flag.set()
        self._refresh_thread.join(timeout=5.0)
        if self._refresh_thread.is_alive():
            print("Warning: Auto-refresh thread did not terminate gracefully")
        else:
            print("Auto-refresh thread stopped successfully")

    def _auto_refresh_worker(self) -> None:
        """Worker method for the auto-refresh thread.

        Periodically checks for updates in the data and reloads if necessary.
        Runs in a background thread until signaled to stop.
        """
        while not self._stop_refresh_flag.is_set():
            try:
                updated = self.check_for_updates()
                if updated:
                    print(f"Auto-refresh: Data was updated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
                else:
                    print(f"Auto-refresh: No updates found at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            except Exception as e:
                print(f"Error during auto-refresh: {e!s}")

            # Sleep for the specified interval, but check periodically if we should stop
            check_interval = 2
            for _ in range(self.auto_refresh_interval // check_interval):
                if self._stop_refresh_flag.is_set():
                    break
                time.sleep(check_interval)

    def set_refresh_interval(self, interval: int) -> bool:
        """Change the auto-refresh interval.

        Args:
            interval: New refresh interval in seconds.

        Returns:
            True if the interval was updated and auto-refresh is running,
            False if auto-refresh is not running.

        Raises:
            ValueError: If the interval is not a positive integer.
        """
        if not isinstance(interval, int) or interval <= 0:
            msg = "Refresh interval must be a positive integer"
            raise ValueError(msg)

        self.auto_refresh_interval = interval
        print(f"Auto-refresh interval set to {interval} seconds")

        # Return status based on whether auto-refresh is running
        return self._refresh_thread is not None and self._refresh_thread.is_alive()

    @timeit
    def load_data(self) -> None:
        """Load all data files into the hosts dictionary.

        Iterates through all hostnames and loads their respective data.
        Performance is measured using the timeit decorator.
        """
        for hostname in self.get_hostnames():
            print(f"Loading data for {hostname}...")
            self._load_host_data(hostname)

    def _load_host_data(self, hostname: str) -> None:
        """Load data for a specific hostname and update metadata.

        Args:
            hostname: The hostname to load data for.

        Processes the raw data, applies transformations, and updates metadata
        for the specified hostname.
        """
        raw_data = self._load_raw_data(hostname)
        transformed_data = self._transform_data(raw_data)
        self.hosts[hostname]["data"] = transformed_data

        # Store metadata
        self.hosts[hostname]["min_date"] = raw_data["Submit"].dt.date.min().isoformat()
        self.hosts[hostname]["max_date"] = raw_data["Submit"].dt.date.max().isoformat()

        # Store unique values for filtering
        for col, key in [
            ("Partition", "partitions"),
            ("Account", "accounts"),
            ("User", "users"),
            ("QOS", "qos"),
            ("State", "states")
        ]:
            if col in transformed_data.columns:
                self.hosts[hostname][key] = transformed_data[col].sort_values().unique().tolist()
            else:
                self.hosts[hostname][key] = []

        # Store file timestamps for future change detection
        host_dir = self.directory / hostname / "weekly-data"
        self._file_timestamps[hostname] = {}
        for file_path in host_dir.glob("*.parquet"):
            self._file_timestamps[hostname][file_path] = file_path.stat().st_mtime

    def _load_raw_data(self, hostname: str) -> pd.DataFrame:
        """Load all Parquet files in the directory for a specific hostname.

        Args:
            hostname: The hostname to load data for.

        Returns:
            DataFrame containing the concatenated data from all Parquet files.

        Raises:
            FileNotFoundError: If the directory or Parquet files are not found.
        """
        host_dir = self.directory / hostname / "weekly-data"
        if not host_dir.exists() or not host_dir.is_dir():
            msg = f"Directory not found for hostname: {hostname}"
            raise FileNotFoundError(msg)

        parquet_files = list(host_dir.glob("*.parquet"))
        if not parquet_files:
            msg = f"No Parquet files found in directory: {host_dir}"
            raise FileNotFoundError(msg)

        return pd.concat([pd.read_parquet(file) for file in parquet_files], ignore_index=True)

    @timeit
    def _transform_data(self, raw_data: pd.DataFrame) -> pd.DataFrame:
        """Apply necessary transformations to the raw data.

        Args:
            raw_data: The raw DataFrame to transform.

        Returns:
            Transformed DataFrame with standardized formats.

        Handles column renaming and data type conversions if needed.
        The major time-based columns (SubmitYearMonth, SubmitYearWeek, etc.)
        are already present in the data.
        """
        # Ensure column existence and data types
        if "Partition" not in raw_data.columns and "Partitions" in raw_data.columns:
            raw_data["Partition"] = raw_data["Partitions"]

        # Handle multiple partitions per job (if stored as a list or string)
        if "Partition" in raw_data.columns and raw_data["Partition"].dtype == "object":
            raw_data["Partition"] = raw_data["Partition"].apply(
                lambda x: x.split(",")[0].strip() if isinstance(x, str) else x
            )

        # Extract SubmitYear for period filtering if not present
        # This is needed for the get_complete_periods method
        if "Submit" in raw_data.columns and "SubmitYear" not in raw_data.columns:
            raw_data["SubmitYear"] = raw_data["Submit"].dt.year

        # Add StartDay if not present
        if "StartDay" not in raw_data.columns and "Start" in raw_data.columns:
            raw_data["StartDay"] = raw_data["Start"].dt.normalize()
            logging.info("Added StartDay column")

        # Add SubmitDay if not present
        if "SubmitDay" not in raw_data.columns and "Submit" in raw_data.columns:
            raw_data["SubmitDay"] = raw_data["Submit"].dt.normalize()
            logging.info("Added SubmitDay column")

        return raw_data

    def check_for_updates(self) -> bool:
        """Check all hosts for new or changed files and reload if necessary.

        Returns:
            True if any host was updated, False otherwise.
        """
        updated = False

        for hostname in self.get_hostnames():
            host_updates = self._check_host_updates(hostname)
            if host_updates:
                print(f"Updates detected for host {hostname}, reloading data...")
                try:
                    self._load_host_data(hostname)
                    updated = True
                    # Clear the cache since data has changed
                    self._filter_data.cache_clear()
                except Exception as e:
                    print(f"Error reloading data for {hostname}: {e!s}")

        return updated

    def _check_host_updates(self, hostname: str) -> bool:
        """Check if files for a specific host have been updated or new files added.

        Args:
            hostname: The hostname to check for updates.

        Returns:
            True if there are updates, False otherwise.
        """
        host_dir = self.directory / hostname / "weekly-data"
        if not host_dir.exists() or not host_dir.is_dir():
            return False

        # Get current files and their timestamps
        current_files = {}
        for file_path in host_dir.glob("*.parquet"):
            current_files[file_path] = file_path.stat().st_mtime

        # If this is our first check for this hostname, store timestamps and return
        if hostname not in self._file_timestamps:
            self._file_timestamps[hostname] = current_files
            return False

        # Check for changes or new files
        old_timestamps = self._file_timestamps[hostname]

        # New files
        new_files = set(current_files.keys()) - set(old_timestamps.keys())
        if new_files:
            print(f"New files found for {hostname}: {len(new_files)} files")
            return True

        # Changed files (timestamp differs)
        changed_files = []
        for file_path, current_time in current_files.items():
            if file_path in old_timestamps and current_time != old_timestamps[file_path]:
                changed_files.append(file_path)

        if changed_files:
            print(f"Changed files found for {hostname}: {len(changed_files)} files")
            return True

        # No changes detected
        return False

    @lru_cache(maxsize=10)
    def _filter_data(
        self,
        hostname: str | None = None,
        start_date: str | None = None,
        end_date: str | None = None,
        partitions: frozenset[str] | None = None,
        accounts: frozenset[str] | None = None,
        users: frozenset[str] | None = None,
        qos: frozenset[str] | None = None,
        states: frozenset[str] | None = None,
    ) -> pd.DataFrame:
        """Filter data based on multiple criteria.

        Args:
            hostname: The cluster hostname.
            start_date: Start date filter.
            end_date: End date filter.
            partitions: Set of partitions to include.
            accounts: Set of accounts to include.
            users: Set of users to include.
            qos: Set of QOS values to include.
            states: Set of job states to include.

        Returns:
            Filtered DataFrame.

        Uses caching to improve performance for repeated similar queries.
        """
        df_filtered = self.hosts[hostname]["data"]

        if start_date:
            start_date = pd.to_datetime(start_date)
            df_filtered = df_filtered[df_filtered["Submit"] >= start_date]

        if end_date:
            end_date = pd.to_datetime(end_date)
            df_filtered = df_filtered[df_filtered["Submit"] <= end_date]

        if partitions and "Partition" in df_filtered.columns:
            df_filtered = df_filtered[df_filtered["Partition"].isin(partitions)]

        if accounts and "Account" in df_filtered.columns:
            df_filtered = df_filtered[df_filtered["Account"].isin(accounts)]

        if users and "User" in df_filtered.columns:
            df_filtered = df_filtered[df_filtered["User"].isin(users)]

        if qos and "QOS" in df_filtered.columns:
            df_filtered = df_filtered[df_filtered["QOS"].isin(qos)]

        if states and "State" in df_filtered.columns:
            df_filtered = df_filtered[df_filtered["State"].isin(states)]

        return df_filtered

    def get_complete_periods(self, hostname: str, period_type: str = "month") -> list[str]:
        """Get list of complete time periods available in the data.

        Args:
            hostname: The cluster hostname.
            period_type: Type of period ('day', 'week', 'month', 'year').

        Returns:
            List of complete periods.
        """
        if hostname not in self.hosts or self.hosts[hostname]["data"] is None:
            return []

        df = self.hosts[hostname]["data"]
        now = pd.Timestamp.now()

        if period_type == "month":
            # Get year-month periods
            current_year_month = now.strftime("%Y-%m")

            # All periods except current (incomplete) month
            periods = sorted(df["SubmitYearMonth"].unique().tolist())
            if current_year_month in periods:
                periods.remove(current_year_month)

            return periods

        if period_type == "week":
            # Get week start dates
            # Calculate current week's start date
            current_week_start = now - pd.to_timedelta(now.dayofweek, unit="D")
            current_week_start_str = current_week_start.strftime("%Y-%m-%d")

            # All periods except current (incomplete) week
            periods = sorted(df["SubmitYearWeek"].unique().tolist())
            if current_week_start_str in periods:
                periods.remove(current_week_start_str)

            return periods

        if period_type == "year":
            # Get years
            current_year = now.year

            # All years except current (incomplete) year
            years = sorted(df["SubmitYear"].unique().tolist())
            if current_year in years:
                years.remove(current_year)

            return [str(year) for year in years]

        return []

    def filter(
        self,
        hostname: str,
        start_date: str | None = None,
        end_date: str | None = None,
        partitions: list[str] | None = None,
        accounts: list[str] | None = None,
        users: list[str] | None = None,
        qos: list[str] | None = None,
        states: list[str] | None = None,
        complete_periods_only: bool = False,
        period_type: str = "month",
        format_accounts: bool = True,
        account_segments: int | None = None,
    ) -> pd.DataFrame:
        """Public method to filter data with enhanced options.

        Args:
            hostname: The cluster hostname.
            start_date: Start date filter.
            end_date: End date filter.
            partitions: List of partitions to include.
            accounts: List of accounts to include.
            users: List of users to include.
            qos: List of QOS values to include.
            states: List of job states to include.
            complete_periods_only: Whether to include only complete time periods.
            period_type: Type of period when using complete_periods_only ('day', 'week', 'month', 'year').
            format_accounts: Whether to apply account name formatting.
            account_segments: Number of segments to keep.

        Returns:
            Filtered DataFrame.
        """
        if not hostname or hostname not in self.hosts or self.hosts[hostname]["data"] is None:
            # Return empty DataFrame if no data is available
            return pd.DataFrame()

        # Start with basic filtering
        df_filtered = self._filter_data(
            hostname=hostname,
            start_date=start_date if start_date else self.hosts[hostname]["min_date"],
            end_date=end_date if end_date else self.hosts[hostname]["max_date"],
            partitions=frozenset(partitions) if partitions else None,
            accounts=frozenset(accounts) if accounts else None,
            users=frozenset(users) if users else None,
            qos=frozenset(qos) if qos else None,
            states=frozenset(states) if states else None,
        )

        # Apply complete periods filter if requested
        if complete_periods_only and not df_filtered.empty:
            now = pd.Timestamp.now()

            if period_type == "month":
                # Exclude current month
                current_year_month = now.strftime("%Y-%m")
                df_filtered = df_filtered[df_filtered["SubmitYearMonth"] != current_year_month]

            elif period_type == "week":
                # Exclude current week based on SubmitYearWeek
                current_week_start = now - pd.to_timedelta(now.dayofweek, unit="D")
                current_week_start_str = current_week_start.strftime("%Y-%m-%d")
                df_filtered = df_filtered[df_filtered["SubmitYearWeek"] != current_week_start_str]

            elif period_type == "year":
                # Exclude current year
                current_year = now.year
                df_filtered = df_filtered[df_filtered["SubmitYear"] != current_year]

        # Apply account formatting if requested
        if format_accounts and "Account" in df_filtered.columns and not df_filtered.empty:
            # Create a copy to avoid modifying the cached data
            df_filtered = df_filtered.copy()

            if self.account_formatter:
                try:
                    if account_segments is not None:
                        # Temporarily store the current setting
                        original_segments = self.account_formatter.max_segments

                        # Apply custom segments just for this filter operation
                        self.account_formatter.max_segments = account_segments
                        df_filtered["Account"] = df_filtered["Account"].apply(self.account_formatter.format_account)

                        # Restore original setting
                        self.account_formatter.max_segments = original_segments
                    else:
                        # Use current global setting
                        df_filtered["Account"] = df_filtered["Account"].apply(self.account_formatter.format_account)
                except Exception as e:
                    print(f"Error applying account formatting: {e}. Using original account names.")

        return df_filtered
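
A minimal usage sketch of the datastore (the directory is a placeholder; it must contain one sub-directory per host with a weekly-data folder of Parquet files, as expected by _load_raw_data):

# Hedged sketch: "data" is a placeholder path.
from slurm_usage_history.app.datastore import PandasDataStore

store = PandasDataStore(directory="data", auto_refresh_interval=600)
store.load_data()

for hostname in store.get_hostnames():
    print(hostname, store.get_min_max_dates(hostname))
    print("partitions:", store.get_partitions(hostname))
    print("accounts:", store.get_accounts(hostname))
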
__init__(directory=None, auto_refresh_interval=600, account_formatter=None)

Initialize the PandasDataStore.

Parameters:

    directory (str | Path | None, default None): Path to the data directory. Defaults to current working directory if None.
    auto_refresh_interval (int, default 600): Refresh interval in seconds. Defaults to 600 seconds (10 minutes).
    account_formatter (Any | None, default None): Formatter for account names. Defaults to None.
Source code in src/slurm_usage_history/app/datastore.py
def __init__(
    self,
    directory: str | Path | None = None,
    auto_refresh_interval: int = 600,
    account_formatter: Any | None = None
):
    """Initialize the PandasDataStore.

    Args:
        directory: Path to the data directory. Defaults to current working directory if None.
        auto_refresh_interval: Refresh interval in seconds. Defaults to 600 seconds (10 minutes).
        account_formatter: Formatter for account names. Defaults to None.
    """
    self.directory = Path(directory).expanduser() if directory else Path.cwd()
    self.hosts: dict[str, dict[str, Any]] = {}
    self.auto_refresh_interval = auto_refresh_interval
    self._refresh_thread: threading.Thread | None = None
    self._stop_refresh_flag: threading.Event = threading.Event()
    self._file_timestamps: dict[str, dict[Path, float]] = {}

    # Import the account formatter if not provided
    if account_formatter is None:
        try:
            from .account_formatter import formatter as default_formatter
            self.account_formatter = default_formatter
        except ImportError:
            self.account_formatter = None
    else:
        self.account_formatter = account_formatter

    self._initialize_hosts()
check_for_updates()

Check all hosts for new or changed files and reload if necessary.

Returns:

    bool: True if any host was updated, False otherwise.

Source code in src/slurm_usage_history/app/datastore.py
def check_for_updates(self) -> bool:
    """Check all hosts for new or changed files and reload if necessary.

    Returns:
        True if any host was updated, False otherwise.
    """
    updated = False

    for hostname in self.get_hostnames():
        host_updates = self._check_host_updates(hostname)
        if host_updates:
            print(f"Updates detected for host {hostname}, reloading data...")
            try:
                self._load_host_data(hostname)
                updated = True
                # Clear the cache since data has changed
                self._filter_data.cache_clear()
            except Exception as e:
                print(f"Error reloading data for {hostname}: {e!s}")

    return updated
filter(hostname, start_date=None, end_date=None, partitions=None, accounts=None, users=None, qos=None, states=None, complete_periods_only=False, period_type='month', format_accounts=True, account_segments=None)

Public method to filter data with enhanced options.

Parameters:

    hostname (str, required): The cluster hostname.
    start_date (str | None, default None): Start date filter.
    end_date (str | None, default None): End date filter.
    partitions (list[str] | None, default None): List of partitions to include.
    accounts (list[str] | None, default None): List of accounts to include.
    users (list[str] | None, default None): List of users to include.
    qos (list[str] | None, default None): List of QOS values to include.
    states (list[str] | None, default None): List of job states to include.
    complete_periods_only (bool, default False): Whether to include only complete time periods.
    period_type (str, default 'month'): Type of period when using complete_periods_only ('day', 'week', 'month', 'year').
    format_accounts (bool, default True): Whether to apply account name formatting.
    account_segments (int | None, default None): Number of segments to keep.

Returns:

    DataFrame: Filtered DataFrame.

Source code in src/slurm_usage_history/app/datastore.py
def filter(
    self,
    hostname: str,
    start_date: str | None = None,
    end_date: str | None = None,
    partitions: list[str] | None = None,
    accounts: list[str] | None = None,
    users: list[str] | None = None,
    qos: list[str] | None = None,
    states: list[str] | None = None,
    complete_periods_only: bool = False,
    period_type: str = "month",
    format_accounts: bool = True,
    account_segments: int | None = None,
) -> pd.DataFrame:
    """Public method to filter data with enhanced options.

    Args:
        hostname: The cluster hostname.
        start_date: Start date filter.
        end_date: End date filter.
        partitions: List of partitions to include.
        accounts: List of accounts to include.
        users: List of users to include.
        qos: List of QOS values to include.
        states: List of job states to include.
        complete_periods_only: Whether to include only complete time periods.
        period_type: Type of period when using complete_periods_only ('day', 'week', 'month', 'year').
        format_accounts: Whether to apply account name formatting.
        account_segments: Number of segments to keep.

    Returns:
        Filtered DataFrame.
    """
    if not hostname or hostname not in self.hosts or self.hosts[hostname]["data"] is None:
        # Return empty DataFrame if no data is available
        return pd.DataFrame()

    # Start with basic filtering
    df_filtered = self._filter_data(
        hostname=hostname,
        start_date=start_date if start_date else self.hosts[hostname]["min_date"],
        end_date=end_date if end_date else self.hosts[hostname]["max_date"],
        partitions=frozenset(partitions) if partitions else None,
        accounts=frozenset(accounts) if accounts else None,
        users=frozenset(users) if users else None,
        qos=frozenset(qos) if qos else None,
        states=frozenset(states) if states else None,
    )

    # Apply complete periods filter if requested
    if complete_periods_only and not df_filtered.empty:
        now = pd.Timestamp.now()

        if period_type == "month":
            # Exclude current month
            current_year_month = now.strftime("%Y-%m")
            df_filtered = df_filtered[df_filtered["SubmitYearMonth"] != current_year_month]

        elif period_type == "week":
            # Exclude current week based on SubmitYearWeek
            current_week_start = now - pd.to_timedelta(now.dayofweek, unit="D")
            current_week_start_str = current_week_start.strftime("%Y-%m-%d")
            df_filtered = df_filtered[df_filtered["SubmitYearWeek"] != current_week_start_str]

        elif period_type == "year":
            # Exclude current year
            current_year = now.year
            df_filtered = df_filtered[df_filtered["SubmitYear"] != current_year]

    # Apply account formatting if requested
    if format_accounts and "Account" in df_filtered.columns and not df_filtered.empty:
        # Create a copy to avoid modifying the cached data
        df_filtered = df_filtered.copy()

        if self.account_formatter:
            try:
                if account_segments is not None:
                    # Temporarily store the current setting
                    original_segments = self.account_formatter.max_segments

                    # Apply custom segments just for this filter operation
                    self.account_formatter.max_segments = account_segments
                    df_filtered["Account"] = df_filtered["Account"].apply(self.account_formatter.format_account)

                    # Restore original setting
                    self.account_formatter.max_segments = original_segments
                else:
                    # Use current global setting
                    df_filtered["Account"] = df_filtered["Account"].apply(self.account_formatter.format_account)
            except Exception as e:
                print(f"Error applying account formatting: {e}. Using original account names.")

    return df_filtered
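
A sketch of a typical call, continuing the datastore sketch above; the hostname and filter values are placeholders and should come from get_hostnames(), get_partitions(), and the other getters:

# Hedged sketch: all literal values below are placeholders.
from slurm_usage_history.app.datastore import PandasDataStore

store = PandasDataStore(directory="data")
store.load_data()

df = store.filter(
    "cluster-a",
    start_date="2024-01-01",
    end_date="2024-06-30",
    partitions=["gpu"],
    complete_periods_only=True,     # drop the still-running month
    period_type="month",
    format_accounts=True,
    account_segments=2,             # temporarily override the formatter's max_segments
)
print(len(df), "jobs after filtering")
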
get_accounts(hostname)

Get available accounts for the specified hostname.

Parameters:

    hostname (str, required): The cluster hostname.

Returns:

    list[str]: List of available accounts for the specified hostname.

Source code in src/slurm_usage_history/app/datastore.py
def get_accounts(self, hostname: str) -> list[str]:
    """Get available accounts for the specified hostname.

    Args:
        hostname: The cluster hostname.

    Returns:
        List of available accounts for the specified hostname.
    """
    return self.hosts[hostname]["accounts"] or []
get_complete_periods(hostname, period_type='month')

Get list of complete time periods available in the data.

Parameters:

    hostname (str, required): The cluster hostname.
    period_type (str, default 'month'): Type of period ('day', 'week', 'month', 'year').

Returns:

    list[str]: List of complete periods.

Source code in src/slurm_usage_history/app/datastore.py
def get_complete_periods(self, hostname: str, period_type: str = "month") -> list[str]:
    """Get list of complete time periods available in the data.

    Args:
        hostname: The cluster hostname.
        period_type: Type of period ('day', 'week', 'month', 'year').

    Returns:
        List of complete periods.
    """
    if hostname not in self.hosts or self.hosts[hostname]["data"] is None:
        return []

    df = self.hosts[hostname]["data"]
    now = pd.Timestamp.now()

    if period_type == "month":
        # Get year-month periods
        current_year_month = now.strftime("%Y-%m")

        # All periods except current (incomplete) month
        periods = sorted(df["SubmitYearMonth"].unique().tolist())
        if current_year_month in periods:
            periods.remove(current_year_month)

        return periods

    if period_type == "week":
        # Get week start dates
        # Calculate current week's start date
        current_week_start = now - pd.to_timedelta(now.dayofweek, unit="D")
        current_week_start_str = current_week_start.strftime("%Y-%m-%d")

        # All periods except current (incomplete) week
        periods = sorted(df["SubmitYearWeek"].unique().tolist())
        if current_week_start_str in periods:
            periods.remove(current_week_start_str)

        return periods

    if period_type == "year":
        # Get years
        current_year = now.year

        # All years except current (incomplete) year
        years = sorted(df["SubmitYear"].unique().tolist())
        if current_year in years:
            years.remove(current_year)

        return [str(year) for year in years]

    return []
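
For example, continuing the datastore sketch above (placeholder hostname), the method returns sorted period labels with the still-running period removed:

# Hedged sketch: "cluster-a" is a placeholder hostname.
months = store.get_complete_periods("cluster-a", period_type="month")   # e.g. ["2024-01", "2024-02", ...]
weeks = store.get_complete_periods("cluster-a", period_type="week")     # week start dates as YYYY-MM-DD
years = store.get_complete_periods("cluster-a", period_type="year")     # years as strings
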
get_hostnames()

Retrieve the list of hostnames.

Returns:

    list[str]: List of available host names found in the data directory.

Source code in src/slurm_usage_history/app/datastore.py
def get_hostnames(self) -> list[str]:
    """Retrieve the list of hostnames.

    Returns:
        List of available host names found in the data directory.
    """
    return list(self.hosts.keys())
get_min_max_dates(hostname)

Get minimum and maximum dates for the specified hostname.

Parameters:

    hostname (str, required): The cluster hostname.

Returns:

    tuple[str | None, str | None]: A tuple containing (min_date, max_date) for the specified hostname.

Source code in src/slurm_usage_history/app/datastore.py
def get_min_max_dates(self, hostname: str) -> tuple[str | None, str | None]:
    """Get minimum and maximum dates for the specified hostname.

    Args:
        hostname: The cluster hostname.

    Returns:
        A tuple containing (min_date, max_date) for the specified hostname.
    """
    min_date = self.hosts[hostname]["min_date"]
    max_date = self.hosts[hostname]["max_date"]
    return min_date, max_date
get_partitions(hostname)

Get available partitions for the specified hostname.

Parameters:

    hostname (str, required): The cluster hostname.

Returns:

    list[str]: List of available partitions for the specified hostname.

Source code in src/slurm_usage_history/app/datastore.py
def get_partitions(self, hostname: str) -> list[str]:
    """Get available partitions for the specified hostname.

    Args:
        hostname: The cluster hostname.

    Returns:
        List of available partitions for the specified hostname.
    """
    return self.hosts[hostname]["partitions"] or []
get_qos(hostname)

Get available QOS options for the specified hostname.

Parameters:

    hostname (str, required): The cluster hostname.

Returns:

    list[str]: List of available QOS options for the specified hostname.

Source code in src/slurm_usage_history/app/datastore.py
def get_qos(self, hostname: str) -> list[str]:
    """Get available QOS options for the specified hostname.

    Args:
        hostname: The cluster hostname.

    Returns:
        List of available QOS options for the specified hostname.
    """
    return self.hosts[hostname]["qos"] or []
get_states(hostname)

Get available states for the specified hostname.

Parameters:

    hostname (str, required): The cluster hostname.

Returns:

    list[str]: List of available states for the specified hostname.

Source code in src/slurm_usage_history/app/datastore.py
def get_states(self, hostname: str) -> list[str]:
    """Get available states for the specified hostname.

    Args:
        hostname: The cluster hostname.

    Returns:
        List of available states for the specified hostname.
    """
    return self.hosts[hostname]["states"] or []
get_users(hostname)

Get available users for the specified hostname.

Parameters:

    hostname (str, required): The cluster hostname.

Returns:

    list[str]: List of available users for the specified hostname.

Source code in src/slurm_usage_history/app/datastore.py
def get_users(self, hostname: str) -> list[str]:
    """Get available users for the specified hostname.

    Args:
        hostname: The cluster hostname.

    Returns:
        List of available users for the specified hostname.
    """
    return self.hosts[hostname]["users"] or []
load_data()

Load all data files into the hosts dictionary.

Iterates through all hostnames and loads their respective data. Performance is measured using the timeit decorator.

Source code in src/slurm_usage_history/app/datastore.py
@timeit
def load_data(self) -> None:
    """Load all data files into the hosts dictionary.

    Iterates through all hostnames and loads their respective data.
    Performance is measured using the timeit decorator.
    """
    for hostname in self.get_hostnames():
        print(f"Loading data for {hostname}...")
        self._load_host_data(hostname)
set_refresh_interval(interval)

Change the auto-refresh interval.

Parameters:

    interval (int, required): New refresh interval in seconds.

Returns:

    bool: True if the interval was updated and auto-refresh is running, False if auto-refresh is not running.

Raises:

    ValueError: If the interval is not a positive integer.

Source code in src/slurm_usage_history/app/datastore.py
def set_refresh_interval(self, interval: int) -> bool:
    """Change the auto-refresh interval.

    Args:
        interval: New refresh interval in seconds.

    Returns:
        True if the interval was updated and auto-refresh is running,
        False if auto-refresh is not running.

    Raises:
        ValueError: If the interval is not a positive integer.
    """
    if not isinstance(interval, int) or interval <= 0:
        msg = "Refresh interval must be a positive integer"
        raise ValueError(msg)

    self.auto_refresh_interval = interval
    print(f"Auto-refresh interval set to {interval} seconds")

    # Return status based on whether auto-refresh is running
    return self._refresh_thread is not None and self._refresh_thread.is_alive()
start_auto_refresh(interval=None)

Start the background thread for automatic data refresh.

Parameters:

    interval (int | None, default None): Optional refresh interval in seconds. If provided, overrides the interval set during initialization.

Raises:

    ValueError: If the provided interval is not a positive integer.

Source code in src/slurm_usage_history/app/datastore.py
def start_auto_refresh(self, interval: int | None = None) -> None:
    """Start the background thread for automatic data refresh.

    Args:
        interval: Optional refresh interval in seconds. If provided, overrides
                 the interval set during initialization.

    Raises:
        ValueError: If the provided interval is not a positive integer.
    """
    if interval is not None:
        if not isinstance(interval, int) or interval <= 0:
            msg = "Refresh interval must be a positive integer"
            raise ValueError(msg)
        self.auto_refresh_interval = interval

    if self._refresh_thread is not None and self._refresh_thread.is_alive():
        print("Auto-refresh is already running")
        return

    self._stop_refresh_flag.clear()
    self._refresh_thread = threading.Thread(
        target=self._auto_refresh_worker,
        daemon=True,
        name="DataStore-AutoRefresh"
    )
    self._refresh_thread.start()
    print(f"Started auto-refresh thread (every {self.auto_refresh_interval} seconds)")
stop_auto_refresh()

Stop the background thread for automatic data refresh.

Signals the auto-refresh thread to stop and waits for its termination.

Source code in src/slurm_usage_history/app/datastore.py
def stop_auto_refresh(self) -> None:
    """Stop the background thread for automatic data refresh.

    Signals the auto-refresh thread to stop and waits for its termination.
    """
    if self._refresh_thread is None or not self._refresh_thread.is_alive():
        print("No auto-refresh thread is running")
        return

    print("Stopping auto-refresh thread...")
    self._stop_refresh_flag.set()
    self._refresh_thread.join(timeout=5.0)
    if self._refresh_thread.is_alive():
        print("Warning: Auto-refresh thread did not terminate gracefully")
    else:
        print("Auto-refresh thread stopped successfully")

Singleton

Bases: type

Metaclass to implement the Singleton pattern.

Ensures only one instance of a class exists.

Source code in src/slurm_usage_history/app/datastore.py
class Singleton(type):
    """Metaclass to implement the Singleton pattern.

    Ensures only one instance of a class exists.
    """
    _instances: dict[type, Any] = {}
    _lock: threading.Lock = threading.Lock()

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        """Override the call method to implement singleton behavior.

        Args:
            *args: Variable positional arguments to pass to the class constructor.
            **kwargs: Variable keyword arguments to pass to the class constructor.

        Returns:
            The singleton instance of the class.
        """
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
            return cls._instances[cls]
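
A small sketch of the resulting behavior: repeated construction yields one shared instance, and constructor arguments are ignored after the first call (the Config class below is hypothetical):

# Hedged sketch: Config exists only for illustration.
from slurm_usage_history.app.datastore import Singleton

class Config(metaclass=Singleton):
    def __init__(self, value=0):
        self.value = value

a = Config(value=1)
b = Config(value=2)          # __call__ returns the cached instance; __init__ is not re-run
assert a is b and a.value == 1
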
__call__(*args, **kwargs)

Override the call method to implement singleton behavior.

Parameters:

    *args (Any): Variable positional arguments to pass to the class constructor.
    **kwargs (Any): Variable keyword arguments to pass to the class constructor.

Returns:

    Any: The singleton instance of the class.

Source code in src/slurm_usage_history/app/datastore.py
def __call__(cls, *args: Any, **kwargs: Any) -> Any:
    """Override the call method to implement singleton behavior.

    Args:
        *args: Variable positional arguments to pass to the class constructor.
        **kwargs: Variable keyword arguments to pass to the class constructor.

    Returns:
        The singleton instance of the class.
    """
    with cls._lock:
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

get_datastore(directory=None, auto_refresh_interval=600, account_formatter=formatter)

Get the singleton instance of PandasDataStore.

Parameters:

    directory (str | Path | None, default None): Path to the data directory (only used if this is the first call).
    auto_refresh_interval (int, default 600): Refresh interval in seconds (only used if this is the first call).
    account_formatter (Any | None, default formatter): Formatter for account names. Defaults to the imported formatter.

Returns:

    PandasDataStore: The singleton instance of PandasDataStore.

Source code in src/slurm_usage_history/app/datastore.py
def get_datastore(
    directory: str | Path | None = None,
    auto_refresh_interval: int = 600,
    account_formatter: Any | None = formatter
) -> PandasDataStore:
    """Get the singleton instance of PandasDataStore.

    Args:
        directory: Path to the data directory (only used if this is the first call).
        auto_refresh_interval: Refresh interval in seconds (only used if this is the first call).
        account_formatter: Formatter for account names. Defaults to the imported formatter.

    Returns:
        The singleton instance of PandasDataStore.
    """
    return PandasDataStore(directory, auto_refresh_interval, account_formatter)
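
A sketch of the intended usage pattern: because PandasDataStore is a Singleton, every call returns the same object and the arguments only matter on the first call (the directory is a placeholder):

# Hedged sketch.
from slurm_usage_history.app.datastore import get_datastore

store = get_datastore(directory="data")   # first call creates and configures the instance
same = get_datastore()                    # later calls return the same instance
assert store is same
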

layout

create_account_formatter_controls()

Create UI controls for account name formatting.

Source code in src/slurm_usage_history/app/layout.py
def create_account_formatter_controls():
    """Create UI controls for account name formatting."""
    return html.Div(
        [
            html.Label("Account Name Format", className="font-weight-bold mb-2"),
            dbc.RadioItems(
                id="account-format-segments",
                options=[{"label": "Full names", "value": 0}, {"label": "First segment", "value": 1}, {"label": "First two segments", "value": 2}, {"label": "First three segments", "value": 3}],
                value=formatter.max_segments,
                inline=False,
                className="mb-2",
            ),
            html.Div(
                [
                    html.Small(
                        [
                            "Example: physics-theory-quantum-project",
                            html.Br(),
                            "• Full: physics-theory-quantum-project",
                            html.Br(),
                            "• First segment: physics",
                            html.Br(),
                            "• First two: physics-theory",
                            html.Br(),
                            "• First three: physics-theory-quantum",
                        ],
                        className="text-muted",
                    )
                ]
            ),
        ],
        className="mb-3",
    )

create_filter(component, title=None)

Create a consistently styled filter with an optional title

Source code in src/slurm_usage_history/app/layout.py
def create_filter(component, title=None):
    """Create a consistently styled filter with an optional title"""
    if title:
        return html.Div(
            [html.Label(title, className="font-weight-bold mb-2"), component],
            className="mb-3",
        )
    return html.Div(component, className="mb-3")

create_section(title, children, id=None)

Create a section with a title and content

Source code in src/slurm_usage_history/app/layout.py
def create_section(title, children, id=None):
    """Create a section with a title and content"""
    return html.Div(
        [
            html.H3(title, className="mb-4", style={"color": COLORS["secondary"]}),
            html.Div(children, className="mb-4"),
        ],
        id=id,
        className="mt-4 mb-5",
    )
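
A sketch of composing these helpers into part of a sidebar (the dropdown id, options, and import path are assumptions; only create_filter and create_section come from this module):

# Hedged sketch: "partition-dropdown" and the option values are made up.
from dash import dcc
from slurm_usage_history.app.layout import create_filter, create_section

partition_filter = create_filter(
    dcc.Dropdown(id="partition-dropdown", options=["gpu", "cpu"], multi=True),
    title="Partitions",
)
sidebar = create_section("Filters", [partition_filter], id="filter-section")
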

main

main()

Main function to run the Dash app.

Source code in src/slurm_usage_history/app/main.py
def main():
    """
    Main function to run the Dash app.
    """
    # Parse command line arguments
    args = parse_arguments()

    # Create Dash app
    app = create_dash_app(args)

    # Run the server
    app.run_server(debug=args.debug, port=args.port)

parse_arguments()

Parse command line arguments.

Source code in src/slurm_usage_history/app/main.py
def parse_arguments():
    """
    Parse command line arguments.
    """
    parser = argparse.ArgumentParser(description="Run a Plotly Dash app.")
    parser.add_argument(
        "--data-path",
        default="data",
        type=str,
        help="Path to the directory containing Parquet data files.",
    )
    parser.add_argument(
        "--port", type=int, default=8050, help="Port to run the server on."
    )
    parser.add_argument(
        "--debug", action="store_true", help="Run the server in development mode."
    )
    return parser.parse_args()
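
A sketch of the flags in context; overriding sys.argv simply simulates the command line that main() would normally receive:

# Hedged sketch: the program name and values are placeholders.
import sys
from slurm_usage_history.app.main import parse_arguments

sys.argv = ["slurm-usage-history", "--data-path", "data", "--port", "8051", "--debug"]
args = parse_arguments()
print(args.data_path, args.port, args.debug)   # data 8051 True
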

node_config

NodeConfiguration

Class to handle node configuration for resource normalization.

Source code in src/slurm_usage_history/app/node_config.py
class NodeConfiguration:
    """Class to handle node configuration for resources normalization."""

    def __init__(self, config_file: Optional[str] = None):
        """
        Initialize the Node Configuration.

        Args:
            config_file: Path to config file (YAML or JSON)
        """
        self.config: Dict[str, Any] = {}
        self.config_file: Optional[str] = config_file

        if config_file:
            self.load_config(config_file)
        else:
            # Look for config in default locations
            self._load_default_config()

    def _load_default_config(self) -> None:
        """Try to load configuration from default locations."""
        # Check for configs in common locations
        possible_paths: List[str] = [
            os.path.expanduser("~/.config/slurm_usage/node_config.yaml"),
            os.path.expanduser("~/.config/slurm_usage/node_config.json"),
            "/etc/slurm_usage/node_config.yaml",
            "/etc/slurm_usage/node_config.json",
            "node_config.yaml",
            "node_config.json",
        ]

        for path in possible_paths:
            if os.path.exists(path):
                self.load_config(path)
                return

    def load_config(self, config_file: str) -> bool:
        """
        Load configuration from a file.

        Args:
            config_file: Path to config file (YAML or JSON)

        Returns:
            bool: True if loaded successfully, False otherwise
        """
        try:
            path = Path(config_file)

            if not path.exists():
                print(f"Config file not found: {config_file}")
                return False

            if path.suffix.lower() in [".yaml", ".yml"]:
                with open(path) as f:
                    self.config = yaml.safe_load(f)
            elif path.suffix.lower() == ".json":
                with open(path) as f:
                    self.config = json.load(f)
            else:
                print(f"Unsupported config file format: {path.suffix}")
                return False

            self.config_file = config_file
            print(f"Loaded node configuration from {config_file}")
            return True

        except Exception as e:
            print(f"Error loading config: {e!s}")
            return False

    def get_node_cpu_count(self, node_name: str) -> int:
        """
        Get CPU count for a specific node.

        Args:
            node_name: Name of the node

        Returns:
            int: Number of CPUs for the node, or 1 if not configured
        """
        if not self.config or "nodes" not in self.config:
            return 1

        # Try exact match
        if node_name in self.config["nodes"]:
            return self.config["nodes"][node_name].get("cpus", 1)

        # Try pattern matching
        for pattern, node_config in self.config.get("node_patterns", {}).items():
            if self._match_pattern(node_name, pattern):
                return node_config.get("cpus", 1)

        # Return default
        return self.config.get("default_cpus", 1)

    def get_node_gpu_count(self, node_name: str) -> int:
        """
        Get GPU count for a specific node.

        Args:
            node_name: Name of the node

        Returns:
            int: Number of GPUs for the node, or 0 if not configured
        """
        if not self.config or "nodes" not in self.config:
            return 0

        # Try exact match
        if node_name in self.config["nodes"]:
            return self.config["nodes"][node_name].get("gpus", 0)

        # Try pattern matching
        for pattern, node_config in self.config.get("node_patterns", {}).items():
            if self._match_pattern(node_name, pattern):
                return node_config.get("gpus", 0)

        # Return default
        return self.config.get("default_gpus", 0)

    def _match_pattern(self, node_name: str, pattern: str) -> bool:
        """
        Check if a node name matches a pattern.

        Args:
            node_name: Name of the node
            pattern: Pattern to match (supports * wildcard)

        Returns:
            bool: True if node name matches pattern
        """
        # Convert pattern to regex
        regex_pattern = pattern.replace("*", ".*")
        return re.match(f"^{regex_pattern}$", node_name) is not None

    def get_all_node_resources(self, node_names: List[str]) -> Dict[str, Dict[str, int]]:
        """
        Get CPU and GPU counts for a list of nodes.

        Args:
            node_names: List of node names

        Returns:
            dict: Dictionary mapping node names to resource dictionaries
        """
        resources: Dict[str, Dict[str, int]] = {}

        for node in node_names:
            resources[node] = {
                "cpus": self.get_node_cpu_count(node),
                "gpus": self.get_node_gpu_count(node),
            }

        return resources
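
A minimal sketch of a configuration file and how it is queried. The node names, patterns, and resource counts below are invented for illustration; only the keys (nodes, node_patterns, default_cpus, default_gpus) follow what the class reads:

import yaml

# Hypothetical node_config.yaml content.
config = {
    "default_cpus": 32,
    "default_gpus": 0,
    "nodes": {"gpu001": {"cpus": 64, "gpus": 4}},
    "node_patterns": {"cpu*": {"cpus": 96, "gpus": 0}},
}
with open("node_config.yaml", "w") as f:
    yaml.safe_dump(config, f)

nc = NodeConfiguration("node_config.yaml")
nc.get_node_gpu_count("gpu001")  # 4, exact match in "nodes"
nc.get_node_cpu_count("cpu017")  # 96, matched by the "cpu*" pattern
nc.get_node_cpu_count("login1")  # 32, falls back to default_cpus
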
__init__(config_file=None)

Initialize the Node Configuration.

Parameters:

Name Type Description Default
config_file Optional[str]

Path to config file (YAML or JSON)

None
Source code in src/slurm_usage_history/app/node_config.py
def __init__(self, config_file: Optional[str] = None):
    """
    Initialize the Node Configuration.

    Args:
        config_file: Path to config file (YAML or JSON)
    """
    self.config: Dict[str, Any] = {}
    self.config_file: Optional[str] = config_file

    if config_file:
        self.load_config(config_file)
    else:
        # Look for config in default locations
        self._load_default_config()
get_all_node_resources(node_names)

Get CPU and GPU counts for a list of nodes.

Parameters:

Name Type Description Default
node_names List[str]

List of node names

required

Returns:

Name Type Description
dict Dict[str, Dict[str, int]]

Dictionary mapping node names to resource dictionaries

Source code in src/slurm_usage_history/app/node_config.py
def get_all_node_resources(self, node_names: List[str]) -> Dict[str, Dict[str, int]]:
    """
    Get CPU and GPU counts for a list of nodes.

    Args:
        node_names: List of node names

    Returns:
        dict: Dictionary mapping node names to resource dictionaries
    """
    resources: Dict[str, Dict[str, int]] = {}

    for node in node_names:
        resources[node] = {
            "cpus": self.get_node_cpu_count(node),
            "gpus": self.get_node_gpu_count(node),
        }

    return resources
get_node_cpu_count(node_name)

Get CPU count for a specific node.

Parameters:

Name Type Description Default
node_name str

Name of the node

required

Returns:

Name Type Description
int int

Number of CPUs for the node, or 1 if not configured

Source code in src/slurm_usage_history/app/node_config.py
def get_node_cpu_count(self, node_name: str) -> int:
    """
    Get CPU count for a specific node.

    Args:
        node_name: Name of the node

    Returns:
        int: Number of CPUs for the node, or 1 if not configured
    """
    if not self.config or "nodes" not in self.config:
        return 1

    # Try exact match
    if node_name in self.config["nodes"]:
        return self.config["nodes"][node_name].get("cpus", 1)

    # Try pattern matching
    for pattern, node_config in self.config.get("node_patterns", {}).items():
        if self._match_pattern(node_name, pattern):
            return node_config.get("cpus", 1)

    # Return default
    return self.config.get("default_cpus", 1)
get_node_gpu_count(node_name)

Get GPU count for a specific node.

Parameters:

Name Type Description Default
node_name str

Name of the node

required

Returns:

Name Type Description
int int

Number of GPUs for the node, or 0 if not configured

Source code in src/slurm_usage_history/app/node_config.py
def get_node_gpu_count(self, node_name: str) -> int:
    """
    Get GPU count for a specific node.

    Args:
        node_name: Name of the node

    Returns:
        int: Number of GPUs for the node, or 0 if not configured
    """
    if not self.config or "nodes" not in self.config:
        return 0

    # Try exact match
    if node_name in self.config["nodes"]:
        return self.config["nodes"][node_name].get("gpus", 0)

    # Try pattern matching
    for pattern, node_config in self.config.get("node_patterns", {}).items():
        if self._match_pattern(node_name, pattern):
            return node_config.get("gpus", 0)

    # Return default
    return self.config.get("default_gpus", 0)
load_config(config_file)

Load configuration from a file.

Parameters:

Name Type Description Default
config_file str

Path to config file (YAML or JSON)

required

Returns:

Name Type Description
bool bool

True if loaded successfully, False otherwise

Source code in src/slurm_usage_history/app/node_config.py
def load_config(self, config_file: str) -> bool:
    """
    Load configuration from a file.

    Args:
        config_file: Path to config file (YAML or JSON)

    Returns:
        bool: True if loaded successfully, False otherwise
    """
    try:
        path = Path(config_file)

        if not path.exists():
            print(f"Config file not found: {config_file}")
            return False

        if path.suffix.lower() in [".yaml", ".yml"]:
            with open(path) as f:
                self.config = yaml.safe_load(f)
        elif path.suffix.lower() == ".json":
            with open(path) as f:
                self.config = json.load(f)
        else:
            print(f"Unsupported config file format: {path.suffix}")
            return False

        self.config_file = config_file
        print(f"Loaded node configuration from {config_file}")
        return True

    except Exception as e:
        print(f"Error loading config: {e!s}")
        return False

scripts

get_weekly_usage

DataExporter

Exports the fetched and formatted data.

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
class DataExporter:
    """
    Exports the fetched and formatted data.
    """

    def __init__(self, data_fetcher: UsageDataFetcher, data_formatter: UsageDataFormatter):
        """
        Initialize the DataExporter.

        Args:
            data_fetcher: UsageDataFetcher instance
            data_formatter: UsageDataFormatter instance
        """
        self.data_fetcher = data_fetcher
        self.data_formatter = data_formatter

        # Try to get config if it's been initialized
        try:
            from .config import get_config
            self.config: Any = get_config()
        except RuntimeError:
            self.config = None

    def fetch_data_weekly(
        self,
        from_year: int,
        from_week: int,
        until_year: Optional[int] = None,
        until_week: Optional[int] = None,
        output_dir: str = "slurmo_weekly_data",
        overwrite: bool = False,
        verbose: bool = False,
    ) -> List[str]:
        """
        Fetch data weekly and save to Parquet files.

        Args:
            from_year: Starting year
            from_week: Starting week
            until_year: Ending year (default: from_year)
            until_week: Ending week (default: from_week)
            output_dir: Directory to save Parquet files
            overwrite: Whether to overwrite existing files
            verbose: Whether to print verbose output

        Returns:
            List of paths to saved Parquet files
        """
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Set until_year and until_week to from_year and from_week if not provided
        if until_year is None:
            until_year = from_year
        if until_week is None:
            until_week = from_week

        current_year = from_year
        current_week = from_week
        data_files: List[str] = []

        # Use tqdm for progress bar
        pbar = tqdm(
            total=((until_year - from_year) * 52) + (until_week - from_week) + 1,
            desc="Fetching Weekly Data",
            unit="week",
        )

        while (current_year < until_year) or (
            current_year == until_year and current_week <= until_week
        ):
            try:
                # Check if the data file already exists
                file_path = os.path.join(
                    output_dir, f"week_{current_year}_W{current_week:02.0f}.parquet"
                )
                if overwrite or not os.path.exists(file_path):
                    # Export usage data for the current week
                    weekly_data = self.data_fetcher.export_usage_data(
                        current_year,
                        current_week,
                        current_year,
                        current_week,
                        verbose=verbose,
                    )

                    # Format the usage data
                    formatted_data = self.data_formatter.format_usage_data(weekly_data)

                    # Save formatted data as Parquet file
                    formatted_data.to_parquet(file_path, index=False, engine="pyarrow")

                    # Get delay from config or use default
                    delay = 5
                    if self.config and "sacct" in self.config.config:
                        delay = self.config.config["sacct"].get("delay_between_calls", 5)
                    time.sleep(delay)

                # Append file path to list of saved files
                data_files.append(file_path)

                # Update progress bar
                pbar.update(1)
            except Exception as e:
                msg = f"Error fetching data for week {current_year}-W{current_week:02.0f}: {e}"
                print(msg)

            # Move to the next week
            if current_week == 52:
                current_week = 1
                current_year += 1
            else:
                current_week += 1

        pbar.close()
        return data_files
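
A minimal sketch of wiring the exporter together; the week range and output directory are illustrative, and actually running it requires a host where sacct is available:

fetcher = UsageDataFetcher()
formatter = UsageDataFormatter()
exporter = DataExporter(fetcher, formatter)

# Fetch four ISO weeks and write one Parquet file per week.
files = exporter.fetch_data_weekly(
    from_year=2025,
    from_week=1,
    until_year=2025,
    until_week=4,
    output_dir="data/mycluster/weekly-data",
)
# files contains paths such as ".../week_2025_W01.parquet"
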
__init__(data_fetcher, data_formatter)

Initialize the DataExporter.

Parameters:

Name Type Description Default
data_fetcher UsageDataFetcher

UsageDataFetcher instance

required
data_formatter UsageDataFormatter

UsageDataFormatter instance

required
Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def __init__(self, data_fetcher: UsageDataFetcher, data_formatter: UsageDataFormatter):
    """
    Initialize the DataExporter.

    Args:
        data_fetcher: UsageDataFetcher instance
        data_formatter: UsageDataFormatter instance
    """
    self.data_fetcher = data_fetcher
    self.data_formatter = data_formatter

    # Try to get config if it's been initialized
    try:
        from .config import get_config
        self.config: Any = get_config()
    except RuntimeError:
        self.config = None
fetch_data_weekly(from_year, from_week, until_year=None, until_week=None, output_dir='slurmo_weekly_data', overwrite=False, verbose=False)

Fetch data weekly and save to Parquet files.

Parameters:

Name Type Description Default
from_year int

Starting year

required
from_week int

Starting week

required
until_year Optional[int]

Ending year (default: from_year)

None
until_week Optional[int]

Ending week (default: from_week)

None
output_dir str

Directory to save Parquet files

'slurmo_weekly_data'
overwrite bool

Whether to overwrite existing files

False
verbose bool

Whether to print verbose output

False

Returns:

Type Description
List[str]

List of paths to saved Parquet files

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def fetch_data_weekly(
    self,
    from_year: int,
    from_week: int,
    until_year: Optional[int] = None,
    until_week: Optional[int] = None,
    output_dir: str = "slurmo_weekly_data",
    overwrite: bool = False,
    verbose: bool = False,
) -> List[str]:
    """
    Fetch data weekly and save to Parquet files.

    Args:
        from_year: Starting year
        from_week: Starting week
        until_year: Ending year (default: from_year)
        until_week: Ending week (default: from_week)
        output_dir: Directory to save Parquet files
        overwrite: Whether to overwrite existing files
        verbose: Whether to print verbose output

    Returns:
        List of paths to saved Parquet files
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Set until_year and until_week to from_year and from_week if not provided
    if until_year is None:
        until_year = from_year
    if until_week is None:
        until_week = from_week

    current_year = from_year
    current_week = from_week
    data_files: List[str] = []

    # Use tqdm for progress bar
    pbar = tqdm(
        total=((until_year - from_year) * 52) + (until_week - from_week) + 1,
        desc="Fetching Weekly Data",
        unit="week",
    )

    while (current_year < until_year) or (
        current_year == until_year and current_week <= until_week
    ):
        try:
            # Check if the data file already exists
            file_path = os.path.join(
                output_dir, f"week_{current_year}_W{current_week:02.0f}.parquet"
            )
            if overwrite or not os.path.exists(file_path):
                # Export usage data for the current week
                weekly_data = self.data_fetcher.export_usage_data(
                    current_year,
                    current_week,
                    current_year,
                    current_week,
                    verbose=verbose,
                )

                # Format the usage data
                formatted_data = self.data_formatter.format_usage_data(weekly_data)

                # Save formatted data as Parquet file
                formatted_data.to_parquet(file_path, index=False, engine="pyarrow")

                # Get delay from config or use default
                delay = 5
                if self.config and "sacct" in self.config.config:
                    delay = self.config.config["sacct"].get("delay_between_calls", 5)
                time.sleep(delay)

            # Append file path to list of saved files
            data_files.append(file_path)

            # Update progress bar
            pbar.update(1)
        except Exception as e:
            msg = f"Error fetching data for week {current_year}-W{current_week:02.0f}: {e}"
            print(msg)

        # Move to the next week
        if current_week == 52:
            current_week = 1
            current_year += 1
        else:
            current_week += 1

    pbar.close()
    return data_files

UsageDataFetcher

Fetches usage data from the SLURM database.

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
class UsageDataFetcher:
    """
    Fetches usage data from the SLURM database.
    """

    def __init__(self, command_executor: Optional[Callable] = None):
        """
        Initialize the UsageDataFetcher with an optional command executor.
        This allows dependency injection for better testing.

        Args:
            command_executor: Function to execute shell commands.
        """
        self.command_executor: Callable = command_executor or subprocess.run

        # Try to get config if it's been initialized
        try:
            from .config import get_config
            self.config: Any = get_config()
        except RuntimeError:
            self.config = None

    def export_usage_data(
        self, 
        from_year: int, 
        from_week: int, 
        until_year: Optional[int] = None, 
        until_week: Optional[int] = None, 
        verbose: bool = False
    ) -> pd.DataFrame:
        """
        Export usage data for the specified date range.

        Args:
            from_year: Starting year
            from_week: Starting week
            until_year: Ending year (default: from_year)
            until_week: Ending week (default: from_week)
            verbose: Whether to print verbose output

        Returns:
            DataFrame containing the combined usage data
        """
        if until_year is None:
            until_year = from_year
        if until_week is None:
            until_week = from_week

        combined_df = pd.DataFrame()

        current_year = from_year
        current_week = from_week

        while (current_year < until_year) or (
            current_year == until_year and current_week <= until_week
        ):
            # Get the start and end dates of the week
            start_date, end_date = self.get_week_dates(current_year, current_week)

            # Format the dates as strings
            sacct_start = start_date.strftime("%Y-%m-%d")
            sacct_end = (end_date + timedelta(days=1)).strftime("%Y-%m-%d")

            # Run the sacct command and get the DataFrame
            weekly_df = self.run_sacct_command(sacct_start, sacct_end, verbose=verbose)

            # Append the weekly data to the combined DataFrame
            combined_df = pd.concat([combined_df, weekly_df], ignore_index=True)

            # Increment the week
            if current_week == 52:
                current_week = 1
                current_year += 1
            else:
                current_week += 1

        return combined_df

    def run_sacct_command(
        self, 
        sacct_start: str, 
        sacct_end: str, 
        verbose: bool = False
    ) -> pd.DataFrame:
        """
        Run the sacct command and return the output as a DataFrame.

        Args:
            sacct_start: Start time for the command.
            sacct_end: End time for the command.
            verbose: Whether to print verbose output.

        Returns:
            DataFrame containing the command output.
        """
        # Get format from config if available
        format_string = "--format=JobID,User,QOS,Account,Partition,Submit,Start,End,State,Elapsed,AveDiskRead,AveDiskWrite,AveCPU,MaxRSS,AllocCPUS,TotalCPU,NodeList,AllocTRES,Cluster"
        if self.config and "sacct" in self.config.config:
            format_string = self.config.config["sacct"].get("format", format_string)

        command: List[str] = [
            "sacct",
            format_string,
            "--parsable2",
            "--allusers",
            f"--starttime={sacct_start}",
            f"--endtime={sacct_end}",
        ]

        # Add any additional options from config
        if self.config and "sacct" in self.config.config:
            for option, value in self.config.config["sacct"].get("options", {}).items():
                if value is True:
                    command.append(f"--{option}")
                elif value is not False and value is not None:
                    command.append(f"--{option}={value}")

        if verbose:
            print(" ".join(command))

        result = self.command_executor(
            command,
            stdout=subprocess.PIPE,
            text=True,
            encoding="latin-1",
            errors="ignore",
        )
        output = cast(subprocess.CompletedProcess, result).stdout

        # Filter out any lines containing "RUNNING" or "Unknown" states
        filtered_output = "\n".join(
            line
            for line in output.splitlines()
            if "RUNNING" not in line and "Unknown" not in line
        )

        df = pd.read_csv(io.StringIO(filtered_output), sep="|")
        if verbose:
            print(df.head())
        return df

    @staticmethod
    def get_week_dates(
        year: int, 
        week: int, 
        chunk_size: int = 7
    ) -> Tuple[datetime, datetime]:
        """
        Calculate the start and end dates of a specific week.

        Args:
            year: The year.
            week: The week number.
            chunk_size: Number of days in the week (default is 7).

        Returns:
            Tuple containing the start and end dates of the week.
        """
        start_date = datetime.strptime(f"{year}-W{week - 1}-1", "%Y-W%U-%w")
        end_date = start_date + timedelta(days=chunk_size - 1)
        return start_date, end_date
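
The command_executor hook exists so the sacct call can be replaced in tests. A minimal sketch with a fake executor that returns canned pipe-separated output; the job row and column subset are invented for illustration:

import subprocess

def fake_sacct(command, **kwargs):
    canned = (
        "JobID|User|State|Start|End\n"
        "12345|alice|COMPLETED|2025-01-06T08:00:00|2025-01-06T09:30:00"
    )
    return subprocess.CompletedProcess(command, returncode=0, stdout=canned)

fetcher = UsageDataFetcher(command_executor=fake_sacct)
df = fetcher.run_sacct_command("2025-01-06", "2025-01-13")
print(df.columns.tolist())  # ['JobID', 'User', 'State', 'Start', 'End']
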
__init__(command_executor=None)

Initialize the UsageDataFetcher with an optional command executor. This allows dependency injection for better testing.

Parameters:

Name Type Description Default
command_executor Optional[Callable]

Function to execute shell commands.

None
Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def __init__(self, command_executor: Optional[Callable] = None):
    """
    Initialize the UsageDataFetcher with an optional command executor.
    This allows dependency injection for better testing.

    Args:
        command_executor: Function to execute shell commands.
    """
    self.command_executor: Callable = command_executor or subprocess.run

    # Try to get config if it's been initialized
    try:
        from .config import get_config
        self.config: Any = get_config()
    except RuntimeError:
        self.config = None
export_usage_data(from_year, from_week, until_year=None, until_week=None, verbose=False)

Export usage data for the specified date range.

Parameters:

Name Type Description Default
from_year int

Starting year

required
from_week int

Starting week

required
until_year Optional[int]

Ending year (default: from_year)

None
until_week Optional[int]

Ending week (default: from_week)

None
verbose bool

Whether to print verbose output

False

Returns:

Type Description
DataFrame

DataFrame containing the combined usage data

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def export_usage_data(
    self, 
    from_year: int, 
    from_week: int, 
    until_year: Optional[int] = None, 
    until_week: Optional[int] = None, 
    verbose: bool = False
) -> pd.DataFrame:
    """
    Export usage data for the specified date range.

    Args:
        from_year: Starting year
        from_week: Starting week
        until_year: Ending year (default: from_year)
        until_week: Ending week (default: from_week)
        verbose: Whether to print verbose output

    Returns:
        DataFrame containing the combined usage data
    """
    if until_year is None:
        until_year = from_year
    if until_week is None:
        until_week = from_week

    combined_df = pd.DataFrame()

    current_year = from_year
    current_week = from_week

    while (current_year < until_year) or (
        current_year == until_year and current_week <= until_week
    ):
        # Get the start and end dates of the week
        start_date, end_date = self.get_week_dates(current_year, current_week)

        # Format the dates as strings
        sacct_start = start_date.strftime("%Y-%m-%d")
        sacct_end = (end_date + timedelta(days=1)).strftime("%Y-%m-%d")

        # Run the sacct command and get the DataFrame
        weekly_df = self.run_sacct_command(sacct_start, sacct_end, verbose=verbose)

        # Append the weekly data to the combined DataFrame
        combined_df = pd.concat([combined_df, weekly_df], ignore_index=True)

        # Increment the week
        if current_week == 52:
            current_week = 1
            current_year += 1
        else:
            current_week += 1

    return combined_df
get_week_dates(year, week, chunk_size=7) staticmethod

Calculate the start and end dates of a specific week.

Parameters:

Name Type Description Default
year int

The year.

required
week int

The week number.

required
chunk_size int

Number of days in the week (default is 7).

7

Returns:

Type Description
Tuple[datetime, datetime]

Tuple containing the start and end dates of the week.

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
@staticmethod
def get_week_dates(
    year: int, 
    week: int, 
    chunk_size: int = 7
) -> Tuple[datetime, datetime]:
    """
    Calculate the start and end dates of a specific week.

    Args:
        year: The year.
        week: The week number.
        chunk_size: Number of days in the week (default is 7).

    Returns:
        Tuple containing the start and end dates of the week.
    """
    start_date = datetime.strptime(f"{year}-W{week - 1}-1", "%Y-W%U-%w")
    end_date = start_date + timedelta(days=chunk_size - 1)
    return start_date, end_date
run_sacct_command(sacct_start, sacct_end, verbose=False)

Run the sacct command and return the output as a DataFrame.

Parameters:

Name Type Description Default
sacct_start str

Start time for the command.

required
sacct_end str

End time for the command.

required
verbose bool

Whether to print verbose output.

False

Returns:

Type Description
DataFrame

DataFrame containing the command output.

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def run_sacct_command(
    self, 
    sacct_start: str, 
    sacct_end: str, 
    verbose: bool = False
) -> pd.DataFrame:
    """
    Run the sacct command and return the output as a DataFrame.

    Args:
        sacct_start: Start time for the command.
        sacct_end: End time for the command.
        verbose: Whether to print verbose output.

    Returns:
        DataFrame containing the command output.
    """
    # Get format from config if available
    format_string = "--format=JobID,User,QOS,Account,Partition,Submit,Start,End,State,Elapsed,AveDiskRead,AveDiskWrite,AveCPU,MaxRSS,AllocCPUS,TotalCPU,NodeList,AllocTRES,Cluster"
    if self.config and "sacct" in self.config.config:
        format_string = self.config.config["sacct"].get("format", format_string)

    command: List[str] = [
        "sacct",
        format_string,
        "--parsable2",
        "--allusers",
        f"--starttime={sacct_start}",
        f"--endtime={sacct_end}",
    ]

    # Add any additional options from config
    if self.config and "sacct" in self.config.config:
        for option, value in self.config.config["sacct"].get("options", {}).items():
            if value is True:
                command.append(f"--{option}")
            elif value is not False and value is not None:
                command.append(f"--{option}={value}")

    if verbose:
        print(" ".join(command))

    result = self.command_executor(
        command,
        stdout=subprocess.PIPE,
        text=True,
        encoding="latin-1",
        errors="ignore",
    )
    output = cast(subprocess.CompletedProcess, result).stdout

    # Filter out any lines containing "RUNNING" or "Unknown" states
    filtered_output = "\n".join(
        line
        for line in output.splitlines()
        if "RUNNING" not in line and "Unknown" not in line
    )

    df = pd.read_csv(io.StringIO(filtered_output), sep="|")
    if verbose:
        print(df.head())
    return df

UsageDataFormatter

Formats the usage data DataFrame.

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
class UsageDataFormatter:
    """
    Formats the usage data DataFrame.
    """

    def __init__(self):
        """Initialize the UsageDataFormatter."""
        # Try to get config if it's been initialized
        try:
            from .config import get_config
            self.config: Any = get_config()
        except RuntimeError:
            self.config = None

    def format_usage_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Format the usage data DataFrame:
        - Convert Start, End, and Submit columns to datetime format.
        - Split AllocTRES into separate columns: billing, cpu, gres_gpu, mem, node.

        Args:
            df: DataFrame containing usage data fetched from sacct command.

        Returns:
            Formatted DataFrame with datetime columns and split AllocTRES.
        """
        # Convert Start, End, and Submit columns to datetime format
        df["Start"] = pd.to_datetime(df["Start"])
        df["End"] = pd.to_datetime(df["End"])
        df["Submit"] = pd.to_datetime(df["Submit"])

        df[["billing", "cpu", "gres_gpu", "mem", "node"]] = df["AllocTRES"].str.split(
            ",", expand=True
        )
        df["CPUs"] = df["cpu"].str.extract(r"cpu=(\d+)").fillna(0).astype(int)
        df["GPUs"] = df["gres_gpu"].str.extract(r"gres/gpu=(\d+)").fillna(0).astype(int)
        df["mem"] = df["mem"].str.extract(r"mem=(\d+[MG])")
        df["Memory [GB]"] = df.mem.apply(self.convert_mem_to_gb).astype(float)
        df["Nodes"] = df["node"].str.extract(r"node=(\d+)").fillna(0).astype(int)

        df["JobIDgroup"] = df.JobID.apply(lambda s: s.split(".")[0])
        df["Elapsed"] = df.End - df.Start
        df["Elapsed [h]"] = df["Elapsed"].apply(lambda x: x.total_seconds() / 3600)

        df = df.ffill()

        df["State"] = df.State.apply(lambda x: x.split()[0])

        df["StartDay"] = df.Start.dt.normalize()
        df["StartWeekDay"] = df.Start.dt.day_name()
        df["StartWeek"] = df.Start.dt.isocalendar().week
        df["StartMonth"] = df.Start.dt.month
        df["StartYear"] = df.Start.dt.year
        df["StartYear_iso"] = df.Start.dt.isocalendar().year
        df["StartYearWeek"] = (
            df.StartYear_iso.astype(str)
            + "-"
            + df.StartWeek.apply(lambda x: f"{x:02d}")
        )
        df["StartYearMonth"] = (
            df.StartYear.astype(str) + "-" + df.StartMonth.apply(lambda x: f"{x:02d}")
        )

        df["SubmitDay"] = df.Submit.dt.normalize()
        df["SubmitWeekDay"] = df.Submit.dt.day_name()
        df["SubmitWeek"] = df.Submit.dt.isocalendar().week
        df["SubmitMonth"] = df.Submit.dt.month
        df["SubmitYear"] = df.Submit.dt.year
        df["SubmitYear_iso"] = df.Submit.dt.isocalendar().year
        df["SubmitYearWeek"] = (
            df.SubmitYear_iso.astype(str)
            + "-"
            + df.SubmitWeek.apply(lambda x: f"{x:02d}")
        )
        df["SubmitYearMonth"] = (
            df.SubmitYear.astype(str) + "-" + df.SubmitMonth.apply(lambda x: f"{x:02d}")
        )

        df = df.query("JobID == JobIDgroup")
        df = df.drop(["AllocTRES", "AllocCPUS", "mem", "JobIDgroup"], axis=1)

        df["WaitingTime [h]"] = (df.Start - df.Submit).dt.total_seconds() / 3600
        df["GPU-hours"] = df["Elapsed [h]"].fillna(0).astype(float) * df["GPUs"].fillna(
            0
        ).astype(float)
        df["CPU-hours"] = df["Elapsed [h]"].fillna(0).astype(float) * df["CPUs"].fillna(
            0
        ).astype(float)

        df["NodeList"] = df["NodeList"].apply(unpack_nodelist_string)

        df["StartYearWeek"] = df["StartYearWeek"].apply(week_to_date)
        df["StartYearMonth"] = df["StartYearMonth"].apply(month_to_date)
        df["SubmitYearWeek"] = df["SubmitYearWeek"].apply(week_to_date)
        df["SubmitYearMonth"] = df["SubmitYearMonth"].apply(month_to_date)
        df["SubmitDay"] = df["Submit"].dt.normalize()
        df["StartDay"] = df["Start"].dt.normalize()

        df["JobDuration"] = pd.Categorical(
            df["Elapsed [h]"].apply(categorize_time),
            categories=["<5s", "<1min", "<5min", "<1h", "<5h", "<24h", ">=24h"],
            ordered=True,
        )

        # Get columns to return from config if available
        columns: List[str] = [
            "User",
            "QOS",
            "Account",
            "Partition",
            "Submit",
            "Start",
            "End",
            "SubmitDay",
            "SubmitWeekDay",
            "SubmitYearWeek",
            "SubmitYearMonth",
            "StartDay",
            "StartWeekDay",
            "StartYearWeek",
            "StartYearMonth",
            "State",
            "WaitingTime [h]",
            "Elapsed [h]",
            "Nodes",
            "NodeList",
            "CPUs",
            "GPUs",
            "CPU-hours",
            "GPU-hours",
            "AveCPU",
            "TotalCPU",
            "AveDiskRead",
            "AveDiskWrite",
            "MaxRSS",
            "Cluster"
        ]

        if self.config and "data_formatting" in self.config.config and "columns" in self.config.config["data_formatting"]:
            columns = self.config.config["data_formatting"]["columns"]

        return df[columns]

    @staticmethod
    def convert_mem_to_gb(mem_value: Union[str, float, None]) -> float:
        """
        Convert memory value to gigabytes.

        Args:
            mem_value: Memory value with unit (e.g., "1000M" or "1G")

        Returns:
            Float value in gigabytes
        """
        # Check if mem_value is already a float (used for numeric values directly)
        if isinstance(mem_value, float):
            return mem_value

        # Handle None values
        if mem_value is None:
            return 0.0

        # Extract numeric value and unit from mem_value
        numeric_part = float(mem_value[:-1])
        unit = mem_value[-1]

        # Define multiplier based on unit (M for megabytes, G for gigabytes)
        if unit == "M":
            multiplier = 1 / 1024
        elif unit == "G":
            multiplier = 1
        else:
            msg = f"Unsupported memory unit in {mem_value}"
            raise ValueError(msg)

        return numeric_part * multiplier
__init__()

Initialize the UsageDataFormatter.

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def __init__(self):
    """Initialize the UsageDataFormatter."""
    # Try to get config if it's been initialized
    try:
        from .config import get_config
        self.config: Any = get_config()
    except RuntimeError:
        self.config = None
convert_mem_to_gb(mem_value) staticmethod

Convert memory value to gigabytes.

Parameters:

Name Type Description Default
mem_value Union[str, float, None]

Memory value with unit (e.g., "1000M" or "1G")

required

Returns:

Type Description
float

Float value in gigabytes

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
@staticmethod
def convert_mem_to_gb(mem_value: Union[str, float, None]) -> float:
    """
    Convert memory value to gigabytes.

    Args:
        mem_value: Memory value with unit (e.g., "1000M" or "1G")

    Returns:
        Float value in gigabytes
    """
    # Check if mem_value is already a float (used for numeric values directly)
    if isinstance(mem_value, float):
        return mem_value

    # Handle None values
    if mem_value is None:
        return 0.0

    # Extract numeric value and unit from mem_value
    numeric_part = float(mem_value[:-1])
    unit = mem_value[-1]

    # Define multiplier based on unit (M for megabytes, G for gigabytes)
    if unit == "M":
        multiplier = 1 / 1024
    elif unit == "G":
        multiplier = 1
    else:
        msg = f"Unsupported memory unit in {mem_value}"
        raise ValueError(msg)

    return numeric_part * multiplier
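
A quick check of the unit handling (values are illustrative):

UsageDataFormatter.convert_mem_to_gb("1000M")  # 0.9765625 (1000 / 1024)
UsageDataFormatter.convert_mem_to_gb("16G")    # 16.0
UsageDataFormatter.convert_mem_to_gb(None)     # 0.0
UsageDataFormatter.convert_mem_to_gb("2T")     # raises ValueError (unsupported unit)
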
format_usage_data(df)

Format the usage data DataFrame:
- Convert Start, End, and Submit columns to datetime format.
- Split AllocTRES into separate columns: billing, cpu, gres_gpu, mem, node.

Parameters:

Name Type Description Default
df DataFrame

DataFrame containing usage data fetched from sacct command.

required

Returns:

Type Description
DataFrame

Formatted DataFrame with datetime columns and split AllocTRES.

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def format_usage_data(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Format the usage data DataFrame:
    - Convert Start, End, and Submit columns to datetime format.
    - Split AllocTRES into separate columns: billing, cpu, gres_gpu, mem, node.

    Args:
        df: DataFrame containing usage data fetched from sacct command.

    Returns:
        Formatted DataFrame with datetime columns and split AllocTRES.
    """
    # Convert Start, End, and Submit columns to datetime format
    df["Start"] = pd.to_datetime(df["Start"])
    df["End"] = pd.to_datetime(df["End"])
    df["Submit"] = pd.to_datetime(df["Submit"])

    df[["billing", "cpu", "gres_gpu", "mem", "node"]] = df["AllocTRES"].str.split(
        ",", expand=True
    )
    df["CPUs"] = df["cpu"].str.extract(r"cpu=(\d+)").fillna(0).astype(int)
    df["GPUs"] = df["gres_gpu"].str.extract(r"gres/gpu=(\d+)").fillna(0).astype(int)
    df["mem"] = df["mem"].str.extract(r"mem=(\d+[MG])")
    df["Memory [GB]"] = df.mem.apply(self.convert_mem_to_gb).astype(float)
    df["Nodes"] = df["node"].str.extract(r"node=(\d+)").fillna(0).astype(int)

    df["JobIDgroup"] = df.JobID.apply(lambda s: s.split(".")[0])
    df["Elapsed"] = df.End - df.Start
    df["Elapsed [h]"] = df["Elapsed"].apply(lambda x: x.total_seconds() / 3600)

    df = df.ffill()

    df["State"] = df.State.apply(lambda x: x.split()[0])

    df["StartDay"] = df.Start.dt.normalize()
    df["StartWeekDay"] = df.Start.dt.day_name()
    df["StartWeek"] = df.Start.dt.isocalendar().week
    df["StartMonth"] = df.Start.dt.month
    df["StartYear"] = df.Start.dt.year
    df["StartYear_iso"] = df.Start.dt.isocalendar().year
    df["StartYearWeek"] = (
        df.StartYear_iso.astype(str)
        + "-"
        + df.StartWeek.apply(lambda x: f"{x:02d}")
    )
    df["StartYearMonth"] = (
        df.StartYear.astype(str) + "-" + df.StartMonth.apply(lambda x: f"{x:02d}")
    )

    df["SubmitDay"] = df.Submit.dt.normalize()
    df["SubmitWeekDay"] = df.Submit.dt.day_name()
    df["SubmitWeek"] = df.Submit.dt.isocalendar().week
    df["SubmitMonth"] = df.Submit.dt.month
    df["SubmitYear"] = df.Submit.dt.year
    df["SubmitYear_iso"] = df.Submit.dt.isocalendar().year
    df["SubmitYearWeek"] = (
        df.SubmitYear_iso.astype(str)
        + "-"
        + df.SubmitWeek.apply(lambda x: f"{x:02d}")
    )
    df["SubmitYearMonth"] = (
        df.SubmitYear.astype(str) + "-" + df.SubmitMonth.apply(lambda x: f"{x:02d}")
    )

    df = df.query("JobID == JobIDgroup")
    df = df.drop(["AllocTRES", "AllocCPUS", "mem", "JobIDgroup"], axis=1)

    df["WaitingTime [h]"] = (df.Start - df.Submit).dt.total_seconds() / 3600
    df["GPU-hours"] = df["Elapsed [h]"].fillna(0).astype(float) * df["GPUs"].fillna(
        0
    ).astype(float)
    df["CPU-hours"] = df["Elapsed [h]"].fillna(0).astype(float) * df["CPUs"].fillna(
        0
    ).astype(float)

    df["NodeList"] = df["NodeList"].apply(unpack_nodelist_string)

    df["StartYearWeek"] = df["StartYearWeek"].apply(week_to_date)
    df["StartYearMonth"] = df["StartYearMonth"].apply(month_to_date)
    df["SubmitYearWeek"] = df["SubmitYearWeek"].apply(week_to_date)
    df["SubmitYearMonth"] = df["SubmitYearMonth"].apply(month_to_date)
    df["SubmitDay"] = df["Submit"].dt.normalize()
    df["StartDay"] = df["Start"].dt.normalize()

    df["JobDuration"] = pd.Categorical(
        df["Elapsed [h]"].apply(categorize_time),
        categories=["<5s", "<1min", "<5min", "<1h", "<5h", "<24h", ">=24h"],
        ordered=True,
    )

    # Get columns to return from config if available
    columns: List[str] = [
        "User",
        "QOS",
        "Account",
        "Partition",
        "Submit",
        "Start",
        "End",
        "SubmitDay",
        "SubmitWeekDay",
        "SubmitYearWeek",
        "SubmitYearMonth",
        "StartDay",
        "StartWeekDay",
        "StartYearWeek",
        "StartYearMonth",
        "State",
        "WaitingTime [h]",
        "Elapsed [h]",
        "Nodes",
        "NodeList",
        "CPUs",
        "GPUs",
        "CPU-hours",
        "GPU-hours",
        "AveCPU",
        "TotalCPU",
        "AveDiskRead",
        "AveDiskWrite",
        "MaxRSS",
        "Cluster"
    ]

    if self.config and "data_formatting" in self.config.config and "columns" in self.config.config["data_formatting"]:
        columns = self.config.config["data_formatting"]["columns"]

    return df[columns]

calculate_date_range(weeks_back=None, from_date=None, to_date=None)

Calculate date range for data extraction based on different inputs.

Parameters:

Name Type Description Default
weeks_back Optional[int]

Number of weeks to go back from today

None
from_date Optional[str]

Starting date in ISO week format (YYYY-Www)

None
to_date Optional[str]

Ending date in ISO week format (YYYY-Www)

None

Returns:

Type Description
Tuple[int, int, int, int]

Tuple of (from_year, from_week, until_year, until_week)

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def calculate_date_range(
    weeks_back: Optional[int] = None, 
    from_date: Optional[str] = None, 
    to_date: Optional[str] = None
) -> Tuple[int, int, int, int]:
    """
    Calculate date range for data extraction based on different inputs.

    Args:
        weeks_back: Number of weeks to go back from today
        from_date: Starting date in ISO week format (YYYY-Www)
        to_date: Ending date in ISO week format (YYYY-Www)

    Returns:
        Tuple of (from_year, from_week, until_year, until_week)
    """
    today = datetime.today()
    until_year = today.year
    until_week = today.isocalendar()[1]

    if weeks_back is not None:
        # Calculate date range based on weeks_back
        weeks_ago = today - timedelta(weeks=weeks_back)
        from_year = weeks_ago.year
        from_week = weeks_ago.isocalendar()[1]
    elif from_date is not None:
        # Parse from_date in ISO week format
        from_year, from_week = parse_iso_week(from_date)

        if to_date is not None:
            # Parse to_date in ISO week format
            until_year, until_week = parse_iso_week(to_date)
    else:
        # Default: 4 weeks back
        four_weeks_ago = today - timedelta(weeks=4)
        from_year = four_weeks_ago.year
        from_week = four_weeks_ago.isocalendar()[1]

    return from_year, from_week, until_year, until_week
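
A minimal sketch of the ways to select a range (the dates are illustrative); each call returns a (from_year, from_week, until_year, until_week) tuple:

calculate_date_range(weeks_back=8)                               # 8 weeks back, up to the current week
calculate_date_range(from_date="2025-W01")                       # from 2025-W01 up to the current week
calculate_date_range(from_date="2025-W01", to_date="2025-W12")   # (2025, 1, 2025, 12)
calculate_date_range()                                           # default: the last 4 weeks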

main()

Main entry point for the data fetcher.

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def main() -> None:
    """Main entry point for the data fetcher."""
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Fetch and export SLURM usage data.")

    # Define mutually exclusive group for date selection
    date_group = parser.add_mutually_exclusive_group()
    date_group.add_argument(
        "--weeks",
        type=int,
        help="Number of weeks to go back from today",
    )
    date_group.add_argument(
        "--from",
        dest="from_date",
        type=str,
        help="Starting date in ISO week format (e.g., 2025-W01)",
    )

    # Other arguments
    parser.add_argument(
        "--to",
        dest="to_date",
        type=str,
        help="Ending date in ISO week format (e.g., 2025-W52). Only valid with --from",
    )
    parser.add_argument(
        "--config",
        type=str,
        help="Path to configuration file",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="data/hostname/weekly-data",
        help="Directory to save the Parquet files.",
    )
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing files if they are already present.",
    )
    parser.add_argument("-v", "--verbose", action="store_true", default=False)

    # Legacy support for older interface
    parser.add_argument(
        "--from-year",
        type=int,
        help=argparse.SUPPRESS,  # Hide from help
    )
    parser.add_argument(
        "--from-week",
        type=int,
        help=argparse.SUPPRESS,  # Hide from help
    )
    parser.add_argument(
        "--until-year",
        type=int,
        help=argparse.SUPPRESS,  # Hide from help
    )
    parser.add_argument(
        "--until-week",
        type=int,
        help=argparse.SUPPRESS,  # Hide from help
    )

    args = parser.parse_args()

    # Initialize config if provided
    if args.config:
        from .config import init_config
        init_config(args.config)

    # Handle the different date specification options
    if args.from_year is not None and args.from_week is not None:
        # Legacy mode
        from_year = args.from_year
        from_week = args.from_week
        until_year = args.until_year or from_year
        until_week = args.until_week or from_week
    else:
        # New mode
        from_year, from_week, until_year, until_week = calculate_date_range(
            weeks_back=args.weeks,
            from_date=args.from_date,
            to_date=args.to_date
        )

    # Create instances of fetcher and formatter
    data_fetcher = UsageDataFetcher()
    data_formatter = UsageDataFormatter()

    # Create and run the data exporter
    exporter = DataExporter(data_fetcher, data_formatter)
    exporter.fetch_data_weekly(
        from_year=from_year,
        from_week=from_week,
        until_year=until_year,
        until_week=until_week,
        output_dir=args.output_dir,
        overwrite=args.overwrite,
        verbose=args.verbose,
    )

parse_iso_week(iso_week_str)

Parse an ISO week string in the format YYYY-Www (e.g., 2025-W01).

Parameters:

Name Type Description Default
iso_week_str str

ISO week string in the format YYYY-Www

required

Returns:

Type Description
Tuple[int, int]

Tuple of (year, week_number)

Source code in src/slurm_usage_history/scripts/get_weekly_usage.py
def parse_iso_week(iso_week_str: str) -> Tuple[int, int]:
    """
    Parse an ISO week string in the format YYYY-Www (e.g., 2025-W01).

    Args:
        iso_week_str: ISO week string in the format YYYY-Www

    Returns:
        Tuple of (year, week_number)
    """
    match = re.match(r'(\d{4})-W(\d{1,2})', iso_week_str)
    if not match:
        msg = f"Invalid ISO week format: {iso_week_str}. Expected format: YYYY-Www (e.g., 2025-W01)"
        raise ValueError(msg)

    year = int(match.group(1))
    week = int(match.group(2))

    # Validate week number
    if week < 1 or week > 53:
        msg = f"Invalid week number: {week}. Week number must be between 1 and 53."
        raise ValueError(msg)

    return year, week
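
Example behaviour (a sketch):

parse_iso_week("2025-W07")  # (2025, 7)
parse_iso_week("2025-07")   # raises ValueError: the "W" is required
parse_iso_week("2025-W60")  # raises ValueError: week must be between 1 and 53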

tools

categorize_time(hours)

Categorize time in hours into predefined categories.

Parameters:

Name Type Description Default
hours Union[float, int]

Time in hours to categorize.

required

Returns:

Type Description
str

The category corresponding to the given time.

Source code in src/slurm_usage_history/tools.py
def categorize_time(hours: Union[float, int]) -> str:
    """
    Categorize time in hours into predefined categories.

    Args:
        hours: Time in hours to categorize.

    Returns:
        The category corresponding to the given time.
    """
    if hours < (5 / 3600):
        return "<5s"
    if hours < (1 / 60):
        return "<1min"
    if hours < (5 / 60):
        return "<5min"
    if hours < (30 / 60):
        return "<30min"
    if hours < 1:
        return "<1h"
    if hours < 5:
        return "<5h"
    if hours < 10:
        return "<10h"
    if hours < 24:
        return "<24h"
    # > 24 hours
    return ">=24h"
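
A few illustrative inputs:

categorize_time(0.001)  # "<5s"  (0.001 h is about 3.6 s)
categorize_time(0.75)   # "<1h"
categorize_time(7)      # "<10h"
categorize_time(36)     # ">=24h"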

categorize_time_series(hours_series)

Categorize a Pandas Series of time in hours into predefined categories.

Parameters:

Name Type Description Default
hours_series Series

A Pandas Series with time in hours.

required

Returns:

Type Description
Series

A Pandas Series of categorical type with the corresponding categories.

Source code in src/slurm_usage_history/tools.py
def categorize_time_series(hours_series: pd.Series) -> pd.Series:
    """
    Categorize a Pandas Series of time in hours into predefined categories.

    Args:
        hours_series: A Pandas Series with time in hours.

    Returns:
        A Pandas Series of categorical type with the corresponding categories.
    """
    categories = [
        "<5s",
        "<1min",
        "<15min",
        "<30min",
        "<1h",
        "<5h",
        "<10h",
        "<24h",
        ">=24h",
    ]
    bins = [0, 5 / 3600, 1 / 60, 15 / 60, 30 / 60, 1, 5, 10, 24, float("inf")]

    # Use pd.cut to bin and categorize
    categorized_series = pd.cut(hours_series, bins=bins, labels=categories, right=False, ordered=True)

    return categorized_series.astype("category")

get_time_column(date_str1, date_str2)

Calculate the timespan in days between two dates provided in ISO format strings.

Parameters:

Name Type Description Default
date_str1 str

The first date in ISO format (YYYY-MM-DD).

required
date_str2 str

The second date in ISO format (YYYY-MM-DD).

required

Returns:

Type Description
str

The column name to use based on the timespan between dates.

Source code in src/slurm_usage_history/tools.py
def get_time_column(date_str1: str, date_str2: str) -> str:
    """
    Select the time column to use based on the timespan in days between two dates provided as ISO format strings.

    Args:
        date_str1: The first date in ISO format (YYYY-MM-DD).
        date_str2: The second date in ISO format (YYYY-MM-DD).

    Returns:
        The column name to use based on the timespan between dates.
    """
    # Parse the input strings into datetime objects
    date1 = datetime.strptime(date_str1, "%Y-%m-%d")
    date2 = datetime.strptime(date_str2, "%Y-%m-%d")

    # Calculate the difference in days
    timespan = abs((date2 - date1).days)

    if timespan < 66:
        return "SubmitDay"
    if timespan < (365 * 1.5):
        return "SubmitYearWeek"
    return "SubmitYearMonth"

month_to_date(year_month_str)

Convert a year-month string to a datetime object.

Parameters:

Name            Type  Description                                                     Default
year_month_str  str   String in format 'YYYY-MM' where YYYY is year and MM is month   required

Returns:

Type      Description
datetime  First day of the specified month

Source code in src/slurm_usage_history/tools.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def month_to_date(year_month_str: str) -> datetime:
    """
    Convert a year-month string to a datetime object.

    Args:
        year_month_str: String in format 'YYYY-MM' where YYYY is year and MM is month

    Returns:
        First day of the specified month
    """
    # Parse the year and month from the input string
    year, month = map(int, year_month_str.split("-"))

    # Construct the first day of the given month
    return datetime(year, month, 1)
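
A quick sketch, assuming slurm_usage_history.tools is importable as above:

from slurm_usage_history.tools import month_to_date

print(month_to_date("2025-03"))  # 2025-03-01 00:00:00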

natural_sort_key(s)

Natural sorting function that handles numeric parts in strings properly. For example: "cpu2" will come before "cpu11" with natural sorting.

Parameters:

Name  Type                       Description                               Default
s     Optional[Union[str, Any]]  String to convert to a natural sort key   required

Returns:

Type                               Description
Union[List[Union[int, str]], Any]  A list that can be used as a sort key with proper numeric ordering

Source code in src/slurm_usage_history/tools.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def natural_sort_key(s: Optional[Union[str, Any]]) -> Union[List[Union[int, str]], Any]:
    """
    Natural sorting function that handles numeric parts in strings properly.
    For example: "cpu2" will come before "cpu11" with natural sorting.

    Args:
        s: String to convert to a natural sort key

    Returns:
        A list that can be used as a sort key with proper numeric ordering
    """
    # If the input is not a string (e.g., None or already numeric), return it
    if not isinstance(s, str):
        return s

    # Split the string into text and numeric parts
    return [int(text) if text.isdigit() else text.lower() for text in re.split(r"(\d+)", s)]
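
A small sketch contrasting natural and plain lexicographic sorting (import path assumed as above):

from slurm_usage_history.tools import natural_sort_key

nodes = ["cpu11", "cpu2", "gpu1", "cpu1"]
print(sorted(nodes, key=natural_sort_key))
# ['cpu1', 'cpu2', 'cpu11', 'gpu1']
# plain sorted(nodes) would give ['cpu1', 'cpu11', 'cpu2', 'gpu1']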

print_column_info_in_markdown(df)

Prints the column data types and an example value for each column in markdown format.

Parameters:

Name  Type       Description                 Default
df    DataFrame  The dataframe to inspect.   required
Source code in src/slurm_usage_history/tools.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def print_column_info_in_markdown(df: pd.DataFrame) -> None:
    """
    Prints the column data types and an example value for each
    column in markdown format.

    Args:
        df: The dataframe to inspect.
    """
    # Create a list to store the column data type and example value
    column_info: List[List[Any]] = []

    for column in df.columns:
        dtype = df[column].dtype
        example_value = df[column].iloc[0]  # Get the first value as example
        column_info.append([column, str(dtype), example_value])

    # Create a DataFrame from the list to display in markdown format
    column_info_df = pd.DataFrame(column_info, columns=["Column", "Data Type", "Example Value"])

    # Print the markdown representation of the dataframe
    print(column_info_df.to_markdown(index=False))
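
A minimal sketch (import path assumed as above); note that pandas' to_markdown requires the optional tabulate package to be installed:

import pandas as pd
from slurm_usage_history.tools import print_column_info_in_markdown

df = pd.DataFrame({"User": ["alice"], "CPUHours": [12.5]})
print_column_info_in_markdown(df)
# prints a pipe table with one row per column: Column | Data Type | Example Value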

timeit(func)

Decorator to measure execution time of a function.

Source code in src/slurm_usage_history/tools.py
10
11
12
13
14
15
16
17
18
19
def timeit(func: Callable[..., T]) -> Callable[..., T]:
    """Decorator to measure execution time of a function."""
    @functools.wraps(func)
    def wrapper_timeit(*args: Any, **kwargs: Any) -> T:
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        print(f"Function '{func.__name__}' executed in {elapsed_time:.4f} seconds")
        return result
    return wrapper_timeit
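
A brief decorator sketch (import path assumed as above; the printed timing is illustrative):

from slurm_usage_history.tools import timeit

@timeit
def slow_sum(n):
    return sum(range(n))

slow_sum(10_000_000)
# Function 'slow_sum' executed in 0.2140 seconds   (actual timing will vary)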

unpack_nodelist_string(nodelist_str)

Unpacks a Slurm node list string into a list of individual node names. Handles ranges (e.g., gpu[08-09,11,14]) and single items (e.g., gpu16).

Parameters:

Name          Type           Description                               Default
nodelist_str  Optional[str]  String containing node list information   required

Returns:

Type       Description
List[str]  List of individual node names

Source code in src/slurm_usage_history/tools.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def unpack_nodelist_string(nodelist_str: Optional[str]) -> List[str]:
    """
    Unpacks a Slurm node list string into a list of individual node names.
    Handles ranges (e.g., gpu[08-09,11,14]) and single items (e.g., gpu16).

    Args:
        nodelist_str: String containing node list information

    Returns:
        List of individual node names
    """
    if not nodelist_str or nodelist_str == "None assigned":
        return []

    # Initialize a list to collect unpacked values
    unpacked_list: List[str] = []

    # Match patterns for ranges and list items
    range_pattern = re.compile(r"(\d+)-(\d+)")
    list_pattern = re.compile(r"(\w+)\[(.*?)\]")

    # Check for list patterns (e.g., gpu[08-09,11,14])
    list_match = list_pattern.search(nodelist_str)
    if list_match:
        base, range_str = list_match.groups()
        ranges = range_str.split(",")
        for r in ranges:
            if "-" in r:
                start, end = map(int, r.split("-"))
                for num in range(start, end + 1):
                    unpacked_list.append(f"{base}{num:02d}")
            else:
                unpacked_list.append(f"{base}{r}")
    else:
        # If no list pattern, check if there are single items or other cases
        parts = nodelist_str.split(",")
        for part in parts:
            # Check for ranges in single items (e.g., gpu01,03])
            range_match = range_pattern.search(part)
            if range_match:
                base, _ = part.split("[")
                start, end = map(int, range_match.groups())
                for num in range(start, end + 1):
                    unpacked_list.append(f"{base}{num:02d}")
            else:
                # Handle regular items
                unpacked_list.append(part)

    # Ensure no invalid items like trailing ']'
    return [item.strip("]") for item in unpacked_list]
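
A short sketch of the range and single-item cases (import path assumed as above):

from slurm_usage_history.tools import unpack_nodelist_string

print(unpack_nodelist_string("gpu[08-09,11,14]"))  # ['gpu08', 'gpu09', 'gpu11', 'gpu14']
print(unpack_nodelist_string("gpu16"))             # ['gpu16']
print(unpack_nodelist_string(None))                # []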

week_to_date(year_week_str)

Convert a year-week string to a datetime object.

Parameters:

Name           Type  Description                                                           Default
year_week_str  str   String in format 'YYYY-WW' where YYYY is year and WW is week number   required

Returns:

Type      Description
datetime  First day (Monday) of the specified week

Source code in src/slurm_usage_history/tools.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def week_to_date(year_week_str: str) -> datetime:
    """
    Convert a year-week string to a datetime object.

    Args:
        year_week_str: String in format 'YYYY-WW' where YYYY is year and WW is week number

    Returns:
        First day (Monday) of the specified week
    """
    # Parse the year and week from the input string
    year, week = map(int, year_week_str.split("-"))

    # Define the first day of the year
    first_day_of_year = datetime(year, 1, 1)

    # Find the start of the first week (Monday)
    first_week_start = first_day_of_year - timedelta(days=first_day_of_year.weekday())

    # Calculate the start date of the given week
    return first_week_start + timedelta(weeks=week - 1)
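
A quick sketch (import path assumed as above); the returned date reflects this function's week arithmetic for 2025:

from slurm_usage_history.tools import week_to_date

print(week_to_date("2025-03"))  # 2025-01-13 00:00:00  (the Monday of week 3)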