Log Parsing

Carbontracker contains utilities for parsing and interacting with the generated log files for further analysis and reporting.

aggregate_consumption(log_dir)

Aggregate consumption in all log files in specified log_dir.

Parameters:
  • log_dir (str) –

    Directory of logs

Returns:
  • total_energy( float ) –

    Total energy (kWh) of all logs

  • total_co2( float ) –

    Total CO2eq (gCO2eq) of all logs

  • total_equivalents( dict ) –

    Sum of each equivalence value across all logs, keyed as in each log's equivalents

Source code in carbontracker/parser.py
def aggregate_consumption(log_dir):
    """
    Sum the consumption reported by every output/standard log pair in log_dir.

    For each pair, one measurement is chosen: the only one available when
    either the actual or the predicted measurement is missing; otherwise the
    actual measurement when `get_early_stop` reports a truthy value or both
    measurements cover the same number of epochs, and the prediction in all
    other cases. Pairs with neither measurement are skipped.

    Args:
        log_dir (str): Directory of logs

    Returns:
        total_energy (float): Total energy (kWh) of all logs
        total_co2 (float): Total CO2eq (gCO2eq) of all logs; NaN values are left out of the sum
        total_equivalents (dict): Sum of each equivalence value over all logs, keyed as in each log's `equivalents` mapping
    """
    output_paths, std_paths = get_all_logs(log_dir=log_dir)

    energy_sum = 0
    co2eq_sum = 0
    equivalents_sum = {}

    for output_path, std_path in zip(output_paths, std_paths):
        with open(output_path, "r") as output_fh:
            output_text = output_fh.read()
        with open(std_path, "r") as std_fh:
            std_text = std_fh.read()

        actual, pred = get_consumption(output_text)
        stopped_early = get_early_stop(std_text)

        # Decide which measurement this log pair contributes.
        if actual is None and pred is None:
            continue
        if actual is None:
            chosen = pred
        elif pred is None:
            chosen = actual
        else:
            # Both are available: trust the actual numbers only when they
            # describe the full run (or the run was stopped early).
            same_epochs = actual["epochs"] == pred["epochs"]
            chosen = actual if stopped_early or same_epochs else pred

        energy = chosen["energy (kWh)"]
        co2eq = chosen["co2eq (g)"]
        equivalents = chosen["equivalents"]

        energy_sum += energy
        if not np.isnan(co2eq):
            co2eq_sum += co2eq
        if equivalents is not None:
            for name, amount in equivalents.items():
                equivalents_sum[name] = equivalents_sum.get(name, 0) + amount

    return energy_sum, co2eq_sum, equivalents_sum

get_all_logs(log_dir)

Get all output and standard logs in log_dir.

Parameters:
  • log_dir (str) –

    Directory of logs

Returns:
  • output_logs( list[str] ) –

    List of file names of output logs

  • std_logs( list[str] ) –

    List of file names of standard logs

Raises:
  • MismatchedLogFilesError

    Raised if there exist standard log files that cannot be matched with an output log file or vice versa.

Source code in carbontracker/parser.py
def get_all_logs(log_dir):
    """
    Get all output and standard logs in log_dir.

    Only regular, non-empty files are considered. When the number of output
    and standard logs differs, files of the more numerous kind whose run
    prefix (the path up to the last "_carbontracker") has no counterpart of
    the other kind are dropped before the counts are compared again.

    Args:
        log_dir (str): Directory of logs

    Returns:
        output_logs (list[str]): Sorted paths of output logs
        std_logs (list[str]): Sorted paths of standard logs

    Raises:
        MismatchedLogFilesError: Raised if the number of output and standard logs still differs after unmatched files have been dropped.
    """
    files = []
    for name in os.listdir(log_dir):
        path = os.path.join(log_dir, name)
        if os.path.isfile(path) and os.path.getsize(path) > 0:
            files.append(path)

    # NOTE: both patterns are applied with re.match to the full path, so they
    # are anchored at the start only and "." matches any character.
    output_re = re.compile(r".*carbontracker_output.log")
    std_re = re.compile(r".*carbontracker.log")
    output_logs = sorted(filter(output_re.match, files))
    std_logs = sorted(filter(std_re.match, files))

    if len(output_logs) != len(std_logs):
        # Try to remove the files with no matching output/std logs.
        # Split on the LAST "_carbontracker" so that a directory or run prefix
        # containing that text does not collapse every path to the same prefix.
        output_prefixes = {p.rsplit("_carbontracker", 1)[0] for p in output_logs}
        std_prefixes = {p.rsplit("_carbontracker", 1)[0] for p in std_logs}
        if len(std_logs) > len(output_logs):
            std_logs = [
                p
                for p in std_logs
                if p.rsplit("_carbontracker", 1)[0] in output_prefixes
            ]
        else:
            output_logs = [
                p
                for p in output_logs
                if p.rsplit("_carbontracker", 1)[0] in std_prefixes
            ]
        # If the counts still differ after removal, the logs cannot be paired.
        if len(output_logs) != len(std_logs):
            raise exceptions.MismatchedLogFilesError(
                f"Found {len(output_logs)} output logs and {len(std_logs)} "
                "standard logs. Expected equal number of logs."
            )
    return output_logs, std_logs

