CarbonTracker
The CarbonTracker class is the main interface for starting, stopping and reporting through carbontracker.
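Parameters: see the `Args` section of the class docstring below. Only `epochs` is required; every other argument is optional.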
Example
Tracking the energy consumption and carbon footprint of PyTorch model training:

    from carbontracker.tracker import CarbonTracker

    tracker = CarbonTracker(epochs=max_epochs)

    # Training loop.
    for epoch in range(max_epochs):
        tracker.epoch_start()
        # Your model training.
        tracker.epoch_end()

    # Optional: Add a stop in case of early termination before all monitor_epochs have
    # been monitored to ensure that actual consumption is reported.
    tracker.stop()
Source code in carbontracker/tracker.py
class CarbonTracker:
    """
    The CarbonTracker class is the main interface for starting, stopping and reporting through **carbontracker**.

    Args:
        epochs (int): Total epochs of your training loop.
        api_keys (dict, optional): Dictionary of Carbon Intensity API keys following the {name:key} format. Can also be set using `CarbonTracker.set_api_keys`.
            Example: `{"electricitymaps": "abcdefg"}`
        epochs_before_pred (int, optional): Epochs to monitor before outputting predicted consumption. Set to -1 for all epochs. Set to 0 for no prediction.
        monitor_epochs (int, optional): Total number of epochs to monitor. Outputs actual consumption when reached. Set to -1 for all epochs. Cannot be less than `epochs_before_pred` or equal to 0.
        update_interval (int, optional): Interval in seconds between the power usage measurements taken by the sleeper thread.
        interpretable (bool, optional): If set to `True` then the CO2eq are also converted to interpretable numbers such as the equivalent distance travelled in a car, etc. Otherwise, no conversions are done.
        stop_and_confirm (bool, optional): If set to `True` then the main thread (with your training loop) is paused after epochs_before_pred epochs to output the prediction and the user will need to confirm to continue training. Otherwise, prediction is output and training is continued instantly.
        ignore_errors (bool, optional): If set to `True` then any error (including one raised while setting up the tracker) stops energy monitoring and training continues. Otherwise, the error is logged and the process exits with status code 70.
        components (str, optional): Comma-separated string of which components to monitor. Options are: `"all"`, `"gpu"`, `"cpu"`, or `"gpu,cpu"`.
        devices_by_pid (bool, optional): If `True`, only devices (under the chosen components) running processes associated with the main process are measured. If False, all available devices are measured. Note that this requires your devices to have active processes before instantiating the CarbonTracker class.
        log_dir (str, optional): Path to the desired directory to write log files. If `None`, then no logging will be done.
        log_file_prefix (str, optional): Prefix to add to the log file name.
        verbose (int, optional): Sets the level of verbosity.
        decimal_precision (int, optional): Desired decimal precision of reported values.

    Raises:
        ValueError: If `monitor_epochs` resolves to 0 or to fewer epochs than `epochs_before_pred`.

    Example:
        Tracking the energy consumption and carbon footprint of PyTorch model training:

            from carbontracker.tracker import CarbonTracker

            tracker = CarbonTracker(epochs=max_epochs)

            # Training loop.
            for epoch in range(max_epochs):
                tracker.epoch_start()
                # Your model training.
                tracker.epoch_end()

            # Optional: Add a stop in case of early termination before all monitor_epochs have
            # been monitored to ensure that actual consumption is reported.
            tracker.stop()
    """

    def __init__(
        self,
        epochs,
        epochs_before_pred=1,
        monitor_epochs=-1,
        update_interval=1,
        interpretable=True,
        stop_and_confirm=False,
        ignore_errors=False,
        components="all",
        devices_by_pid=False,
        log_dir=None,
        log_file_prefix="",
        verbose=1,
        decimal_precision=12,
        api_keys=None,
    ):
        self.epochs = epochs
        # Negative values are shorthand for "all epochs".
        self.epochs_before_pred = (
            epochs if epochs_before_pred < 0 else epochs_before_pred
        )
        self.monitor_epochs = epochs if monitor_epochs < 0 else monitor_epochs
        if self.monitor_epochs == 0 or self.monitor_epochs < self.epochs_before_pred:
            raise ValueError(
                "Argument monitor_epochs expected a value in "
                f"{{-1, >0, >=epochs_before_pred}}, got {monitor_epochs}."
            )
        # These must be set before anything can reach `_handle_error`,
        # which reads `ignore_errors`.
        self.interpretable = interpretable
        self.stop_and_confirm = stop_and_confirm
        self.ignore_errors = ignore_errors
        self.epoch_counter = 0
        self.decimal_precision = decimal_precision
        self.deleted = False
        try:
            pids = self._get_pids()
            self.logger = loggerutil.Logger(
                log_dir=log_dir,
                verbose=verbose,
                log_prefix=log_file_prefix,
                logger_id=str(randint(1, 999999)),
            )
            if api_keys is not None:
                # Set keys only once `ignore_errors` and the logger exist, so an
                # invalid key is reported through `_handle_error` rather than
                # crashing. Keys are still set before the carbon intensity
                # thread is created (presumably its fetchers read them).
                self.set_api_keys(api_keys)
                if self.deleted:
                    # Invalid key with ignore_errors=True: monitoring is off,
                    # so do not start any background threads.
                    return
            self.tracker = CarbonTrackerThread(
                delete=self._delete,
                components=component.create_components(
                    components=components,
                    pids=pids,
                    devices_by_pid=devices_by_pid,
                    logger=self.logger,
                ),
                logger=self.logger,
                ignore_errors=ignore_errors,
                update_interval=update_interval,
            )
            self.intensity_stopper = Event()
            self.intensity_updater = CarbonIntensityThread(
                self.logger, self.intensity_stopper
            )
        except Exception as e:
            self._handle_error(e)

    def epoch_start(self):
        """
        Starts tracking energy consumption for the current epoch. Call at the beginning of each iteration of the training loop.
        """
        if self.deleted:
            return
        try:
            self.tracker.epoch_start()
            self.epoch_counter += 1
        except Exception as e:
            self._handle_error(e)

    def epoch_end(self):
        """
        Ends tracking energy consumption for the current epoch. Call at the end of each iteration of the training loop.

        Outputs the prediction once `epochs_before_pred` epochs have run, and
        outputs the actual consumption and shuts the tracker down once
        `monitor_epochs` epochs have run.
        """
        if self.deleted:
            return
        try:
            self.tracker.epoch_end()
            if self.epoch_counter == self.epochs_before_pred:
                self._output_pred()
                if self.stop_and_confirm:
                    self._user_query()
            if self.epoch_counter == self.monitor_epochs:
                self._output_actual()
                self._delete()
        except Exception as e:
            self._handle_error(e)

    def stop(self):
        """Ensure that the tracker is stopped and deleted, e.g. when using early
        stopping, where not all monitor_epochs have been run.

        Errors are routed through the same handling as `epoch_start` and
        `epoch_end`, so the background threads are shut down even if
        reporting fails (with `ignore_errors=True`).
        """
        if self.deleted:
            return
        try:
            self.logger.info(
                f"Training was interrupted before all {self.monitor_epochs} epochs"
                " were monitored."
            )
            # Decrement epoch_counter with 1 since measurements for ultimate epoch
            # was interrupted and is not accounted for. Clamp at 0 so a stop before
            # any epoch never reports a negative epoch count.
            self.epoch_counter = max(self.epoch_counter - 1, 0)
            self._output_actual()
            self._delete()
        except Exception as e:
            self._handle_error(e)

    def set_api_keys(self, api_dict):
        """Set API keys (given as {name:key}) for carbon intensity fetchers.

        An unknown name raises `exceptions.FetcherNameError`. That error, like
        any other raised here, is passed to `_handle_error`.
        """
        try:
            for name, key in api_dict.items():
                if name.lower() == "electricitymaps":
                    electricitymaps.ElectricityMap.set_api_key(key)
                else:
                    raise exceptions.FetcherNameError(
                        f"Invalid API name '{name}' given."
                    )
        except Exception as e:
            self._handle_error(e)

    def _handle_error(self, error):
        """Report the exception currently being handled, then stop or exit.

        Must be called from inside an ``except`` block, because the report is
        built with ``traceback.format_exc()``. With `ignore_errors` set,
        monitoring is shut down and training continues. Otherwise the process
        exits with status 70. If no logger is available (construction failed
        early, or the tracker was already deleted), the report goes to stderr.
        """
        err_str = traceback.format_exc()
        ignore_errors = getattr(self, "ignore_errors", False)
        if ignore_errors:
            err_str = (
                f"Ignored error: {err_str}Continued training without " "monitoring..."
            )
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.err_critical(err_str)
            logger.output(err_str)
        else:
            print(err_str, file=sys.stderr)
        if ignore_errors:
            # Stop monitoring but continue training.
            self._delete()
        else:
            sys.exit(70)

    def _output_energy(self, description, time, energy, co2eq, conversions):
        """Format and output a time/energy/CO2eq summary.

        ``conversions`` is an iterable of ``(amount, unit)`` pairs, or a falsy
        value to skip the equivalents section.
        """
        precision = self.decimal_precision
        output = (
            f"\n{description}\n"
            f"\tTime:\t{loggerutil.convert_to_timestring(time)}\n"
            f"\tEnergy:\t{energy:.{precision}f} kWh\n"
            f"\tCO2eq:\t{co2eq:.{precision}f} g"
        )
        if conversions:
            conv_str = "\n\tThis is equivalent to:"
            for units, unit in conversions:
                conv_str += f"\n\t{units:.{precision}f} {unit}"
            output += conv_str
        self.logger.output(output, verbose_level=1)

    def _output_actual(self):
        """Output actual usage so far."""
        energy = self.tracker.total_energy_per_epoch().sum()
        time = np.sum(self.tracker.epoch_times)
        total_co2eq = self._co2eq(energy)
        conversions = co2eq.convert(total_co2eq) if self.interpretable else None
        if self.epochs_before_pred == 0:
            description = "Actual consumption:"
        else:
            description = f"Actual consumption for {self.epoch_counter} epoch(s):"
        self._output_energy(description, time, energy, total_co2eq, conversions)

    def _output_pred(self):
        """Output predicted usage for all `self.epochs` training epochs."""
        epoch_energy_usages = self.tracker.total_energy_per_epoch()
        epoch_times = self.tracker.epoch_times
        pred_energy = predictor.predict_energy(self.epochs, epoch_energy_usages)
        pred_time = predictor.predict_time(self.epochs, epoch_times)
        pred_co2eq = self._co2eq(pred_energy, pred_time)
        conversions = co2eq.convert(pred_co2eq) if self.interpretable else None
        self._output_energy(
            f"Predicted consumption for {self.epochs} epoch(s):",
            pred_time,
            pred_energy,
            pred_co2eq,
            conversions,
        )

    def _co2eq(self, energy_usage, pred_time_dur=None):
        """Return the CO2eq (g) of the energy usage (kWh).

        With a truthy ``pred_time_dur`` the intensity comes from
        ``predict_carbon_intensity(pred_time_dur)``. Otherwise it comes from
        ``average_carbon_intensity()``.
        """
        if pred_time_dur:
            ci = self.intensity_updater.predict_carbon_intensity(pred_time_dur)
        else:
            ci = self.intensity_updater.average_carbon_intensity()
        return energy_usage * ci.carbon_intensity

    def _user_query(self):
        """Ask the user whether to continue training."""
        self.logger.output("Continue training (y/n)?")
        user_input = input().lower()
        self._check_input(user_input)

    def _check_input(self, user_input: str):
        """Act on a y/n answer, re-prompting until one is recognized.

        "y" continues training. "n" ends the session via ``sys.exit(0)``.
        Surrounding whitespace and letter case are ignored.
        """
        answer = user_input.strip().lower()
        # Loop rather than recurse so repeated invalid answers cannot hit the
        # interpreter's recursion limit.
        while answer not in ("y", "n"):
            self.logger.output("Input not recognized. Try again (y/n):")
            answer = input().strip().lower()
        if answer == "y":
            self.logger.output("Continuing...")
            return
        self.logger.info("Session ended by user.")
        self.logger.output("Quitting...")
        sys.exit(0)

    def _delete(self):
        """Stop background threads and drop monitoring resources.

        Safe on a partially constructed tracker (only the attributes that
        exist are stopped or removed) and safe to call more than once.
        `deleted` is set even if stopping the tracker thread raises.
        """
        tracker = getattr(self, "tracker", None)
        stopper = getattr(self, "intensity_stopper", None)
        try:
            if tracker is not None:
                tracker.stop()
        finally:
            if stopper is not None:
                stopper.set()
            for attr in ("logger", "tracker", "intensity_updater", "intensity_stopper"):
                if hasattr(self, attr):
                    delattr(self, attr)
            self.deleted = True

    def _get_pids(self) -> List[int]:
        """Get current process id and all children process ids."""
        process = psutil.Process()
        pids = [process.pid] + [child.pid for child in process.children(recursive=True)]
        return pids
_co2eq(energy_usage, pred_time_dur=None)
Returns the CO2eq (g) of the energy usage (kWh).
Source code in carbontracker/tracker.py
def _co2eq(self, energy_usage, pred_time_dur=None):
""" "Returns the CO2eq (g) of the energy usage (kWh)."""
if pred_time_dur:
ci = self.intensity_updater.predict_carbon_intensity(pred_time_dur)
else:
ci = self.intensity_updater.average_carbon_intensity()
co2eq = energy_usage * ci.carbon_intensity
return co2eq
_get_pids()
Get current process id and all children process ids.
Source code in carbontracker/tracker.py
def _get_pids(self) -> List[int]:
    """Return this process's PID followed by the PIDs of all of its descendants."""
    current = psutil.Process()
    descendants = current.children(recursive=True)
    return [current.pid, *(child.pid for child in descendants)]
_output_actual()
Output actual usage so far.
Source code in carbontracker/tracker.py
def _output_actual(self):
"""Output actual usage so far."""
energy_usages = self.tracker.total_energy_per_epoch()
energy = energy_usages.sum()
times = self.tracker.epoch_times
time = np.sum(times)
_co2eq = self._co2eq(energy)
conversions = co2eq.convert(_co2eq) if self.interpretable else None
if self.epochs_before_pred == 0:
self._output_energy(
"Actual consumption:", time, energy, _co2eq, conversions
)
else:
self._output_energy(
f"Actual consumption for {self.epoch_counter} epoch(s):",
time,
energy,
_co2eq,
conversions,
)
_output_pred()
Output predicted usage for full training epochs.
Source code in carbontracker/tracker.py
def _output_pred(self):
    """Output the predicted time, energy and CO2eq for all ``self.epochs`` epochs."""
    measured_energy = self.tracker.total_energy_per_epoch()
    measured_times = self.tracker.epoch_times
    energy = predictor.predict_energy(self.epochs, measured_energy)
    duration = predictor.predict_time(self.epochs, measured_times)
    footprint = self._co2eq(energy, duration)
    conversions = None
    if self.interpretable:
        conversions = co2eq.convert(footprint)
    heading = f"Predicted consumption for {self.epochs} epoch(s):"
    self._output_energy(heading, duration, energy, footprint, conversions)
epoch_end()
Ends tracking energy consumption for the current epoch. Call at the end of each iteration of the training loop.
Source code in carbontracker/tracker.py
def epoch_end(self):
    """
    Ends tracking energy consumption for the current epoch. Call at the end of each iteration of the training loop.

    Once `epochs_before_pred` epochs have run, the prediction is output (and
    the user is asked to confirm if `stop_and_confirm` is set). Once
    `monitor_epochs` epochs have run, the actual consumption is output and the
    tracker is deleted. Errors are passed to `_handle_error`.
    """
    if self.deleted:
        return
    try:
        self.tracker.epoch_end()
        reached_prediction = self.epoch_counter == self.epochs_before_pred
        if reached_prediction:
            self._output_pred()
            if self.stop_and_confirm:
                self._user_query()
        if self.epoch_counter == self.monitor_epochs:
            self._output_actual()
            self._delete()
    except Exception as exc:
        self._handle_error(exc)
epoch_start()
Starts tracking energy consumption for the current epoch. Call at the beginning of each iteration of the training loop.
Source code in carbontracker/tracker.py
def epoch_start(self):
    """
    Starts tracking energy consumption for the current epoch. Call at the beginning of each iteration of the training loop.

    Does nothing once the tracker has been deleted. Errors are passed to
    `_handle_error`.
    """
    if not self.deleted:
        try:
            self.tracker.epoch_start()
            self.epoch_counter = self.epoch_counter + 1
        except Exception as exc:
            self._handle_error(exc)
set_api_keys(api_dict)
Set API keys (given as {name:key}) for carbon intensity fetchers.
Source code in carbontracker/tracker.py
def set_api_keys(self, api_dict):
    """Set API keys (given as {name:key}) for carbon intensity fetchers.

    Names are matched case-insensitively. Only "electricitymaps" is accepted.
    Any other name raises `exceptions.FetcherNameError`, which (like every
    error raised here) is passed to `_handle_error`.
    """
    try:
        for name, key in api_dict.items():
            if name.lower() != "electricitymaps":
                raise exceptions.FetcherNameError(
                    f"Invalid API name '{name}' given."
                )
            electricitymaps.ElectricityMap.set_api_key(key)
    except Exception as exc:
        self._handle_error(exc)
stop()
Ensures that the tracker is stopped and deleted, e.g. when using early stopping, where not all monitor_epochs have been run.
Source code in carbontracker/tracker.py
def stop(self):
    """Ensure that the tracker is stopped and deleted, e.g. when using early
    stopping, where not all monitor_epochs have been run.

    Outputs the actual consumption recorded so far and then deletes the
    tracker. As in `epoch_start` and `epoch_end`, any error is passed to
    `_handle_error`. A reporting failure therefore still shuts monitoring down
    when `ignore_errors` is set, instead of escaping with the threads running.
    """
    if self.deleted:
        return
    try:
        self.logger.info(
            f"Training was interrupted before all {self.monitor_epochs} epochs"
            " were monitored."
        )
        # Decrement epoch_counter with 1 since measurements for ultimate epoch
        # was interrupted and is not accounted for. Clamp at 0 so a stop before
        # any epoch never reports a negative epoch count.
        self.epoch_counter = max(self.epoch_counter - 1, 0)
        self._output_actual()
        self._delete()
    except Exception as e:
        self._handle_error(e)