CarbonTracker

The CarbonTracker class is the main interface for starting, stopping and reporting through carbontracker.

Parameters:
  • epochs (int) –

    Total epochs of your training loop.

  • api_keys (dict, default: None ) –

    Dictionary of Carbon Intensity API keys following the {name:key} format. Can also be set using CarbonTracker.set_api_keys.

    Example: { "electricitymaps": "abcdefg" }

  • epochs_before_pred (int, default: 1 ) –

    Epochs to monitor before outputting predicted consumption. Set to -1 for all epochs. Set to 0 for no prediction.

  • monitor_epochs (int, default: -1 ) –

    Total number of epochs to monitor. Outputs actual consumption when reached. Set to -1 for all epochs. Cannot be less than epochs_before_pred or equal to 0.

  • update_interval (int, default: 1 ) –

    Interval in seconds between power usage measurements taken by the sleeper thread.

  • interpretable (bool, default: True ) –

    If set to True, the CO2eq is also converted to interpretable quantities, such as the equivalent distance travelled by car. Otherwise, no conversions are done.

  • stop_and_confirm (bool, default: False ) –

    If set to True, the main thread (with your training loop) is paused after epochs_before_pred epochs to output the prediction, and the user must confirm to continue training. Otherwise, the prediction is output and training continues immediately.

  • ignore_errors (bool, default: False ) –

    If set to True, any error stops energy monitoring while training continues. Otherwise, the error interrupts training like a regular error.

  • components (str, default: 'all' ) –

    Comma-separated string of which components to monitor. Options are: "all", "gpu", "cpu", or "gpu,cpu".

  • devices_by_pid (bool, default: False ) –

    If True, only devices (under the chosen components) running processes associated with the main process are measured. If False, all available devices are measured. Note that setting this to True requires your devices to have active processes before the CarbonTracker class is instantiated.

  • log_dir (str, default: None ) –

    Path to the desired directory to write log files. If None, then no logging will be done.

  • log_file_prefix (str, default: '' ) –

    Prefix to add to the log file name.

  • verbose (int, default: 1 ) –

    Sets the level of verbosity.

  • decimal_precision (int, default: 12 ) –

    Desired decimal precision of reported values.
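
The rules for epochs_before_pred and monitor_epochs can be checked in isolation. The following is a minimal sketch that mirrors the validation in the constructor shown further down this page; the function name resolve_epoch_settings is hypothetical and not part of carbontracker.

```python
def resolve_epoch_settings(epochs, epochs_before_pred=1, monitor_epochs=-1):
    """Return the effective (epochs_before_pred, monitor_epochs) pair.

    Hypothetical helper: it only mirrors the checks in CarbonTracker.__init__.
    """
    # A negative value means "all epochs".
    effective_pred = epochs if epochs_before_pred < 0 else epochs_before_pred
    effective_monitor = epochs if monitor_epochs < 0 else monitor_epochs
    # monitor_epochs may not be 0 and may not be below epochs_before_pred.
    if effective_monitor == 0 or effective_monitor < effective_pred:
        raise ValueError(
            "monitor_epochs must be -1 or a positive value that is at least "
            f"epochs_before_pred, got {monitor_epochs}."
        )
    return effective_pred, effective_monitor


print(resolve_epoch_settings(epochs=10))  # (1, 10)
print(resolve_epoch_settings(epochs=10, epochs_before_pred=-1))  # (10, 10)
print(resolve_epoch_settings(epochs=10, epochs_before_pred=0, monitor_epochs=3))  # (0, 3)
```

With the defaults, a prediction is produced after the first epoch and the actual consumption is reported after the last one.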

Example

Tracking the carbon intensity of PyTorch model training:

from carbontracker.tracker import CarbonTracker

tracker = CarbonTracker(epochs=max_epochs)
# Training loop.
for epoch in range(max_epochs):
    tracker.epoch_start()
    # Your model training.
    tracker.epoch_end()

# Optional: Add a stop in case of early termination before all monitor_epochs have
# been monitored to ensure that actual consumption is reported.
tracker.stop()
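
If the loop can exit early, one possible pattern is to put the stop() call in a finally block so that it runs on a break and on an exception alike. The sketch below uses a hypothetical stand-in class, RecordingTracker, that only records which methods were called, so it runs without carbontracker installed; with the real library you would construct CarbonTracker(epochs=max_epochs) instead. The should_stop_early criterion is also made up for illustration.

```python
class RecordingTracker:
    """Hypothetical stand-in exposing the same three methods as CarbonTracker."""

    def __init__(self, epochs):
        self.epochs = epochs
        self.calls = []

    def epoch_start(self):
        self.calls.append("epoch_start")

    def epoch_end(self):
        self.calls.append("epoch_end")

    def stop(self):
        self.calls.append("stop")


def should_stop_early(epoch):
    """Hypothetical early-stopping criterion: stop once epoch index 2 is done."""
    return epoch >= 2


max_epochs = 5
tracker = RecordingTracker(epochs=max_epochs)
try:
    for epoch in range(max_epochs):
        tracker.epoch_start()
        # Your model training.
        tracker.epoch_end()
        if should_stop_early(epoch):
            break
finally:
    # Runs whether the loop finished, broke out early, or raised.
    tracker.stop()

print(tracker.calls)
```

According to the source listing below, stop() returns immediately once the tracker has already been deleted, so calling it after a complete run is harmless.
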
Source code in carbontracker/tracker.py
class CarbonTracker:
    """

    The CarbonTracker class is the main interface for starting, stopping and reporting through **carbontracker**.

    Args:
        epochs (int): Total epochs of your training loop.
        api_keys (dict, optional): Dictionary of Carbon Intensity API keys following the {name:key} format. Can also be set using `CarbonTracker.set_api_keys`

            Example: `{ "electricitymaps": "abcdefg" }`
        epochs_before_pred (int, optional): Epochs to monitor before outputting predicted consumption. Set to -1 for all epochs. Set to 0 for no prediction.
        monitor_epochs (int, optional): Total number of epochs to monitor. Outputs actual consumption when reached. Set to -1 for all epochs. Cannot be less than `epochs_before_pred` or equal to 0.
        update_interval (int, optional): Interval in seconds between power usage measurements taken by the sleeper thread.
        interpretable (bool, optional): If set to `True`, the CO2eq is also converted to interpretable quantities, such as the equivalent distance travelled by car. Otherwise, no conversions are done.
        stop_and_confirm (bool, optional): If set to `True`, the main thread (with your training loop) is paused after `epochs_before_pred` epochs to output the prediction, and the user must confirm to continue training. Otherwise, the prediction is output and training continues immediately.
        ignore_errors (bool, optional): If set to `True`, any error stops energy monitoring while training continues. Otherwise, the error interrupts training like a regular error.
        components (str, optional): Comma-separated string of which components to monitor. Options are: `"all"`, `"gpu"`, `"cpu"`, or `"gpu,cpu"`.
        devices_by_pid (bool, optional): If `True`, only devices (under the chosen components) running processes associated with the main process are measured. If False, all available devices are measured. Note that this requires your devices to have active processes before instantiating the CarbonTracker class.
        log_dir (str, optional): Path to the desired directory to write log files. If `None`, then no logging will be done.
        log_file_prefix (str, optional): Prefix to add to the log file name.
        verbose (int, optional): Sets the level of verbosity.
        decimal_precision (int, optional): Desired decimal precision of reported values.

    Example:
        Tracking the carbon intensity of PyTorch model training:

            from carbontracker.tracker import CarbonTracker

            tracker = CarbonTracker(epochs=max_epochs)
            # Training loop.
            for epoch in range(max_epochs):
                tracker.epoch_start()
                # Your model training.
                tracker.epoch_end()

            # Optional: Add a stop in case of early termination before all monitor_epochs have
            # been monitored to ensure that actual consumption is reported.
            tracker.stop()

    """

    def __init__(
        self,
        epochs,
        epochs_before_pred=1,
        monitor_epochs=-1,
        update_interval=1,
        interpretable=True,
        stop_and_confirm=False,
        ignore_errors=False,
        components="all",
        devices_by_pid=False,
        log_dir=None,
        log_file_prefix="",
        verbose=1,
        decimal_precision=12,
        api_keys=None,
    ):
        if api_keys is not None:
            self.set_api_keys(api_keys)

        self.epochs = epochs
        self.epochs_before_pred = (
            epochs if epochs_before_pred < 0 else epochs_before_pred
        )
        self.monitor_epochs = epochs if monitor_epochs < 0 else monitor_epochs
        if self.monitor_epochs == 0 or self.monitor_epochs < self.epochs_before_pred:
            raise ValueError(
                "Argument monitor_epochs expected a value in "
                f"{{-1, >0, >=epochs_before_pred}}, got {monitor_epochs}."
            )
        self.interpretable = interpretable
        self.stop_and_confirm = stop_and_confirm
        self.ignore_errors = ignore_errors
        self.epoch_counter = 0
        self.decimal_precision = decimal_precision
        self.deleted = False

        try:
            pids = self._get_pids()
            self.logger = loggerutil.Logger(
                log_dir=log_dir,
                verbose=verbose,
                log_prefix=log_file_prefix,
                logger_id=str(randint(1, 999999)),
            )
            self.tracker = CarbonTrackerThread(
                delete=self._delete,
                components=component.create_components(
                    components=components, pids=pids, devices_by_pid=devices_by_pid, logger=self.logger
                ),
                logger=self.logger,
                ignore_errors=ignore_errors,
                update_interval=update_interval,
            )
            self.intensity_stopper = Event()
            self.intensity_updater = CarbonIntensityThread(
                self.logger, self.intensity_stopper
            )
        except Exception as e:
            self._handle_error(e)

    def epoch_start(self):
        """
        Starts tracking energy consumption for the current epoch. Call at the beginning of each epoch in the training loop.
        """
        if self.deleted:
            return

        try:
            self.tracker.epoch_start()
            self.epoch_counter += 1
        except Exception as e:
            self._handle_error(e)

    def epoch_end(self):
        """
        Ends tracking energy consumption for the current epoch. Call at the end of each epoch in the training loop.
        """
        if self.deleted:
            return

        try:
            self.tracker.epoch_end()

            if self.epoch_counter == self.epochs_before_pred:
                self._output_pred()
                if self.stop_and_confirm:
                    self._user_query()

            if self.epoch_counter == self.monitor_epochs:
                self._output_actual()

            if self.epoch_counter == self.monitor_epochs:
                self._delete()
        except Exception as e:
            self._handle_error(e)

    def stop(self):
        """Ensure that tracker is stopped and deleted. E.g. use with early
        stopping, where not all monitor_epochs have been run."""
        if self.deleted:
            return
        self.logger.info(
            f"Training was interrupted before all {self.monitor_epochs} epochs"
            " were monitored."
        )
        # Decrement epoch_counter by 1 since measurements for the final epoch
        # were interrupted and are not accounted for.
        self.epoch_counter -= 1
        self._output_actual()
        self._delete()

    def set_api_keys(self, api_dict):
        """Set API keys (given as {name:key}) for carbon intensity fetchers."""
        try:
            for name, key in api_dict.items():
                if name.lower() == "electricitymaps":
                    electricitymaps.ElectricityMap.set_api_key(key)
                else:
                    raise exceptions.FetcherNameError(
                        f"Invalid API name '{name}' given."
                    )
        except Exception as e:
            self._handle_error(e)

    def _handle_error(self, error):
        err_str = traceback.format_exc()
        if self.ignore_errors:
            err_str = (
                f"Ignored error: {err_str}Continued training without " "monitoring..."
            )

        self.logger.err_critical(err_str)
        self.logger.output(err_str)

        if self.ignore_errors:
            # Stop monitoring but continue training.
            self._delete()
        else:
            sys.exit(70)

    def _output_energy(self, description, time, energy, co2eq, conversions):
        precision = self.decimal_precision
        output = (
            f"\n{description}\n"
            f"\tTime:\t{loggerutil.convert_to_timestring(time)}\n"
            f"\tEnergy:\t{energy:.{precision}f} kWh\n"
            f"\tCO2eq:\t{co2eq:.{precision}f} g"
        )

        if conversions:
            conv_str = "\n\tThis is equivalent to:"
            for units, unit in conversions:
                conv_str += f"\n\t{units:.{precision}f} {unit}"
            output += conv_str

        self.logger.output(output, verbose_level=1)

    def _output_actual(self):
        """Output actual usage so far."""
        energy_usages = self.tracker.total_energy_per_epoch()
        energy = energy_usages.sum()
        times = self.tracker.epoch_times
        time = np.sum(times)
        _co2eq = self._co2eq(energy)
        conversions = co2eq.convert(_co2eq) if self.interpretable else None
        if self.epochs_before_pred == 0:
            self._output_energy(
                "Actual consumption:", time, energy, _co2eq, conversions
            )
        else:
            self._output_energy(
                f"Actual consumption for {self.epoch_counter} epoch(s):",
                time,
                energy,
                _co2eq,
                conversions,
            )

    def _output_pred(self):
        """Output predicted usage for full training epochs."""
        epoch_energy_usages = self.tracker.total_energy_per_epoch()
        epoch_times = self.tracker.epoch_times
        pred_energy = predictor.predict_energy(self.epochs, epoch_energy_usages)
        pred_time = predictor.predict_time(self.epochs, epoch_times)
        pred_co2eq = self._co2eq(pred_energy, pred_time)
        conversions = co2eq.convert(pred_co2eq) if self.interpretable else None

        self._output_energy(
            f"Predicted consumption for {self.epochs} epoch(s):",
            pred_time,
            pred_energy,
            pred_co2eq,
            conversions,
        )

    def _co2eq(self, energy_usage, pred_time_dur=None):
        """Returns the CO2eq (g) of the energy usage (kWh)."""
        if pred_time_dur:
            ci = self.intensity_updater.predict_carbon_intensity(pred_time_dur)
        else:
            ci = self.intensity_updater.average_carbon_intensity()
        co2eq = energy_usage * ci.carbon_intensity
        return co2eq

    def _user_query(self):
        self.logger.output("Continue training (y/n)?")
        user_input = input().lower()
        self._check_input(user_input)

    def _check_input(self, user_input: str):
        if user_input == "y":
            self.logger.output("Continuing...")
            return
        elif user_input == "n":
            self.logger.info("Session ended by user.")
            self.logger.output("Quitting...")
            sys.exit(0)
        else:
            self.logger.output("Input not recognized. Try again (y/n):")
            user_input = input().lower()
            self._check_input(user_input)

    def _delete(self):
        self.tracker.stop()
        self.intensity_stopper.set()
        del self.logger
        del self.tracker
        del self.intensity_updater
        del self.intensity_stopper
        self.deleted = True

    def _get_pids(self) -> List[int]:
        """Get current process id and all children process ids."""
        process = psutil.Process()
        pids = [process.pid] + [child.pid for child in process.children(recursive=True)]
        return pids
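
The decimal_precision argument reaches the report through a nested format specification, as in _output_energy above. A minimal sketch of that formatting technique follows; format_energy_report is a hypothetical, simplified function that leaves out the time and conversion lines.

```python
def format_energy_report(description, energy_kwh, co2eq_g, precision=12):
    """Format a short report with a configurable number of decimals.

    Hypothetical helper: a reduced version of the layout in _output_energy.
    """
    # The inner {precision} is substituted first, giving e.g. ".3f".
    return (
        f"{description}\n"
        f"\tEnergy:\t{energy_kwh:.{precision}f} kWh\n"
        f"\tCO2eq:\t{co2eq_g:.{precision}f} g"
    )


print(format_energy_report("Actual consumption:", 0.0123456, 3.5, precision=3))
```

With precision=3 the energy is printed as 0.012 kWh and the CO2eq as 3.500 g.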

_co2eq(energy_usage, pred_time_dur=None)

Returns the CO2eq (g) of the energy usage (kWh).

Source code in carbontracker/tracker.py
def _co2eq(self, energy_usage, pred_time_dur=None):
    """Returns the CO2eq (g) of the energy usage (kWh)."""
    if pred_time_dur:
        ci = self.intensity_updater.predict_carbon_intensity(pred_time_dur)
    else:
        ci = self.intensity_updater.average_carbon_intensity()
    co2eq = energy_usage * ci.carbon_intensity
    return co2eq
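
The conversion itself is a single multiplication: energy in kWh times carbon intensity in gCO2eq/kWh. A worked example, with made-up numbers and a hypothetical standalone function name:

```python
def co2eq_grams(energy_kwh, carbon_intensity_g_per_kwh):
    """Return emissions in grams of CO2eq for the given energy usage."""
    return energy_kwh * carbon_intensity_g_per_kwh


# Illustrative values only: 0.5 kWh at 200 gCO2eq/kWh.
print(co2eq_grams(0.5, 200.0))  # 100.0
```

In the method above, the intensity comes from a prediction when pred_time_dur is given and from the observed average otherwise.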

_get_pids()

Get current process id and all children process ids.

Source code in carbontracker/tracker.py
def _get_pids(self) -> List[int]:
    """Get current process id and all children process ids."""
    process = psutil.Process()
    pids = [process.pid] + [child.pid for child in process.children(recursive=True)]
    return pids

_output_actual()

Output actual usage so far.

Source code in carbontracker/tracker.py
def _output_actual(self):
    """Output actual usage so far."""
    energy_usages = self.tracker.total_energy_per_epoch()
    energy = energy_usages.sum()
    times = self.tracker.epoch_times
    time = np.sum(times)
    _co2eq = self._co2eq(energy)
    conversions = co2eq.convert(_co2eq) if self.interpretable else None
    if self.epochs_before_pred == 0:
        self._output_energy(
            "Actual consumption:", time, energy, _co2eq, conversions
        )
    else:
        self._output_energy(
            f"Actual consumption for {self.epoch_counter} epoch(s):",
            time,
            energy,
            _co2eq,
            conversions,
        )

_output_pred()

Output predicted usage for full training epochs.

Source code in carbontracker/tracker.py
def _output_pred(self):
    """Output predicted usage for full training epochs."""
    epoch_energy_usages = self.tracker.total_energy_per_epoch()
    epoch_times = self.tracker.epoch_times
    pred_energy = predictor.predict_energy(self.epochs, epoch_energy_usages)
    pred_time = predictor.predict_time(self.epochs, epoch_times)
    pred_co2eq = self._co2eq(pred_energy, pred_time)
    conversions = co2eq.convert(pred_co2eq) if self.interpretable else None

    self._output_energy(
        f"Predicted consumption for {self.epochs} epoch(s):",
        pred_time,
        pred_energy,
        pred_co2eq,
        conversions,
    )
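
The predictor module is not shown on this page, so how predict_energy and predict_time extrapolate is not documented here. Purely as an assumption, the simplest approach would be a linear extrapolation: take the mean of the epochs measured so far and multiply by the total number of epochs. The function predict_total below is hypothetical and illustrates only that assumed model.

```python
def predict_total(total_epochs, per_epoch_values):
    """Linearly extrapolate a per-epoch quantity to the full training run.

    Assumed model, not taken from carbontracker: mean of measured epochs
    multiplied by the total number of epochs.
    """
    if not per_epoch_values:
        raise ValueError("need at least one measured epoch")
    mean_per_epoch = sum(per_epoch_values) / len(per_epoch_values)
    return total_epochs * mean_per_epoch


# Illustrative values: two measured epochs, ten epochs planned.
measured_energy_kwh = [0.25, 0.75]
print(predict_total(10, measured_energy_kwh))  # 5.0
```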

epoch_end()

Ends tracking energy consumption for the current epoch. Call at the end of each epoch in the training loop.

Source code in carbontracker/tracker.py
def epoch_end(self):
    """
    Ends tracking energy consumption for the current epoch. Call at the end of each epoch in the training loop.
    """
    if self.deleted:
        return

    try:
        self.tracker.epoch_end()

        if self.epoch_counter == self.epochs_before_pred:
            self._output_pred()
            if self.stop_and_confirm:
                self._user_query()

        if self.epoch_counter == self.monitor_epochs:
            self._output_actual()

        if self.epoch_counter == self.monitor_epochs:
            self._delete()
    except Exception as e:
        self._handle_error(e)
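
To see which reports epoch_end produces and when, the counter comparisons above can be replayed without any measurement. The sketch below is a hypothetical simulation (report_schedule is not part of carbontracker) and expects the already resolved values, that is, after -1 has been replaced by the total number of epochs.

```python
def report_schedule(epochs_before_pred, monitor_epochs, epochs_run):
    """List (epoch_counter, report) pairs in the order they would be output."""
    events = []
    for epoch_counter in range(1, epochs_run + 1):
        if epoch_counter == epochs_before_pred:
            events.append((epoch_counter, "prediction"))
        if epoch_counter == monitor_epochs:
            events.append((epoch_counter, "actual"))
            # The tracker deletes itself here, so later calls do nothing.
            break
    return events


print(report_schedule(1, 3, 5))  # [(1, 'prediction'), (3, 'actual')]
print(report_schedule(0, 2, 5))  # [(2, 'actual')]
```

Since the counter starts at 1 after the first epoch_start, epochs_before_pred=0 never matches, which is why that setting disables the prediction.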

epoch_start()

Starts tracking energy consumption for the current epoch. Call at the beginning of each epoch in the training loop.

Source code in carbontracker/tracker.py
def epoch_start(self):
    """
    Starts tracking energy consumption for the current epoch. Call at the beginning of each epoch in the training loop.
    """
    if self.deleted:
        return

    try:
        self.tracker.epoch_start()
        self.epoch_counter += 1
    except Exception as e:
        self._handle_error(e)

set_api_keys(api_dict)

Set API keys (given as {name:key}) for carbon intensity fetchers.

Source code in carbontracker/tracker.py
def set_api_keys(self, api_dict):
    """Set API keys (given as {name:key}) for carbon intensity fetchers."""
    try:
        for name, key in api_dict.items():
            if name.lower() == "electricitymaps":
                electricitymaps.ElectricityMap.set_api_key(key)
            else:
                raise exceptions.FetcherNameError(
                    f"Invalid API name '{name}' given."
                )
    except Exception as e:
        self._handle_error(e)
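
The name check above is case-insensitive and rejects anything other than "electricitymaps". A standalone sketch of that check, which can be handy for validating a configuration before a long run, might look as follows. The names SUPPORTED_FETCHERS and normalize_api_keys are hypothetical, and ValueError stands in for the library's FetcherNameError.

```python
# The only fetcher name accepted by set_api_keys in the listing above.
SUPPORTED_FETCHERS = {"electricitymaps"}


def normalize_api_keys(api_dict):
    """Return a copy of api_dict with lower-cased names, rejecting unknown ones."""
    normalized = {}
    for name, key in api_dict.items():
        lowered = name.lower()
        if lowered not in SUPPORTED_FETCHERS:
            # Stand-in for exceptions.FetcherNameError in carbontracker.
            raise ValueError(f"Invalid API name '{name}' given.")
        normalized[lowered] = key
    return normalized


print(normalize_api_keys({"ElectricityMaps": "abcdefg"}))
```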

stop()

Ensure that tracker is stopped and deleted. E.g. use with early stopping, where not all monitor_epochs have been run.

Source code in carbontracker/tracker.py
def stop(self):
    """Ensure that tracker is stopped and deleted. E.g. use with early
    stopping, where not all monitor_epochs have been run."""
    if self.deleted:
        return
    self.logger.info(
        f"Training was interrupted before all {self.monitor_epochs} epochs"
        " were monitored."
    )
    # Decrement epoch_counter by 1 since measurements for the final epoch
    # were interrupted and are not accounted for.
    self.epoch_counter -= 1
    self._output_actual()
    self._delete()