import time
import datetime
import subprocess
import json
import os

from sqlalchemy.orm import Session
from . import models, database

# Base directory for data
BASE_DATA_DIR = "/home/asf/testarena"
if os.name == 'nt':
    BASE_DATA_DIR = "d:/ASF - course/ASF_01/ASF_tools/asf-pc-server/testarena_pc_backend/testarena_data"
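
# The ORM models live in models.py. The field lists below are inferred only from
# the attribute accesses in this module, not from the model definitions themselves:
#   models.Queue: id, status, source (the branch to test), created_at
#   models.Task:  id, queue_id, status, scenario_path, result
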
def update_json_status(queue_id, task_id, status, result=None):
    """Update queue_status.json for a queue: either a single task's status
    (when task_id is given) or the overall queue status, plus an optional
    per-task result."""
    queue_dir = os.path.join(BASE_DATA_DIR, queue_id)
    status_file = os.path.join(queue_dir, "queue_status.json")
    if os.path.exists(status_file):
        with open(status_file, 'r') as f:
            data = json.load(f)

        if task_id:
            data["tasks"][task_id] = status
        else:
            data["status"] = status

        if result:
            data["results"] = data.get("results", {})
            data["results"][task_id] = result

        with open(status_file, 'w') as f:
            json.dump(data, f, indent=4)
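
# Sketch of the queue_status.json layout this function assumes (reconstructed
# from the accesses above; the file is created elsewhere and is only modified
# here when it already exists):
#   {
#       "status": "<overall queue status>",
#       "tasks": {"<task_id>": "<task status>", ...},
#       "results": {"<task_id>": {...}, ...}
#   }
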
# Force XDG_RUNTIME_DIR to /tmp for the commands launched below
os.environ["XDG_RUNTIME_DIR"] = "/tmp"

def run_command_with_logging(cmd, log_file, cwd=None, env=None, timeout=1800, stop_string=None):
    """Runs a command, logs output, and optionally stops when a string is found."""
    if env is None:
        env = os.environ.copy()
    if cwd is None:
        cwd = os.getcwd()

    start_time = time.time()
    iso_start = datetime.datetime.now().isoformat()

    with open(log_file, "a") as f:
        msg = f"[{iso_start}] [INFO] Executing command: {cmd} in {cwd}\n"
        print(msg, end="")
        f.write(msg)
        f.flush()

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                shell=True,
                text=True,
                cwd=cwd,
                env=env,
                bufsize=1,
                universal_newlines=True
            )

            # Read output line by line
            while True:
                # Check for global timeout
                if time.time() - start_time > timeout:
                    process.kill()
                    iso_now = datetime.datetime.now().isoformat()
                    err_msg = f"[{iso_now}] [ERROR] Command timed out after {timeout} seconds\n"
                    print(err_msg, end="")
                    f.write(err_msg)
                    return 124  # Timeout exit code

                line = process.stdout.readline()
                if not line and process.poll() is not None:
                    break

                if line:
                    iso_now = datetime.datetime.now().isoformat()
                    # Determine level (simple heuristic)
                    level = "INFO"
                    if any(word in line.lower() for word in ["error", "fail", "fatal", "critical"]):
                        level = "ERROR"

                    log_line = f"[{iso_now}] [{level}] {line}"
                    print(log_line, end="")
                    f.write(log_line)
                    f.flush()

                    # Check for stop string
                    if stop_string and stop_string in line:
                        iso_now = datetime.datetime.now().isoformat()
                        stop_msg = f"[{iso_now}] [INFO] Stop string '{stop_string}' detected. Terminating process.\n"
                        print(stop_msg, end="")
                        f.write(stop_msg)
                        f.flush()
                        process.terminate()
                        try:
                            process.wait(timeout=5)
                        except subprocess.TimeoutExpired:
                            process.kill()
                        return 0

            return process.returncode
        except Exception as e:
            iso_now = datetime.datetime.now().isoformat()
            err_msg = f"[{iso_now}] [ERROR] Exception during execution: {str(e)}\n"
            print(err_msg, end="")
            f.write(err_msg)
            return 1
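
# Illustrative call (command, paths and marker string are placeholders, not
# taken from this project):
#
#   rc = run_command_with_logging("make all", "/tmp/build.log",
#                                 cwd="/tmp/src", timeout=600,
#                                 stop_string="Build finished")
#
# Return value: the command's exit code; 0 when stop_string was seen,
# 124 on timeout, 1 if an exception was raised while running it.
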

def run_worker():
    print("Worker started...")
    while True:
        db = database.SessionLocal()
        try:
            # Get next waiting queue
            queue = db.query(models.Queue).filter(models.Queue.status == "Waiting").order_by(models.Queue.created_at).first()

            if queue:
                print(f"Processing queue: {queue.id} (Branch: {queue.source})")
                queue.status = "Running"
                update_json_status(queue.id, None, "Running")
                db.commit()

                queue_dir = os.path.join(BASE_DATA_DIR, queue.id)
                os.makedirs(queue_dir, exist_ok=True)
                queue_log = os.path.join(queue_dir, "queue_log.txt")

                # 0- Clone repository if not exists
                clone_cmd = "./TPF/gitea_repo_controller.sh clone"
                run_command_with_logging(clone_cmd, queue_log)

                # 0.1- Checkout branch
                checkout_cmd = f"./TPF/gitea_repo_controller.sh checkout {queue.source}"
                run_command_with_logging(checkout_cmd, queue_log)

                # 1-5 Build software and run QEMU
                # We stop when we see the multicore app start message
                build_cmd = "/bin/bash -c 'source $HOME/esp/esp-idf/export.sh && cd TPF/Sensor_hub_repo && idf.py build && idf.py qemu'"
                run_command_with_logging(build_cmd, queue_log, stop_string="cpu_start: Multicore app")

                # 9- Loop for each task
                tasks = db.query(models.Task).filter(models.Task.queue_id == queue.id, models.Task.status == "Waiting").all()

                for task in tasks:
                    # Check if queue was aborted mid-way
                    db.refresh(queue)
                    if queue.status == "Aborted":
                        break

                    task.status = "Running"
                    update_json_status(queue.id, task.id, "Running")
                    db.commit()

                    try:
                        # Run scenario_execution.py queue_id scenario_path task_id
                        # It must be executed from TPF/Sensor_hub_repo with IDF sourced
                        script_path = os.path.abspath("./TPF/scenario_execution.py")
                        repo_dir = os.path.abspath("./TPF/Sensor_hub_repo")
                        cmd = f"/bin/bash -c 'source $HOME/esp/esp-idf/export.sh && python3 {script_path} {queue.id} {task.scenario_path} {task.id}'"

                        task_dir = os.path.join(queue_dir, task.id)
                        os.makedirs(task_dir, exist_ok=True)

                        ret = run_command_with_logging(cmd, queue_log, cwd=repo_dir)

                        if ret == 0:
                            task.status = "Finished"
                        else:
                            task.status = "Error"

                        # Try to find the summary if it exists
                        summary_path = os.path.join(task_dir, "final_summary.json")
                        if os.path.exists(summary_path):
                            with open(summary_path, 'r') as f:
                                task.result = json.load(f)

                        update_json_status(queue.id, task.id, task.status, task.result)

                    except Exception as e:
                        task.status = "Error"
                        update_json_status(queue.id, task.id, "Error")

                    db.commit()

                if queue.status != "Aborted":
                    queue.status = "Finished"
                    update_json_status(queue.id, None, "Finished")
                    db.commit()

            time.sleep(5)  # Poll every 5 seconds
        except Exception as e:
            print(f"Worker error: {e}")
            time.sleep(10)
        finally:
            db.close()

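# Note (assumption about the package layout): because this module uses relative
# imports ("from . import models, database"), it has to be started as a module,
# e.g. python -m <package>.<module> from the project root, rather than executed
# as a plain script, or the relative imports will fail.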
if __name__ == "__main__":
    run_worker()