def run_command_with_logging(cmd, log_file, cwd=None, env=None, timeout=1800):
    """Run *cmd* through the shell, mirroring its output to stdout and *log_file*.

    Each output line is prefixed with an ISO-8601 timestamp and a log level:
    [ERROR] when the line mentions error/fail/fatal/critical, [INFO] otherwise.

    Args:
        cmd: Shell command string (executed with shell=True).
        log_file: Path of the log file; output is appended.
        cwd: Working directory for the command (defaults to the current one).
        env: Environment mapping for the child (defaults to a copy of os.environ).
        timeout: Wall-clock limit in seconds; on expiry the child is killed.

    Returns:
        The child's exit code; 124 (the conventional shell timeout code) on
        timeout; 1 if launching or reading the process raised an exception.
    """
    if env is None:
        env = os.environ.copy()
    if cwd is None:
        cwd = os.getcwd()

    start_time = time.time()
    iso_start = datetime.datetime.now().isoformat()

    with open(log_file, "a") as f:

        def _log(text):
            # Mirror a fully formatted line to stdout and the log file,
            # flushing so progress is visible while the command runs.
            print(text, end="")
            f.write(text)
            f.flush()

        _log(f"[{iso_start}] [INFO] Executing command: {cmd} in {cwd}\n")

        try:
            # bufsize=1 + text=True => line-buffered text mode, so output
            # streams through promptly.  (universal_newlines is the same
            # parameter as text and does not need to be passed twice.)
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                shell=True,
                text=True,
                cwd=cwd,
                env=env,
                bufsize=1,
            )

            # Read output line by line.
            while True:
                # Enforce the global wall-clock timeout.
                if time.time() - start_time > timeout:
                    process.kill()
                    process.wait()  # reap the killed child so it cannot linger as a zombie
                    iso_now = datetime.datetime.now().isoformat()
                    _log(f"[{iso_now}] [ERROR] Command timed out after {timeout} seconds\n")
                    return 124  # Timeout exit code

                line = process.stdout.readline()
                if not line and process.poll() is not None:
                    break

                if line:
                    iso_now = datetime.datetime.now().isoformat()
                    # Determine level (simple heuristic on the line's text).
                    level = "INFO"
                    if any(word in line.lower() for word in ["error", "fail", "fatal", "critical"]):
                        level = "ERROR"
                    _log(f"[{iso_now}] [{level}] {line}")

            return process.returncode
        except Exception as e:
            iso_now = datetime.datetime.now().isoformat()
            _log(f"[{iso_now}] [ERROR] Exception during execution: {str(e)}\n")
            return 1
# Explicitly use /bin/bash to avoid shell mismatch build_cmd = f"/bin/bash -c 'source $HOME/esp/esp-idf/export.sh && cd TPF/Sensor_hub_repo && idf.py build && idf.py qemu'" run_command_with_logging(build_cmd, queue_log) @@ -99,7 +136,6 @@ def run_worker(): if queue.status == "Aborted": break - print(f"Running task: {task.id}") task.status = "Running" update_json_status(queue.id, task.id, "Running") db.commit() @@ -107,10 +143,8 @@ def run_worker(): try: # Run scenario_execution.py queue_id scenario_path task_id script_path = "./TPF/scenario_execution.py" - # Use the same python environment cmd = f"python3 {script_path} {queue.id} {task.scenario_path} {task.id}" - # We want to capture this in the task log too task_dir = os.path.join(queue_dir, task.id) os.makedirs(task_dir, exist_ok=True) @@ -130,7 +164,6 @@ def run_worker(): update_json_status(queue.id, task.id, task.status, task.result) except Exception as e: - print(f"Error running task {task.id}: {e}") task.status = "Error" update_json_status(queue.id, task.id, "Error")