# testarena_backend/testarena_app/worker.py
# Background worker: polls the DB for waiting queues and executes their tasks.
import time
import subprocess
import json
import os
from sqlalchemy.orm import Session
from . import models, database
# Root directory where queue/task artifacts are stored; Windows dev
# machines use a local checkout path instead of the Linux deployment path.
BASE_DATA_DIR = (
    "d:/ASF - course/ASF_01/ASF_tools/asf-pc-server/testarena_pc_backend/testarena_data"
    if os.name == 'nt'
    else "/home/asf/testarena"
)
def update_json_status(queue_id, task_id, status, result=None):
    """Mirror a queue/task status change into the per-queue JSON file.

    Args:
        queue_id: Queue directory name under BASE_DATA_DIR.
        task_id: Task identifier. When falsy, the queue-level "status"
            field is updated instead of a task entry.
        status: New status string ("Running", "Finished", "Error", ...).
        result: Optional result payload stored under data["results"][task_id].

    Silently does nothing if the status file does not exist yet (the queue
    directory may not have been initialized).
    """
    queue_dir = os.path.join(BASE_DATA_DIR, queue_id)
    status_file = os.path.join(queue_dir, "queue_status.json")
    if not os.path.exists(status_file):
        return
    with open(status_file, 'r') as f:
        data = json.load(f)
    if task_id:
        data["tasks"][task_id] = status
    else:
        data["status"] = status
    # Fix: use `is not None` so falsy-but-meaningful results (empty dict,
    # 0, "") are still recorded, and require a task_id so a result is never
    # written under the JSON key "null".
    if result is not None and task_id:
        data.setdefault("results", {})[task_id] = result
    with open(status_file, 'w') as f:
        json.dump(data, f, indent=4)
import datetime
import time  # NOTE(review): duplicate of the top-of-file import; harmless but redundant.
import os  # NOTE(review): duplicate of the top-of-file import; harmless but redundant.
# Presumably needed by the QEMU runs launched below, which expect
# XDG_RUNTIME_DIR to be set on headless hosts — TODO confirm.
os.environ["XDG_RUNTIME_DIR"] = "/tmp"
def run_command_with_logging(cmd, log_file, cwd=None, env=None, timeout=1800, stop_string=None):
    """Run *cmd* through the shell, tee its output to stdout and *log_file*.

    Every output line is prefixed with an ISO timestamp and a level tag
    ("ERROR" when the line contains error/fail/fatal/critical, else "INFO").

    Args:
        cmd: Shell command string (executed with shell=True).
        log_file: Path of the log file to append to.
        cwd: Working directory for the command (defaults to the current one).
        env: Environment mapping (defaults to a copy of os.environ).
        timeout: Overall wall-clock limit in seconds.
        stop_string: If given and found in a line of output, the process is
            terminated early and 0 is returned (used to stop long-running
            tools once a sentinel message appears).

    Returns:
        The process exit code; 124 on timeout; 1 on unexpected exceptions.
    """
    if env is None:
        env = os.environ.copy()
    if cwd is None:
        cwd = os.getcwd()
    start_time = time.time()
    iso_start = datetime.datetime.now().isoformat()
    with open(log_file, "a") as f:
        msg = f"[{iso_start}] [INFO] Executing command: {cmd} in {cwd}\n"
        print(msg, end="")
        f.write(msg)
        f.flush()
        process = None
        try:
            # NOTE(review): shell=True is required because cmd is a composite
            # shell string built by the worker itself, not untrusted input.
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                shell=True,
                text=True,
                cwd=cwd,
                env=env,
                bufsize=1,  # line-buffered so output is mirrored promptly
                universal_newlines=True
            )
            # Read output line by line until the process exits.
            while True:
                # Enforce the global wall-clock timeout.
                if time.time() - start_time > timeout:
                    process.kill()
                    process.wait()  # fix: reap the killed child (avoid a zombie)
                    iso_now = datetime.datetime.now().isoformat()
                    err_msg = f"[{iso_now}] [ERROR] Command timed out after {timeout} seconds\n"
                    print(err_msg, end="")
                    f.write(err_msg)
                    return 124  # Timeout exit code
                line = process.stdout.readline()
                if not line and process.poll() is not None:
                    break
                if line:
                    iso_now = datetime.datetime.now().isoformat()
                    # Determine level (simple keyword heuristic).
                    level = "INFO"
                    if any(word in line.lower() for word in ["error", "fail", "fatal", "critical"]):
                        level = "ERROR"
                    log_line = f"[{iso_now}] [{level}] {line}"
                    print(log_line, end="")
                    f.write(log_line)
                    f.flush()
                    # Stop early once the sentinel string appears.
                    if stop_string and stop_string in line:
                        iso_now = datetime.datetime.now().isoformat()
                        stop_msg = f"[{iso_now}] [INFO] Stop string '{stop_string}' detected. Terminating process.\n"
                        print(stop_msg, end="")
                        f.write(stop_msg)
                        f.flush()
                        process.terminate()
                        try:
                            process.wait(timeout=5)
                        except subprocess.TimeoutExpired:
                            process.kill()
                            process.wait()
                        return 0
            return process.returncode
        except Exception as e:
            # Fix: previously an exception here returned 1 but could leave
            # the child process running and its pipe open.
            if process is not None and process.poll() is None:
                process.kill()
                process.wait()
            iso_now = datetime.datetime.now().isoformat()
            err_msg = f"[{iso_now}] [ERROR] Exception during execution: {str(e)}\n"
            print(err_msg, end="")
            f.write(err_msg)
            return 1
        finally:
            # Fix: always release the stdout pipe.
            if process is not None and process.stdout is not None:
                process.stdout.close()
