CarbonTracker
The CarbonTracker class is the main interface for starting, stopping and reporting through carbontracker.
Parameters: see the `Args` section of the class docstring below.
Example
Tracking the carbon intensity of PyTorch model training:
from carbontracker.tracker import CarbonTracker
# max_epochs is a placeholder for the number of epochs in your training loop.
tracker = CarbonTracker(epochs=max_epochs)
# Training loop.
for epoch in range(max_epochs):
    tracker.epoch_start()
    # Your model training.
    tracker.epoch_end()
# Optional: Add a stop in case of early termination before all monitor_epochs have
# been monitored to ensure that actual consumption is reported.
tracker.stop()
Source code in carbontracker/tracker.py
class CarbonTracker:
    """
    The CarbonTracker class is the main interface for starting, stopping and reporting through **carbontracker**.

    Args:
        epochs (int): Total epochs of your training loop.
        api_keys (dict, optional): Dictionary of Carbon Intensity API keys following the {name:key} format. Can also be set using `CarbonTracker.set_api_keys`.
            Example: `{"electricitymaps": "abcdefg"}`
        epochs_before_pred (int, optional): Epochs to monitor before outputting predicted consumption. Set to -1 for all epochs. Set to 0 for no prediction.
        monitor_epochs (int, optional): Total number of epochs to monitor. Outputs actual consumption when reached. Set to -1 for all epochs. Cannot be less than `epochs_before_pred` or equal to 0.
        update_interval (int, optional): Interval in seconds between the power usage measurements taken by the sleeper thread.
        interpretable (bool, optional): If set to `True` then the CO2eq are also converted to interpretable numbers such as the equivalent distance travelled in a car, etc. Otherwise, no conversions are done.
        stop_and_confirm (bool, optional): If set to `True` then the main thread (with your training loop) is paused after epochs_before_pred epochs to output the prediction and the user will need to confirm to continue training. Otherwise, prediction is output and training is continued instantly.
        ignore_errors (bool, optional): If set to `True` then all errors will cause energy monitoring to be stopped and training will continue. Otherwise, training is interrupted via `sys.exit(70)`.
        components (str, optional): Comma-separated string of which components to monitor. Options are: `"all"`, `"gpu"`, `"cpu"`, or `"gpu,cpu"`.
        devices_by_pid (bool, optional): If `True`, only devices (under the chosen components) running processes associated with the main process are measured. If `False`, all available devices are measured. Note that this requires your devices to have active processes before instantiating the CarbonTracker class.
        log_dir (str, optional): Path to the desired directory to write log files. If `None`, then no logging will be done.
        log_file_prefix (str, optional): Prefix to add to the log file name.
        verbose (int, optional): Sets the level of verbosity.
        decimal_precision (int, optional): Desired decimal precision of reported values.
        sim_cpu (str, optional): Name of a simulated CPU. Requires `sim_cpu_tdp`.
        sim_cpu_tdp (float, optional): TDP in Watts of the simulated CPU.
        sim_cpu_util (float, optional): Utilization of the simulated CPU.
        sim_gpu (str, optional): Name of a simulated GPU. Requires `sim_gpu_watts`.
        sim_gpu_watts (float, optional): Power consumption in Watts of the simulated GPU.
        sim_gpu_util (float, optional): Utilization of the simulated GPU.

    Example:
        Tracking the carbon intensity of PyTorch model training:

            from carbontracker.tracker import CarbonTracker

            tracker = CarbonTracker(epochs=max_epochs)

            # Training loop.
            for epoch in range(max_epochs):
                tracker.epoch_start()
                # Your model training.
                tracker.epoch_end()

            # Optional: Add a stop in case of early termination before all monitor_epochs have
            # been monitored to ensure that actual consumption is reported.
            tracker.stop()
    """

    def __init__(
        self,
        epochs,
        epochs_before_pred=1,
        monitor_epochs=-1,
        update_interval=1,
        interpretable=True,
        stop_and_confirm=False,
        ignore_errors=False,
        components="all",
        devices_by_pid=False,
        log_dir=None,
        log_file_prefix="",
        verbose=1,
        decimal_precision=12,
        api_keys=None,
        sim_cpu=None,
        sim_cpu_tdp=None,
        sim_cpu_util=None,
        sim_gpu=None,
        sim_gpu_watts=None,
        sim_gpu_util=None,
    ):
        """Initialize CarbonTracker.

        Args:
            epochs (int): Total number of epochs of the training loop. Used for the
                predicted consumption and as the value -1 expands to for
                `epochs_before_pred` and `monitor_epochs`.
            epochs_before_pred (int, optional): Number of epochs to monitor before outputting
                the predicted consumption. -1 means all `epochs`; 0 disables the prediction.
                Defaults to 1.
            monitor_epochs (int, optional): Number of epochs after which the actual consumption
                is output and monitoring stops. -1 means all `epochs`. Defaults to -1.
            update_interval (int, optional): Interval in seconds between measurements. Defaults to 1.
            interpretable (bool, optional): Whether to also express CO2eq in interpretable
                equivalents. Defaults to True.
            stop_and_confirm (bool, optional): Whether to pause for user confirmation after the
                prediction is output. Defaults to False.
            ignore_errors (bool, optional): If True, errors stop monitoring but training
                continues; otherwise the process exits with status 70. Defaults to False.
            components (str, optional): Components to monitor. Defaults to "all".
            devices_by_pid (bool, optional): Whether to monitor only devices used by this
                process and its children. Defaults to False.
            log_dir (str, optional): Directory to store logs. Defaults to None.
            log_file_prefix (str, optional): Prefix for log files. Defaults to "".
            verbose (int, optional): Verbosity level. Defaults to 1.
            decimal_precision (int, optional): Decimal precision of reported values. Defaults to 12.
            api_keys (dict, optional): Carbon intensity API keys as {name: key}; applied via
                `set_api_keys`. Defaults to None.
            sim_cpu (str, optional): Simulated CPU name. Defaults to None.
            sim_cpu_tdp (float, optional): Simulated CPU TDP in Watts. Defaults to None.
            sim_cpu_util (float, optional): Simulated CPU utilization. Defaults to None.
            sim_gpu (str, optional): Simulated GPU name. Defaults to None.
            sim_gpu_watts (float, optional): Simulated GPU power consumption in Watts. Defaults to None.
            sim_gpu_util (float, optional): Simulated GPU utilization. Defaults to None.

        Raises:
            ValueError: If `monitor_epochs` (when not -1) is 0 or less than
                `epochs_before_pred`, or if a simulated CPU/GPU is given without its
                TDP/wattage.
        """
        # Validate the arguments as given (before -1 is expanded below) so the set
        # of accepted argument combinations is unchanged.
        if monitor_epochs != -1:
            if monitor_epochs < epochs_before_pred:
                raise ValueError("monitor_epochs cannot be less than epochs_before_pred")
            if monitor_epochs == 0:
                raise ValueError("monitor_epochs cannot be zero")
        # A simulated component needs an explicit power figure.
        if sim_cpu is not None and sim_cpu_tdp is None:
            raise ValueError("When using simulated CPU (sim_cpu), you must also specify the CPU TDP (sim_cpu_tdp)")
        if sim_gpu is not None and sim_gpu_watts is None:
            raise ValueError("When using simulated GPU (sim_gpu), you must also specify the GPU power consumption (sim_gpu_watts)")

        self.epochs = epochs
        # -1 is documented as "all epochs". epoch_end() compares the epoch counter
        # to these values with ==, which a literal -1 never matches, so expand it
        # here; otherwise no prediction / automatic actual report is produced.
        self.epochs_before_pred = epochs if epochs_before_pred == -1 else epochs_before_pred
        self.monitor_epochs = epochs if monitor_epochs == -1 else monitor_epochs
        self.update_interval = update_interval
        self.interpretable = interpretable
        self.stop_and_confirm = stop_and_confirm
        self.ignore_errors = ignore_errors
        self.components = components
        self.devices_by_pid = devices_by_pid
        self.log_dir = log_dir
        self.log_file_prefix = log_file_prefix
        self.verbose = verbose
        self.decimal_precision = decimal_precision
        self.api_keys = api_keys
        self.sim_cpu = sim_cpu
        self.sim_cpu_tdp = sim_cpu_tdp
        self.sim_cpu_util = sim_cpu_util
        self.sim_gpu = sim_gpu
        self.sim_gpu_watts = sim_gpu_watts
        self.sim_gpu_util = sim_gpu_util
        self.deleted = False
        self.epoch_counter = 0
        # True between epoch_start() and epoch_end(); stop() uses it to tell an
        # interrupted epoch apart from a completed one.
        self._epoch_in_progress = False

        try:
            pids = self._get_pids()
            self.logger = loggerutil.Logger(
                log_dir=log_dir,
                verbose=verbose,
                log_prefix=log_file_prefix,
                logger_id=str(randint(1, 999999)),
            )
            # Without this the api_keys argument would be silently ignored. Applied
            # before the threads are created in case they fetch on start-up.
            if api_keys is not None:
                self.set_api_keys(api_keys)
                if self.deleted:
                    # An invalid key was ignored (ignore_errors=True) and
                    # monitoring was shut down; do not start any threads.
                    return
            self.tracker = CarbonTrackerThread(
                delete=self._delete,
                components=component.create_components(
                    components=components,
                    pids=pids,
                    devices_by_pid=devices_by_pid,
                    logger=self.logger,
                    sim_cpu=self.sim_cpu,
                    sim_cpu_tdp=self.sim_cpu_tdp,
                    sim_cpu_util=self.sim_cpu_util,
                    sim_gpu=self.sim_gpu,
                    sim_gpu_watts=self.sim_gpu_watts,
                    sim_gpu_util=self.sim_gpu_util,
                ),
                logger=self.logger,
                ignore_errors=ignore_errors,
                update_interval=update_interval,
            )
            self.intensity_stopper = Event()
            self.intensity_updater = CarbonIntensityThread(
                self.logger, self.intensity_stopper
            )
        except Exception as e:
            self._handle_error(e)

    def epoch_start(self):
        """
        Starts tracking energy consumption for the current epoch. Call at the beginning of the training loop.
        """
        if self.deleted:
            return
        try:
            self.tracker.epoch_start()
            self.epoch_counter += 1
            self._epoch_in_progress = True
        except Exception as e:
            self._handle_error(e)

    def epoch_end(self):
        """
        Ends tracking energy consumption for the current epoch. Call at the end of the training loop.

        Outputs the prediction once `epochs_before_pred` epochs have finished, and
        outputs the actual consumption and stops monitoring once `monitor_epochs`
        epochs have finished.
        """
        if self.deleted:
            return
        try:
            self.tracker.epoch_end()
            self._epoch_in_progress = False
            if self.epoch_counter == self.epochs_before_pred:
                self._output_pred()
                if self.stop_and_confirm:
                    self._user_query()
            if self.epoch_counter == self.monitor_epochs:
                self._output_actual()
                self._delete()
        except Exception as e:
            self._handle_error(e)

    def stop(self):
        """Ensure that tracker is stopped and deleted. E.g. use with early
        stopping, where not all monitor_epochs have been run."""
        if self.deleted:
            return
        try:
            self.logger.info(
                f"Training was interrupted before all {self.monitor_epochs} epochs"
                " were monitored."
            )
            # An epoch that was started but not ended is not accounted for in the
            # measurements, so exclude it from the reported count. A completed
            # epoch (stop() called right after epoch_end()) is still counted.
            if self._epoch_in_progress:
                self.epoch_counter -= 1
                self._epoch_in_progress = False
            self._output_actual()
            self._delete()
        except Exception as e:
            # Same policy as the other public methods.
            self._handle_error(e)

    def set_api_keys(self, api_dict):
        """Set API keys (given as {name:key}) for carbon intensity fetchers.

        Only "electricitymaps" (case-insensitive) is recognized; any other name is
        reported through the error handler as a FetcherNameError.
        """
        try:
            for name, key in api_dict.items():
                if name.lower() == "electricitymaps":
                    electricitymaps.ElectricityMap.set_api_key(key)
                else:
                    raise exceptions.FetcherNameError(
                        f"Invalid API name '{name}' given."
                    )
        except Exception as e:
            self._handle_error(e)

    def _handle_error(self, error):
        """Report the exception currently being handled, then stop monitoring
        (ignore_errors=True) or exit the process with status 70."""
        err_str = traceback.format_exc()
        if self.ignore_errors:
            err_str = f"Ignored error: {err_str}Continued training without monitoring..."
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.err_critical(err_str)
            logger.output(err_str)
        else:
            # No logger when __init__ failed before creating it or after _delete()
            # removed it; reporting the error must not itself raise.
            print(err_str, file=sys.stderr)
        if self.ignore_errors:
            # Stop monitoring but continue training.
            self._delete()
        else:
            sys.exit(70)

    def _output_energy(self, description, time, energy, co2eq, conversions):
        """Output a formatted time/energy/CO2eq summary, plus optional conversions
        given as (value, unit) pairs."""
        precision = self.decimal_precision
        output = (
            f"\n{description}\n"
            f"\tTime:\t{loggerutil.convert_to_timestring(time)}\n"
            f"\tEnergy:\t{energy:.{precision}f} kWh\n"
            f"\tCO2eq:\t{co2eq:.{precision}f} g"
        )
        if conversions:
            conv_str = "\n\tThis is equivalent to:"
            for units, unit in conversions:
                conv_str += f"\n\t{units:.{precision}f} {unit}"
            output += conv_str
        self.logger.output(output, verbose_level=1)

    def _output_actual(self):
        """Output actual usage so far."""
        energy_usages = self.tracker.total_energy_per_epoch()
        energy = energy_usages.sum()
        times = self.tracker.epoch_times
        time = np.sum(times)
        _co2eq = self._co2eq(energy)
        conversions = co2eq.convert(_co2eq) if self.interpretable else None
        if self.epochs_before_pred == 0:
            self._output_energy(
                "Actual consumption:", time, energy, _co2eq, conversions
            )
        else:
            self._output_energy(
                f"Actual consumption for {self.epoch_counter} epoch(s):",
                time,
                energy,
                _co2eq,
                conversions,
            )

    def _output_pred(self):
        """Output predicted usage for full training epochs."""
        epoch_energy_usages = self.tracker.total_energy_per_epoch()
        epoch_times = self.tracker.epoch_times
        pred_energy = predictor.predict_energy(self.epochs, epoch_energy_usages)
        pred_time = predictor.predict_time(self.epochs, epoch_times)
        pred_co2eq = self._co2eq(pred_energy, pred_time)
        conversions = co2eq.convert(pred_co2eq) if self.interpretable else None
        self._output_energy(
            f"Predicted consumption for {self.epochs} epoch(s):",
            pred_time,
            pred_energy,
            pred_co2eq,
            conversions,
        )

    def _co2eq(self, energy_usage, pred_time_dur=None):
        """Return the CO2eq (g) of the energy usage (kWh).

        With a truthy `pred_time_dur` the predicted carbon intensity over that
        duration is used; otherwise the average carbon intensity is used.
        """
        if pred_time_dur:
            ci = self.intensity_updater.predict_carbon_intensity(pred_time_dur)
        else:
            ci = self.intensity_updater.average_carbon_intensity()
        co2eq = energy_usage * ci.carbon_intensity
        return co2eq

    def _user_query(self):
        """Ask on stdin whether training should continue."""
        self.logger.output("Continue training (y/n)?")
        user_input = input().strip().lower()
        self._check_input(user_input)

    def _check_input(self, user_input: str):
        """Continue on "y", exit with status 0 on "n", re-prompt otherwise.

        Loops instead of recursing so repeated invalid answers cannot grow the stack.
        """
        while True:
            if user_input == "y":
                self.logger.output("Continuing...")
                return
            if user_input == "n":
                self.logger.info("Session ended by user.")
                self.logger.output("Quitting...")
                sys.exit(0)
            self.logger.output("Input not recognized. Try again (y/n):")
            user_input = input().strip().lower()

    def _delete(self):
        """Stop the monitoring threads and drop their references.

        Safe to call more than once and on a partially initialized instance
        (e.g. when __init__ failed before the threads were created).
        """
        if getattr(self, "deleted", False):
            return
        tracker = getattr(self, "tracker", None)
        if tracker is not None:
            tracker.stop()
        stopper = getattr(self, "intensity_stopper", None)
        if stopper is not None:
            stopper.set()
        for attr in ("logger", "tracker", "intensity_updater", "intensity_stopper"):
            if attr in vars(self):
                delattr(self, attr)
        self.deleted = True

    def _get_pids(self) -> List[int]:
        """Get current process id and all children process ids."""
        process = psutil.Process()
        pids = [process.pid] + [child.pid for child in process.children(recursive=True)]
        return pids