get_avg_power_usages(std_log_data)

Retrieve average power usages for each epoch (W).

Parameters:
  • std_log_data (str) –

    Log to parse

Returns:
  • dict

    Dictionary containing list of average power usages for each epoch per component. Has shape: { [component name]: list[list[float]] }

Source code in carbontracker/parser.py
def get_avg_power_usages(std_log_data):
    """
    Collect the average power usage (W) lines of a standard log, per component.

    Every "Average power usage (W) for <component>: <value>" line contributes
    one entry to that component's list, in log order. A value of "None" is
    recorded as [0.0]; any other value has its surrounding brackets removed
    and is split on spaces into floats.

    Args:
        std_log_data (str): Log to parse

    Returns:
        (dict): Dictionary containing list of average power usages for each epoch per component. Has shape:
                {
                        [component name]: list[list[float]]
                }
    """
    usage_pattern = re.compile(
        r"Average power usage \(W\) for (.+): (\[?[0-9\.]+\]?|None)"
    )

    usages_by_component = {}
    for name, raw_value in usage_pattern.findall(std_log_data):
        readings = usages_by_component.setdefault(name, [])
        if raw_value == "None":
            # No measurement was logged for this entry; count it as zero.
            readings.append([0.0])
        else:
            tokens = raw_value.strip("[").strip("]").split(" ")
            readings.append([float(token) for token in tokens if token != ""])

    return usages_by_component

get_consumption(output_log_data)

Gets actual and predicted energy consumption, CO2eq and equivalence statements from output_log_data using regular expressions.

Parameters:
  • output_log_data (str) –

    Log data to search through.

Returns:
  • actual( dict | None ) –

    Actual consumption

  • pred( dict | None ) –

    Predicted consumption

    Both actual and pred have the shape:

    {
        "epochs": int,
        "duration (s)": int,
        "energy (kWh)": float | None,
        "co2eq (g)": float | None,
        "equivalents": equivalents,
    }
    
Source code in carbontracker/parser.py
def get_consumption(output_log_data: str):
    """
    Extract the actual and predicted consumption sections from an output log.

    Each section is located with a case-insensitive regular expression and
    the resulting match (or None when the section is absent) is handed to
    `extract_measurements`.

    Args:
        output_log_data (str): Log data to search through.

    Returns:
        actual (dict | None): Actual consumption

        pred (dict | None): Predicted consumption

            Both `actual` and `pred` have the shape:

                {
                    "epochs": int,
                    "duration (s)": int,
                    "energy (kWh)": float | None,
                    "co2eq (g)": float | None,
                    "equivalents": equivalents,
                }
    """
    # Time / energy / CO2eq / equivalents part, identical for both sections.
    # The lookahead ends a section at the next timestamp or at end of input.
    measurements_tail = (
        r"[\s\S]*?Time:\s*(.*)\n\s*Energy:\s*(.*)\s+kWh"
        r"[\s\S]*?CO2eq:\s*(.*)\s+g"
        r"(?:\s*This is equivalent to:\s*([\s\S]*?))?(?=\d{4}-\d{2}-\d{2}|\Z)"
    )
    actual_pattern = re.compile(
        (r"(?i)Actual consumption" r"(?:\s*for\s+\d+\s+epochs)?")
        + measurements_tail
    )
    pred_pattern = re.compile(
        r"(?i)Predicted consumption for (\d*) epoch\(s\):" + measurements_tail
    )

    actual_section = actual_pattern.search(output_log_data)
    pred_section = pred_pattern.search(output_log_data)

    actual = extract_measurements(actual_section)
    pred = extract_measurements(pred_section)
    return actual, pred

get_devices(std_log_data)

Retrieve dictionary of components with their device(s).

Parameters:
  • std_log_data (str) –

    Log data to parse

Returns:
  • dict

    Dictionary with devices per component of shape

    {
            [component]: ["device1", "device2"]
    }
    

    Where [component] is the component name and "device1", "device2" are device names.