def run_worker():
    """Poll the database for waiting queues and execute their tasks forever.

    Picks the oldest queue whose status is "Waiting", clones/checks out the
    branch, builds the firmware and boots QEMU, then runs each waiting
    task's scenario script. Status transitions are mirrored to both the DB
    and the per-queue JSON status file via update_json_status().
    """
    print("Worker started...")
    while True:
        db = database.SessionLocal()
        try:
            # Get next waiting queue (FIFO by creation time).
            queue = db.query(models.Queue).filter(models.Queue.status == "Waiting").order_by(models.Queue.created_at).first()
            if queue:
                print(f"Processing queue: {queue.id} (Branch: {queue.source})")
                queue.status = "Running"
                update_json_status(queue.id, None, "Running")
                db.commit()
                queue_dir = os.path.join(BASE_DATA_DIR, queue.id)
                os.makedirs(queue_dir, exist_ok=True)
                queue_log = os.path.join(queue_dir, "queue_log.txt")
                # 0- Clone repository if not exists
                clone_cmd = "./TPF/gitea_repo_controller.sh clone"
                run_command_with_logging(clone_cmd, queue_log)
                # 0.1- Checkout the branch requested by this queue.
                checkout_cmd = f"./TPF/gitea_repo_controller.sh checkout {queue.source}"
                run_command_with_logging(checkout_cmd, queue_log)
                # Clean up any orphaned QEMU processes from earlier runs.
                run_command_with_logging("pkill -f qemu-system-xtensa || true", queue_log)
                # 1-5 Build software and run QEMU.
                # We stop reading when we see the multicore app start message.
                build_cmd = f"/bin/bash -c 'export PYTHONUNBUFFERED=1 && source $HOME/esp/esp-idf/export.sh && cd TPF/Sensor_hub_repo && idf.py build && idf.py qemu'"
                run_command_with_logging(build_cmd, queue_log, stop_string="cpu_start: Multicore app")
                # 9- Loop over each waiting task of this queue, in order.
                tasks = db.query(models.Task).filter(models.Task.queue_id == queue.id, models.Task.status == "Waiting").all()
                for task in tasks:
                    # Check if queue was aborted mid-way.
                    db.refresh(queue)
                    if queue.status == "Aborted":
                        break
                    task.status = "Running"
                    update_json_status(queue.id, task.id, "Running")
                    db.commit()
                    try:
                        # Run scenario_execution.py <queue_id> <scenario_path> <task_id>.
                        # It must be executed from TPF/Sensor_hub_repo with IDF sourced.
                        script_path = os.path.abspath("./TPF/scenario_execution.py")
                        repo_dir = os.path.abspath("./TPF/Sensor_hub_repo")
                        cmd = f"/bin/bash -c 'export PYTHONUNBUFFERED=1 && source $HOME/esp/esp-idf/export.sh && python3 {script_path} {queue.id} {task.scenario_path} {task.id}'"
                        task_dir = os.path.join(queue_dir, task.id)
                        os.makedirs(task_dir, exist_ok=True)
                        ret = run_command_with_logging(cmd, queue_log, cwd=repo_dir)
                        task.status = "Finished" if ret == 0 else "Error"
                        # Attach the scenario summary to the task if one was produced.
                        summary_path = os.path.join(task_dir, "final_summary.json")
                        if os.path.exists(summary_path):
                            with open(summary_path, 'r') as f:
                                task.result = json.load(f)
                        update_json_status(queue.id, task.id, task.status, task.result)
                    except Exception as e:
                        # Fix: the exception was previously swallowed silently,
                        # making task failures undiagnosable.
                        print(f"Task {task.id} failed: {e}")
                        task.status = "Error"
                        update_json_status(queue.id, task.id, "Error")
                    db.commit()
                if queue.status != "Aborted":
                    queue.status = "Finished"
                    update_json_status(queue.id, None, "Finished")
                    db.commit()
            time.sleep(5)  # Poll every 5 seconds
        except Exception as e:
            print(f"Worker error: {e}")
            time.sleep(10)
        finally:
            db.close()
if __name__ == "__main__":
    # Entry point: start the infinite polling loop when run as a script.
    run_worker()