
Feature request: Intel Raptor Lake+ Big Little - E & P-core pinning #7

Open

Mirabis opened this issue Sep 30, 2024 · 3 comments

Labels: enhancement (New feature or request)
Mirabis commented Sep 30, 2024

Is your feature request related to a problem? Please describe.
I'm currently pinning certain LXCs to E-cores and others to P-cores depending on their workload. I'd like to start using this script, but I'd love to keep that functionality, or even manage it through this script.

For example on my system:

The 13th Gen Intel(R) Core(TM) i5-13500 exposes CPUs 0-11 as P-cores and 12-19 as E-cores. I can thus pin an LXC to specific cores within an LXC conf file with:

Performance cores: `lxc.cgroup2.cpuset.cpus: 0-11`
Efficiency cores: `lxc.cgroup2.cpuset.cpus: 12-19`

Describe the solution you'd like
If that behaviour could be mimicked, or if my existing "e-core" / "p-core" tags could be kept, then another script can keep modifying those files as it already does today.

Describe alternatives you've considered
I already have a script I run periodically through cron to update the conf files based on PVE tags I have assigned to LXCs.

Additional context
My existing script below:

[root@pve scripts]$ cat tag-based-lxc-cpu-pinning.sh
#!/bin/bash

for conf_file in /etc/pve/lxc/[0-9]*.conf; do
    container_id=$(basename "$conf_file" .conf)
    tags_line=$(grep -oP '(?<=tags: ).*' "$conf_file")

    # Check if tags line exists and contains "p-core" or "e-core" or "core-{number}"
    if [ -n "$tags_line" ]; then

        # Capture the core number and test grep's exit status in one pass,
        # instead of grepping twice and leaking the match to stdout
        if specific_core=$(echo "$tags_line" | grep -oP "core-\K\d+"); then
            cpus_range=$specific_core
        else
            if echo "$tags_line" | grep -q "p-core"; then
                cpus_range="0-11"
            else
                cpus_range=""
            fi

            if echo "$tags_line" | grep -q "e-core"; then
                if [ -n "$cpus_range" ]; then
                    cpus_range="0-19"
                else
                    cpus_range="12-19"
                fi
            fi
        fi

        # Update or create the lxc.cgroup2.cpuset.cpus value in the container's config file
        if [ -n "$cpus_range" ]; then
            if grep -q "^lxc.cgroup2.cpuset.cpus:" "$conf_file"; then
                sed -i "/^lxc.cgroup2.cpuset.cpus:/s/.*/lxc.cgroup2.cpuset.cpus: $cpus_range/" "$conf_file"
            else
                echo "lxc.cgroup2.cpuset.cpus: $cpus_range" >> "$conf_file"
            fi
            echo "Updated $container_id ($tags_line): lxc.cgroup2.cpuset.cpus=$cpus_range"
        fi
    fi
done
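The `0-11` / `12-19` split above is hardcoded for the i5-13500. A minimal Python sketch (hypothetical helper names, assuming `lscpu -e=CPU,MAXMHZ` output in which P-cores report a higher maximum clock than E-cores) that derives the two ranges instead of hardcoding them:

```python
# Hypothetical helpers: classify CPUs from `lscpu -e=CPU,MAXMHZ` output and
# collapse each group into a cpuset string such as "0-11".
from collections import defaultdict
from typing import Dict, List


def ids_to_cpuset(ids: List[int]) -> str:
    """Collapse sorted CPU ids into a cpuset string, e.g. [0, 1, 2, 5] -> "0-2,5"."""
    ranges = []
    start = prev = ids[0]
    for cpu in ids[1:]:
        if cpu == prev + 1:
            prev = cpu
            continue
        ranges.append(f"{start}-{prev}" if start != prev else str(start))
        start = prev = cpu
    ranges.append(f"{start}-{prev}" if start != prev else str(start))
    return ",".join(ranges)


def split_by_max_mhz(lscpu_output: str) -> Dict[str, str]:
    """Group CPU ids by their MAXMHZ column.

    Assumes a hybrid CPU where P-cores report a strictly higher max clock;
    on a non-hybrid CPU both keys would resolve to the same group.
    """
    groups: Dict[float, List[int]] = defaultdict(list)
    for line in lscpu_output.strip().splitlines()[1:]:  # skip header row
        cpu, mhz = line.split()
        groups[float(mhz)].append(int(cpu))
    return {
        "p-core": ids_to_cpuset(sorted(groups[max(groups)])),
        "e-core": ids_to_cpuset(sorted(groups[min(groups)])),
    }
```

In practice the input would come from `subprocess.check_output(["lscpu", "-e=CPU,MAXMHZ"])`, so the ranges track whatever hybrid CPU the host actually has.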
@fabriziosalmi (Owner)
I've been thinking about CPU pinning and CPU units allocation management since the beginning. Unfortunately this will be quite hard for me to implement quickly... but of course, if you want to give it a try, you are more than welcome.
If I wake up one morning with time and faith, I'm sure this will be achieved, but again... not in the near future :(

@fabriziosalmi fabriziosalmi added enhancement New feature or request good first issue Good for newcomers help wanted Extra attention is needed labels Oct 1, 2024
@fabriziosalmi (Owner)
⚠️ help wanted here, skilled contributor required to achieve this mission

@fabriziosalmi (Owner)
Initial lxc_utils.py for your request... I will try to create tests for these features and include them in main soon ;)

from __future__ import annotations  # keep paramiko annotations as strings if the import fails

import atexit
import json
import logging
import os
import re
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from threading import Lock
from typing import Any, Dict, List, Optional, Tuple, Union

try:
    import paramiko
except ImportError:
    logging.error("Paramiko package not installed. SSH functionality disabled.")

from config import (BACKUP_DIR, IGNORE_LXC, LOG_FILE,
                    LXC_TIER_ASSOCIATIONS, PROXMOX_HOSTNAME, config, get_config_value)

lock = Lock()

# Global variable to hold the SSH client
ssh_client: Optional[paramiko.SSHClient] = None

def get_ssh_client() -> Optional[paramiko.SSHClient]:
    """Get or create a new SSH client connection."""
    global ssh_client
    if ssh_client is None:
        logging.debug("Creating a new SSH connection...")
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(
                hostname=config.get('DEFAULT', {}).get('proxmox_host'),
                port=config.get('DEFAULT', {}).get('ssh_port', 22),
                username=config.get('DEFAULT', {}).get('ssh_user'),
                password=config.get('DEFAULT', {}).get('ssh_password'),
                key_filename=config.get('DEFAULT', {}).get('ssh_key_path'),
                timeout=10
            )
            logging.info("SSH connection established successfully.")
            ssh_client = ssh
        except paramiko.SSHException as e:
            logging.error("SSH connection failed: %s", str(e))
            return None
        except Exception as e:
            logging.error("Unexpected error establishing SSH connection: %s", str(e))
            return None
    return ssh_client

def close_ssh_client() -> None:
    """Close the SSH client connection."""
    global ssh_client
    if ssh_client:
        logging.debug("Closing SSH connection...")
        ssh_client.close()
        ssh_client = None

def run_command(cmd: str, timeout: int = 30) -> Optional[str]:
    """Execute a command locally or remotely based on configuration.

    Args:
        cmd: The command to execute.
        timeout: Timeout in seconds for the command execution.

    Returns:
        The command output or None if the command failed.
    """
    use_remote_proxmox = config.get('DEFAULT', {}).get('use_remote_proxmox', False)
    logging.debug("Inside run_command: use_remote_proxmox = %s", use_remote_proxmox)
    return (run_remote_command if use_remote_proxmox else run_local_command)(cmd, timeout)


def run_local_command(cmd: str, timeout: int = 30) -> Optional[str]:
    """Execute a command locally with timeout.

    Args:
        cmd: The command to execute.
        timeout: Timeout in seconds for the command execution.

    Returns:
        The command output or None if the command failed.
    """
    try:
        result = subprocess.check_output(
            cmd, shell=True, timeout=timeout, stderr=subprocess.STDOUT,
        ).decode('utf-8').strip()
        logging.debug("Command '%s' executed successfully. Output: %s", cmd, result)
        return result
    except subprocess.TimeoutExpired:
        logging.error("Command '%s' timed out after %d seconds", cmd, timeout)
    except subprocess.CalledProcessError as e:
        logging.error("Command '%s' failed: %s", cmd, e.output.decode('utf-8'))
    except Exception as e:  # pylint: disable=broad-except
        logging.error("Unexpected error executing '%s': %s", cmd, str(e))
    return None


def run_remote_command(cmd: str, timeout: int = 30) -> Optional[str]:
    """Execute a command on a remote Proxmox host via SSH.

    Args:
        cmd: The command to execute.
        timeout: Timeout in seconds for the command execution.

    Returns:
        The command output or None if the command failed.
    """
    logging.debug("Running remote command: %s", cmd)
    ssh = get_ssh_client()
    if not ssh:
        return None
    try:
        _, stdout, _ = ssh.exec_command(cmd, timeout=timeout)
        output = stdout.read().decode('utf-8').strip()
        logging.debug("Remote command '%s' executed successfully: %s", cmd, output)
        return output
    except paramiko.SSHException as e:
        logging.error("SSH execution failed: %s", str(e))
    except Exception as e:  # pylint: disable=broad-except
        logging.error("Unexpected SSH error executing '%s': %s", cmd, str(e))
    return None

def get_proxmox_tags(ctid: str) -> str:
    """Get tags associated with an LXC container from Proxmox.

    Args:
        ctid: The container ID.

    Returns:
        The tags as a string, or an empty string if not found or an error occurs.
    """
    try:
        tags = run_command(
            f"pvesh get /nodes/$(hostname)/lxc/{ctid}/config --output-format json | jq -r .tags"
        )
        # jq -r prints the literal string "null" when the tags key is absent
        if tags and tags != "null":
            return tags.strip().strip('"')
    except Exception as e:  # pylint: disable=broad-except
        logging.error("Failed to get tags for container %s: %s", ctid, str(e))
    return ""


def parse_cpu_pinning_from_tags(tags: str) -> str:
    """Parse tags and return the cpuset string.

    Args:
        tags: The tags string.

    Returns:
        The cpuset string or empty if no match found.
    """
    if not tags:
        return ""

    if core_match := re.search(r"core-(\d+)", tags):
        return core_match.group(1)
    elif "p-core" in tags:
        return "0-11" # Replace with actual P-core range
    elif "e-core" in tags:
        return "12-19"  # Replace with actual E-core range
    return ""


def set_cpu_pinning(ctid: str, cpu_range: str) -> None:
    """Set the CPU pinning for an LXC container.

    Args:
        ctid: The container ID.
        cpu_range: The CPU range to pin to.
    """
    if not cpu_range:
        logging.info("No CPU pinning required for container %s", ctid)
        return

    conf_file = f"/etc/pve/lxc/{ctid}.conf"
    if os.path.exists(conf_file):
        if run_command(f"grep '^lxc.cgroup2.cpuset.cpus:' {conf_file}"):
            run_command(
                f"sed -i '/^lxc.cgroup2.cpuset.cpus:/s/.*/lxc.cgroup2.cpuset.cpus: {cpu_range}/' {conf_file}"
            )
            logging.info("Updated CPU pinning for container %s to %s", ctid, cpu_range)
        else:
            run_command(f'echo "lxc.cgroup2.cpuset.cpus: {cpu_range}" >> {conf_file}')
            logging.info("Set CPU pinning for container %s to %s", ctid, cpu_range)
    else:
        logging.error("Configuration file not found for container %s: %s", ctid, conf_file)


def get_containers() -> List[str]:
    """Return list of container IDs, excluding ignored ones.

    Returns:
        A list of container IDs.
    """
    containers = run_command("pct list | awk 'NR>1 {print $1}'")
    return [ctid for ctid in containers.splitlines() if ctid not in IGNORE_LXC] if containers else []


def is_container_running(ctid: str) -> bool:
    """Check if a container is running.

    Args:
        ctid: The container ID.

    Returns:
        True if the container is running, False otherwise.
    """
    status = run_command(f"pct status {ctid}")
    return bool(status and "status: running" in status.lower())


def backup_container_settings(ctid: str, settings: Dict[str, Any]) -> None:
    """Backup container configuration to JSON file.

    Args:
        ctid: The container ID.
        settings: The container settings to backup.
    """
    try:
        os.makedirs(BACKUP_DIR, exist_ok=True)
        backup_file = os.path.join(BACKUP_DIR, f"{ctid}_backup.json")
        with lock:
            with open(backup_file, 'w', encoding='utf-8') as f:
                json.dump(settings, f)
        logging.debug("Backup saved for container %s: %s", ctid, settings)
    except Exception as e:  # pylint: disable=broad-except
        logging.error("Failed to backup settings for %s: %s", ctid, str(e))


def load_backup_settings(ctid: str) -> Optional[Dict[str, Any]]:
    """Load container configuration from a backup JSON file.

    Args:
        ctid: The container ID.

    Returns:
        The loaded container settings, or None if no backup is found.
    """
    try:
        backup_file = os.path.join(BACKUP_DIR, f"{ctid}_backup.json")
        if os.path.exists(backup_file):
            with lock:
                with open(backup_file, 'r', encoding='utf-8') as f:
                    settings = json.load(f)
            logging.debug("Loaded backup for container %s: %s", ctid, settings)
            return settings
        logging.warning("No backup found for container %s", ctid)
        return None
    except Exception as e:  # pylint: disable=broad-except
        logging.error("Failed to load backup for %s: %s", ctid, str(e))
        return None


def rollback_container_settings(ctid: str) -> None:
    """Restore container settings from backup.

    Args:
        ctid: The container ID.
    """
    settings = load_backup_settings(ctid)
    if settings:
        logging.info("Rolling back container %s to backup settings", ctid)
        run_command(f"pct set {ctid} -cores {settings['cores']}")
        run_command(f"pct set {ctid} -memory {settings['memory']}")


def log_json_event(ctid: str, action: str, resource_change: str) -> None:
    """Log container change events in JSON format.

    Args:
        ctid: The container ID.
        action: The action that was performed.
        resource_change: Details of the resource change.
    """
    log_data = {
        "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        "proxmox_host": PROXMOX_HOSTNAME,
        "container_id": ctid,
        "action": action,
        "change": resource_change,
    }
    with lock:
        with open(LOG_FILE.replace('.log', '.json'), 'a', encoding='utf-8') as json_log_file:
            json_log_file.write(json.dumps(log_data) + '\n')


def get_total_cores() -> int:
    """Calculate available CPU cores after reserving percentage.

    Returns:
        The available number of CPU cores.
    """
    total_cores = int(run_command("nproc") or 0)
    reserved_cores = max(1, int(total_cores * int(get_config_value('DEFAULT', 'reserve_cpu_percent', 10)) / 100))
    available_cores = total_cores - reserved_cores
    logging.debug(
        "Total cores: %d, Reserved: %d, Available: %d",
        total_cores,
        reserved_cores,
        available_cores,
    )
    return available_cores


def get_total_memory() -> int:
    """Calculate available memory after reserving a fixed amount.

    Returns:
        The available memory in MB.
    """
    try:
        command_output = run_command("free -m | awk '/^Mem:/ {print $2}'")
        total_memory = int(command_output.strip()) if command_output else 0
    except (ValueError, subprocess.CalledProcessError) as e:
        logging.error("Failed to get total memory: %s", str(e))
        total_memory = 0

    available_memory = max(0, total_memory - int(get_config_value('DEFAULT', 'reserve_memory_mb', 2048)))
    logging.debug(
        "Total memory: %dMB, Reserved: %dMB, Available: %dMB",
        total_memory,
        int(get_config_value('DEFAULT', 'reserve_memory_mb', 2048)),
        available_memory,
    )
    return available_memory


def get_cpu_usage(ctid: str) -> float:
    """Get container CPU usage using multiple fallback methods.

    Args:
        ctid: The container ID.

    Returns:
        The CPU usage as a float percentage (0.0 - 100.0).
    """
    def run_cmd(command: str) -> str:
        """Execute a command and return its output or an empty string if failed.

        Args:
            command: Command to be executed.

        Returns:
            The output of the command.
        """
        try:
            result = subprocess.run(
                command, shell=True, capture_output=True, text=True, check=True
            )
            return result.stdout.strip()
        except subprocess.CalledProcessError as e:
            logging.warning("Command failed: %s, Error: %s", command, str(e))
            return ""

    def loadavg_method(ctid: str) -> float:
        """Calculate CPU usage using load average.

        Args:
            ctid: The container ID.

        Returns:
            The CPU usage as a float percentage.
        """
        try:
            loadavg = float(run_cmd(f"pct exec {ctid} -- cat /proc/loadavg").split()[0])
            num_cpus = int(run_cmd(f"pct exec {ctid} -- nproc"))
            if num_cpus == 0:
                raise ValueError("Number of CPUs is zero.")
            return round(min((loadavg / num_cpus) * 100, 100.0), 2)
        except Exception as e:  # pylint: disable=broad-except
            raise RuntimeError(f"Loadavg method failed: {e}") from e

    def load_method(ctid: str) -> float:
        """Calculate CPU usage using /proc/stat.

        Args:
            ctid: The container ID.

        Returns:
            The CPU usage as a float percentage.
        """
        try:
            cmd = f"pct exec {ctid} -- cat /proc/stat | grep '^cpu '"
            initial_times = list(map(float, run_cmd(cmd).split()[1:]))
            initial_total = sum(initial_times)
            initial_idle = initial_times[3]

            time.sleep(1)

            new_times = list(map(float, run_cmd(cmd).split()[1:]))
            new_total = sum(new_times)
            new_idle = new_times[3]

            total_diff = new_total - initial_total
            idle_diff = new_idle - initial_idle

            if total_diff == 0:
                raise ValueError("Total CPU time did not change.")

            return round(
                max(min(100.0 * (total_diff - idle_diff) / total_diff, 100.0), 0.0), 2
            )
        except Exception as e:  # pylint: disable=broad-except
            raise RuntimeError(f"Load method failed: {e}") from e

    methods: List[Tuple[str, Any]] = [
        ("Load Average", loadavg_method),
        ("Load", load_method),
    ]

    for method_name, method in methods:
        try:
            cpu = method(ctid)
            if cpu is not None and cpu >= 0.0:
                logging.info("CPU usage for %s using %s: %s%%", ctid, method_name, cpu)
                return cpu
        except Exception as e:  # pylint: disable=broad-except
            logging.warning("%s failed for %s: %s", method_name, ctid, str(e))

    logging.error("All CPU usage methods failed for %s. Using 0.0", ctid)
    return 0.0


def get_memory_usage(ctid: str) -> float:
    """Get container memory usage percentage.

    Args:
        ctid: The container ID.

    Returns:
        The memory usage as a float percentage (0.0 - 100.0).
    """
    mem_info = run_command(
        f"pct exec {ctid} -- awk '/MemTotal/ {{t=$2}} /MemAvailable/ {{a=$2}} "
        f"END {{print t, t-a}}' /proc/meminfo"
    )
    if mem_info:
        try:
            total, used = map(int, mem_info.split())
            return (used * 100) / total
        except ValueError:
            logging.error("Failed to parse memory info for %s: '%s'", ctid, mem_info)
    logging.error("Failed to get memory usage for %s", ctid)
    return 0.0


def is_ignored(ctid: str) -> bool:
    """Check if a container is in the ignore list.

    Args:
        ctid: The container ID.

    Returns:
        True if the container is in the ignore list, False otherwise.
    """
    return str(ctid) in IGNORE_LXC


def get_container_data(ctid: str) -> Optional[Dict[str, Any]]:
    """Collect container resource usage data.

    Args:
        ctid: The container ID.

    Returns:
        A dictionary containing container resource data or None if not available.
    """
    if is_ignored(ctid) or not is_container_running(ctid):
        return None

    logging.debug("Collecting data for container %s", ctid)
    try:
        # Fetch and apply cpu pinning
        tags = get_proxmox_tags(ctid)
        cpu_range = parse_cpu_pinning_from_tags(tags)
        set_cpu_pinning(ctid, cpu_range)

        cores = int(run_command(f"pct config {ctid} | grep cores | awk '{{print $2}}'") or 0)
        memory = int(run_command(f"pct config {ctid} | grep memory | awk '{{print $2}}'") or 0)
        settings = {"cores": cores, "memory": memory}
        backup_container_settings(ctid, settings)
        return {
            "cpu": get_cpu_usage(ctid),
            "mem": get_memory_usage(ctid),
            "initial_cores": cores,
            "initial_memory": memory,
        }
    except Exception as e:  # pylint: disable=broad-except
        logging.error("Error collecting data for %s: %s", ctid, str(e))
        return None


def collect_container_data() -> Dict[str, Dict[str, Any]]:
    """Collect data from all containers in parallel.

    Returns:
        A dictionary containing container resource data for all containers.
    """
    containers: Dict[str, Dict[str, Any]] = {}
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_to_ctid = {
            executor.submit(get_container_data, ctid): ctid
            for ctid in get_containers()
        }
        for future in as_completed(future_to_ctid):
            ctid = future_to_ctid[future]
            try:
                data = future.result()
                if data:
                    containers[ctid] = data
                    logging.debug("Container %s data: %s", ctid, data)
            except Exception as e:  # pylint: disable=broad-except
                logging.error("Error retrieving data for %s: %s", ctid, str(e))
    return containers


def prioritize_containers(containers: Dict[str, Dict[str, Any]]) -> List[Tuple[str, Dict[str, Any]]]:
    """Sort containers by resource usage priority.

    Args:
        containers: A dictionary of container resource data.

    Returns:
        A sorted list of container IDs and their data.
    """
    if not containers:
        logging.info("No containers to prioritize.")
        return []

    try:
        priorities = sorted(
            containers.items(),
            key=lambda item: (item[1]['cpu'], item[1]['mem']),
            reverse=True,
        )
        logging.debug("Container priorities: %s", priorities)
        return priorities
    except Exception as e:  # pylint: disable=broad-except
        logging.error("Error prioritizing containers: %s", str(e))
        return []


def get_container_config(ctid: str) -> Dict[str, Any]:
    """Get container tier configuration.

    Args:
        ctid: The container ID.

    Returns:
        The container's tier configuration.
    """
    return LXC_TIER_ASSOCIATIONS.get(ctid, config)


def generate_unique_snapshot_name(base_name: str) -> str:
    """Generate timestamped snapshot name.

    Args:
        base_name: Base name for the snapshot.

    Returns:
        A unique snapshot name.
    """
    return f"{base_name}-{datetime.now().strftime('%Y%m%d%H%M%S')}"


def generate_cloned_hostname(base_name: str, clone_number: int) -> str:
    """Generate unique hostname for cloned container.

    Args:
        base_name: Base name for the cloned container.
        clone_number: The clone number.

    Returns:
        A unique hostname for the cloned container.
    """
    return f"{base_name}-cloned-{clone_number}"

atexit.register(close_ssh_client)
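For reference, the tag parsing above maps tags to cpusets as follows. The function body is reproduced verbatim so this snippet runs standalone; note that `core-N` takes precedence, and, unlike the bash script, a container tagged with both `p-core` and `e-core` resolves to the P-core range only:

```python
import re


def parse_cpu_pinning_from_tags(tags: str) -> str:
    """Map Proxmox tags to a cpuset string (logic copied from lxc_utils.py above)."""
    if not tags:
        return ""
    if core_match := re.search(r"core-(\d+)", tags):
        return core_match.group(1)
    elif "p-core" in tags:
        return "0-11"   # i5-13500 P-core range
    elif "e-core" in tags:
        return "12-19"  # i5-13500 E-core range
    return ""


# core-N wins over p-core/e-core because it is checked first
assert parse_cpu_pinning_from_tags("web;core-5;p-core") == "5"
assert parse_cpu_pinning_from_tags("p-core;db") == "0-11"
assert parse_cpu_pinning_from_tags("e-core") == "12-19"
assert parse_cpu_pinning_from_tags("") == ""
```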

@fabriziosalmi fabriziosalmi removed help wanted Extra attention is needed good first issue Good for newcomers labels Jan 16, 2025
@fabriziosalmi fabriziosalmi self-assigned this Jan 16, 2025