Source code in carbontracker/parser.py
def get_devices(std_log_data: str) -> Dict[str, List[str]]:
    """
    Map each component named in the log to the list of its device(s).

    Only the first "The following components were found:" line is used (it
    must be terminated by a newline). Component names are lower-cased; the
    device list is split on "," without trimming surrounding whitespace.

    Args:
        std_log_data (str): Log data to parse

    Returns:
        (dict): Dictionary with devices per component of shape

                {
                        [component]: ["device1", "device2"]
                }

            Where `[component]` is the component name and `"device1"`, `"device2"` are device names.
            Empty when the log has no components line.
    """
    header_pattern = re.compile(r"The following components were found:(.*)\n")
    entry_pattern = re.compile(r" (.*?) with device\(s\) (.*?)\.")

    # Only one components line is expected, so the first hit is enough.
    header = header_pattern.search(std_log_data)
    if header is None:
        return {}

    return {
        name.lower(): listing.split(",")
        for name, listing in entry_pattern.findall(header.group(1))
    }

get_epoch_durations(std_log_data)

Retrieve epoch durations (s).

Parameters:
  • std_log_data (str) –

    Log to parse

Returns:
  • list[float]

    List of epoch durations (s)

Source code in carbontracker/parser.py
def get_epoch_durations(std_log_data):
    """
    Convert every "Duration: H:MM:SS[.ss]" entry of a standard log to seconds.

    Args:
        std_log_data (str): Log to parse

    Returns:
        (list[float]): List of epoch durations (s), in the order they appear in the log
    """
    duration_pattern = re.compile(r"Duration: (\d+):(\d{2}):(\d\d?(?:.\d{2})?)")

    durations = []
    for hours, minutes, seconds in duration_pattern.findall(std_log_data):
        durations.append(
            float(hours) * 60 * 60 + float(minutes) * 60 + float(seconds)
        )
    return durations

get_most_recent_logs(log_dir)

Retrieve the file names of the most recent standard and output logs.

Parameters:
  • log_dir (str) –

    Directory of logs

Returns:
  • std_log( str ) –

    File name of latest standard log

  • output_log( str ) –

    File name of latest output log

Source code in carbontracker/parser.py
def get_most_recent_logs(log_dir):
    """
    Find the most recently modified standard log and output log in log_dir.

    The two logs are picked independently by modification time; the returned
    values are paths joined with log_dir.

    Args:
        log_dir (str): Directory of logs

    Returns:
        std_log (str): File name of latest standard log
        output_log (str): File name of latest output log
    """
    # Collect the regular files directly inside log_dir.
    candidates = []
    for name in os.listdir(log_dir):
        path = os.path.join(log_dir, name)
        if os.path.isfile(path):
            candidates.append(path)

    output_pattern = re.compile(r".*carbontracker_output.log")
    std_pattern = re.compile(r".*carbontracker.log")

    # Oldest first, so the last element is the most recently modified one.
    outputs_by_age = sorted(
        filter(output_pattern.match, candidates), key=os.path.getmtime
    )
    stds_by_age = sorted(filter(std_pattern.match, candidates), key=os.path.getmtime)

    return stds_by_age[-1], outputs_by_age[-1]

parse_all_logs(log_dir)

Parse all logs in directory.

Parameters:
  • log_dir (str) –

    Directory of logs

Returns:
  • dict[]

    List of log entries of shape

    {
        "output_filename": str,
        "standard_filename": str,
        "components": dict, # See parse_logs
        "early_stop": bool,
        "actual": dict | None, # See get_consumption
        "pred": dict | None, # See get_consumption
    }
    
Source code in carbontracker/parser.py
def parse_all_logs(log_dir):
    """
    Parse every output/standard log pair returned by `get_all_logs`.

    Args:
        log_dir (str): Directory of logs

    Returns:
        (dict[]): List of log entries, one per log pair, of shape

                {
                    "output_filename": str,
                    "standard_filename": str,
                    "components": dict, # See parse_logs
                    "early_stop": bool,
                    "actual": dict | None, # See get_consumption
                    "pred": dict | None, # See get_consumption
                }
    """
    output_paths, std_paths = get_all_logs(log_dir)

    entries = []
    for output_path, std_path in zip(output_paths, std_paths):
        with open(std_path, "r") as std_fh:
            std_text = std_fh.read()

        with open(output_path, "r") as output_fh:
            output_text = output_fh.read()

        actual, pred = get_consumption(output_text)
        stopped_early = get_early_stop(std_text)
        components = parse_logs(log_dir, std_path, output_path)

        entries.append(
            {
                "output_filename": output_path,
                "standard_filename": std_path,
                "components": components,
                "early_stop": stopped_early,
                "actual": actual,
                "pred": pred,
            }
        )

    return entries