__init__(epochs, epochs_before_pred=1, monitor_epochs=-1, update_interval=1, interpretable=True, stop_and_confirm=False, ignore_errors=False, components='all', devices_by_pid=False, log_dir=None, log_file_prefix='', verbose=1, decimal_precision=12, api_keys=None, sim_cpu=None, sim_cpu_tdp=None, sim_cpu_util=None, sim_gpu=None, sim_gpu_watts=None, sim_gpu_util=None)
Initialize CarbonTracker.
Parameters: see the `Args` section of the `__init__` docstring below.
Source code in carbontracker/tracker.py
def __init__(
    self,
    epochs,
    epochs_before_pred=1,
    monitor_epochs=-1,
    update_interval=1,
    interpretable=True,
    stop_and_confirm=False,
    ignore_errors=False,
    components="all",
    devices_by_pid=False,
    log_dir=None,
    log_file_prefix="",
    verbose=1,
    decimal_precision=12,
    api_keys=None,
    sim_cpu=None,
    sim_cpu_tdp=None,
    sim_cpu_util=None,
    sim_gpu=None,
    sim_gpu_watts=None,
    sim_gpu_util=None,
):
    """Initialize CarbonTracker.

    Args:
        epochs (int): Total number of epochs of the training loop. Used for the
            predicted consumption and as the value -1 expands to for
            `epochs_before_pred` and `monitor_epochs`.
        epochs_before_pred (int, optional): Number of epochs to monitor before outputting
            the predicted consumption. -1 means all `epochs`; 0 disables the prediction.
            Defaults to 1.
        monitor_epochs (int, optional): Number of epochs after which the actual consumption
            is output and monitoring stops. -1 means all `epochs`. Defaults to -1.
        update_interval (int, optional): Interval in seconds between measurements. Defaults to 1.
        interpretable (bool, optional): Whether to also express CO2eq in interpretable
            equivalents. Defaults to True.
        stop_and_confirm (bool, optional): Whether to pause for user confirmation after the
            prediction is output. Defaults to False.
        ignore_errors (bool, optional): If True, errors stop monitoring but training
            continues; otherwise the process exits with status 70. Defaults to False.
        components (str, optional): Components to monitor. Defaults to "all".
        devices_by_pid (bool, optional): Whether to monitor only devices used by this
            process and its children. Defaults to False.
        log_dir (str, optional): Directory to store logs. Defaults to None.
        log_file_prefix (str, optional): Prefix for log files. Defaults to "".
        verbose (int, optional): Verbosity level. Defaults to 1.
        decimal_precision (int, optional): Decimal precision of reported values. Defaults to 12.
        api_keys (dict, optional): Carbon intensity API keys as {name: key}; applied via
            `set_api_keys`. Defaults to None.
        sim_cpu (str, optional): Simulated CPU name. Defaults to None.
        sim_cpu_tdp (float, optional): Simulated CPU TDP in Watts. Defaults to None.
        sim_cpu_util (float, optional): Simulated CPU utilization. Defaults to None.
        sim_gpu (str, optional): Simulated GPU name. Defaults to None.
        sim_gpu_watts (float, optional): Simulated GPU power consumption in Watts. Defaults to None.
        sim_gpu_util (float, optional): Simulated GPU utilization. Defaults to None.

    Raises:
        ValueError: If `monitor_epochs` (when not -1) is 0 or less than
            `epochs_before_pred`, or if a simulated CPU/GPU is given without its
            TDP/wattage.
    """
    # Validate the arguments as given (before -1 is expanded below) so the set
    # of accepted argument combinations is unchanged.
    if monitor_epochs != -1:
        if monitor_epochs < epochs_before_pred:
            raise ValueError("monitor_epochs cannot be less than epochs_before_pred")
        if monitor_epochs == 0:
            raise ValueError("monitor_epochs cannot be zero")
    # A simulated component needs an explicit power figure.
    if sim_cpu is not None and sim_cpu_tdp is None:
        raise ValueError("When using simulated CPU (sim_cpu), you must also specify the CPU TDP (sim_cpu_tdp)")
    if sim_gpu is not None and sim_gpu_watts is None:
        raise ValueError("When using simulated GPU (sim_gpu), you must also specify the GPU power consumption (sim_gpu_watts)")

    self.epochs = epochs
    # -1 is documented as "all epochs". epoch_end() compares the epoch counter
    # to these values with ==, which a literal -1 never matches, so expand it
    # here; otherwise no prediction / automatic actual report is produced.
    self.epochs_before_pred = epochs if epochs_before_pred == -1 else epochs_before_pred
    self.monitor_epochs = epochs if monitor_epochs == -1 else monitor_epochs
    self.update_interval = update_interval
    self.interpretable = interpretable
    self.stop_and_confirm = stop_and_confirm
    self.ignore_errors = ignore_errors
    self.components = components
    self.devices_by_pid = devices_by_pid
    self.log_dir = log_dir
    self.log_file_prefix = log_file_prefix
    self.verbose = verbose
    self.decimal_precision = decimal_precision
    self.api_keys = api_keys
    self.sim_cpu = sim_cpu
    self.sim_cpu_tdp = sim_cpu_tdp
    self.sim_cpu_util = sim_cpu_util
    self.sim_gpu = sim_gpu
    self.sim_gpu_watts = sim_gpu_watts
    self.sim_gpu_util = sim_gpu_util
    self.deleted = False
    self.epoch_counter = 0
    # True between epoch_start() and epoch_end(); lets stop() tell an
    # interrupted epoch apart from a completed one.
    self._epoch_in_progress = False

    try:
        pids = self._get_pids()
        self.logger = loggerutil.Logger(
            log_dir=log_dir,
            verbose=verbose,
            log_prefix=log_file_prefix,
            logger_id=str(randint(1, 999999)),
        )
        # Without this the api_keys argument would be silently ignored. Applied
        # before the threads are created in case they fetch on start-up.
        if api_keys is not None:
            self.set_api_keys(api_keys)
            if self.deleted:
                # An invalid key was ignored (ignore_errors=True) and monitoring
                # was shut down; do not start any threads.
                return
        self.tracker = CarbonTrackerThread(
            delete=self._delete,
            components=component.create_components(
                components=components,
                pids=pids,
                devices_by_pid=devices_by_pid,
                logger=self.logger,
                sim_cpu=self.sim_cpu,
                sim_cpu_tdp=self.sim_cpu_tdp,
                sim_cpu_util=self.sim_cpu_util,
                sim_gpu=self.sim_gpu,
                sim_gpu_watts=self.sim_gpu_watts,
                sim_gpu_util=self.sim_gpu_util,
            ),
            logger=self.logger,
            ignore_errors=ignore_errors,
            update_interval=update_interval,
        )
        self.intensity_stopper = Event()
        self.intensity_updater = CarbonIntensityThread(
            self.logger, self.intensity_stopper
        )
    except Exception as e:
        self._handle_error(e)