parse_logs(log_dir, std_log_file=None, output_log_file=None)

Parse logs in log_dir (defaults to most recent logs).

Parameters:
  • log_dir (str) –

    Directory of logs

  • std_log_file (str, default: None ) –

    Log file to read. Defaults to most recent logs.

  • output_log_file (str, default: None ) –

    Deprecated

Returns:
  • dict

    Dictionary of shape

    {
            [component name]: {
                "avg_power_usages (W)": NDArray | None,
                "avg_energy_usages (J)": NDArray | None,
                "epoch_durations (s)": NDArray | None,
                "devices": str[],
            }
    }
    

    where [component name] is either "gpu" or "cpu". Return value can contain both "gpu" and "cpu" field.

Source code in carbontracker/parser.py
def parse_logs(log_dir, std_log_file=None, output_log_file=None):
    """
    Parse logs in log_dir (defaults to most recent logs).

    Args:
        log_dir (str): Directory of logs
        std_log_file (str, optional): Log file to read. Defaults to the most recent standard log in log_dir.
        output_log_file (str, optional): Deprecated. Accepted for backward compatibility; it is not read.

    Returns:
        (dict): Dictionary of shape

                {
                        [component name]: {
                            "avg_power_usages (W)": NDArray | None,
                            "avg_energy_usages (J)": NDArray | None,
                            "epoch_durations (s)": NDArray | None,
                            "devices": str[],
                        }
                }

            where `[component name]` is a lower-cased component name reported by
            `get_devices` (typically `"gpu"` and/or `"cpu"`).
            Power usages are None when the log holds no power readings for the
            component, durations are None when the log holds no durations, and
            energy usages are None when either of the two is None.

    Raises:
        MismatchedEpochsError: Raised if the number of power measurements of a component differs from the number of epoch durations.
    """
    # Only the standard log is read below, so the fallback must depend on
    # std_log_file alone. Otherwise a caller that passes just std_log_file
    # (output_log_file is deprecated) would silently get the most recent log.
    if std_log_file is None:
        std_log_file, _ = get_most_recent_logs(log_dir)

    with open(std_log_file, "r") as f:
        std_log_data = f.read()

    epoch_durations = get_epoch_durations(std_log_data)
    avg_power_usages = get_avg_power_usages(std_log_data)
    devices = get_devices(std_log_data)

    components = {}
    for comp, comp_devices in devices.items():
        # A component can be listed with devices without having any power
        # readings in the log; report that as None instead of a KeyError.
        comp_powers = avg_power_usages.get(comp)
        power_usages = np.array(comp_powers) if comp_powers is not None else None
        durations = np.array(epoch_durations) if len(epoch_durations) != 0 else None

        if power_usages is None or durations is None:
            energy_usages = None
        else:
            if power_usages.size != durations.size:
                raise exceptions.MismatchedEpochsError(
                    f"Found {power_usages.size} power measurements and {durations.size} duration measurements. "
                    "Expected equal number of measurements."
                )
            # Transpose so the epoch axis comes last and broadcasts against
            # the 1-D durations, then transpose back: energy = power * time.
            energy_usages = (power_usages.T * durations).T

        components[comp] = {
            "avg_power_usages (W)": power_usages,
            "avg_energy_usages (J)": energy_usages,
            "epoch_durations (s)": durations,
            "devices": comp_devices,
        }

    return components

print_aggregate(log_dir)

Prints the aggregate consumption in all log files in log_dir to stdout. See aggregate_consumption.

Parameters:
  • log_dir (str) –

    Directory of logs

Source code in carbontracker/parser.py
def print_aggregate(log_dir):
    """
    Print a one-paragraph summary of the aggregate consumption of all logs in
    log_dir to stdout. The totals come from `aggregate_consumption`.

    Args:
        log_dir (str): Directory of logs
    """
    energy, co2eq, equivalents = aggregate_consumption(log_dir)

    equivalent_phrases = []
    for name, amount in equivalents.items():
        equivalent_phrases.append(f"{amount:.16f} {name}")
    equivalents_p = " or ".join(equivalent_phrases)

    # co2eq is accumulated in grams; the sentence reports kilograms.
    sentences = [
        f"The training of models in this work is estimated to use {energy:.16f} kWh of electricity contributing to {co2eq / 1000:.16f} kg of CO2eq. "
    ]
    if equivalents_p:
        sentences.append(f"This is equivalent to {equivalents_p}. ")
    sentences.append(
        "Measured by carbontracker (https://github.com/lfwa/carbontracker)."
    )

    print("".join(sentences))