epoch_end()
Ends tracking energy consumption for the current epoch. Call at the end of the training loop.
Source code in carbontracker/tracker.py
def epoch_end(self):
    """
    Ends tracking energy consumption for the current epoch. Call at the end of the training loop.

    Outputs the prediction once `epochs_before_pred` epochs have finished, and
    outputs the actual consumption and stops monitoring once `monitor_epochs`
    epochs have finished.
    """
    if self.deleted:
        return
    try:
        self.tracker.epoch_end()
        # The epoch is complete; stop() must no longer treat it as interrupted.
        self._epoch_in_progress = False
        if self.epoch_counter == self.epochs_before_pred:
            self._output_pred()
            if self.stop_and_confirm:
                self._user_query()
        if self.epoch_counter == self.monitor_epochs:
            self._output_actual()
            self._delete()
    except Exception as e:
        self._handle_error(e)
epoch_start()
Starts tracking energy consumption for the current epoch. Call at the beginning of the training loop.
Source code in carbontracker/tracker.py
def epoch_start(self):
    """
    Starts tracking energy consumption for the current epoch. Call at the beginning of the training loop.
    """
    if self.deleted:
        return
    try:
        self.tracker.epoch_start()
        self.epoch_counter += 1
        # Marks the epoch as started but not yet ended; stop() uses this to
        # decide whether the last epoch was interrupted.
        self._epoch_in_progress = True
    except Exception as e:
        self._handle_error(e)
set_api_keys(api_dict)
Set API keys (given as {name:key}) for carbon intensity fetchers.
Source code in carbontracker/tracker.py
def set_api_keys(self, api_dict):
    """Register carbon intensity API keys supplied as a ``{name: key}`` mapping.

    Only the "electricitymaps" fetcher (matched case-insensitively) is
    recognized. Any other name, or any failure while registering, is passed to
    ``self._handle_error``.
    """
    try:
        for fetcher_name, api_key in api_dict.items():
            if fetcher_name.lower() != "electricitymaps":
                raise exceptions.FetcherNameError(
                    f"Invalid API name '{fetcher_name}' given."
                )
            electricitymaps.ElectricityMap.set_api_key(api_key)
    except Exception as err:
        self._handle_error(err)
stop()
Ensure that tracker is stopped and deleted. E.g. use with early stopping, where not all monitor_epochs have been run.
Source code in carbontracker/tracker.py
def stop(self):
    """Ensure that tracker is stopped and deleted. E.g. use with early
    stopping, where not all monitor_epochs have been run.

    Outputs the actual consumption measured so far. Does nothing if the tracker
    has already been deleted.
    """
    if self.deleted:
        return
    try:
        self.logger.info(
            f"Training was interrupted before all {self.monitor_epochs} epochs"
            " were monitored."
        )
        # An epoch that was started but not ended is not accounted for in the
        # measurements, so exclude it from the reported count; an epoch that
        # already ended is counted. Instances without the flag keep the
        # previous behaviour of always excluding the last epoch.
        if getattr(self, "_epoch_in_progress", True):
            self.epoch_counter -= 1
            self._epoch_in_progress = False
        self._output_actual()
        self._delete()
    except Exception as e:
        # Same error policy as epoch_start()/epoch_end().
        self._handle_error(e)