update DB table
This commit is contained in:
@@ -30,12 +30,25 @@ def update_json_status(queue_id, task_id, status, result=None):
|
|||||||
json.dump(data, f, indent=4)
|
json.dump(data, f, indent=4)
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
import time
|
||||||
|
|
||||||
def run_command_with_logging(cmd, log_file, cwd=None, env=None):
|
def run_command_with_logging(cmd, log_file, cwd=None, env=None, timeout=1800):
|
||||||
"""Runs a command, logs output to file and stdout with ISO timestamps."""
|
"""Runs a command, logs output to file and stdout with ISO timestamps and levels."""
|
||||||
if env is None:
|
if env is None:
|
||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
|
if cwd is None:
|
||||||
|
cwd = os.getcwd()
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
iso_start = datetime.datetime.now().isoformat()
|
||||||
|
|
||||||
with open(log_file, "a") as f:
|
with open(log_file, "a") as f:
|
||||||
|
msg = f"[{iso_start}] [INFO] Executing command: {cmd} in {cwd}\n"
|
||||||
|
print(msg, end="")
|
||||||
|
f.write(msg)
|
||||||
|
f.flush()
|
||||||
|
|
||||||
|
try:
|
||||||
process = subprocess.Popen(
|
process = subprocess.Popen(
|
||||||
cmd,
|
cmd,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
@@ -43,16 +56,45 @@ def run_command_with_logging(cmd, log_file, cwd=None, env=None):
|
|||||||
shell=True,
|
shell=True,
|
||||||
text=True,
|
text=True,
|
||||||
cwd=cwd,
|
cwd=cwd,
|
||||||
env=env
|
env=env,
|
||||||
|
bufsize=1,
|
||||||
|
universal_newlines=True
|
||||||
)
|
)
|
||||||
for line in process.stdout:
|
|
||||||
timestamp = datetime.datetime.now().isoformat()
|
# Read output line by line
|
||||||
log_line = f"[{timestamp}] {line}"
|
while True:
|
||||||
|
# Check for global timeout
|
||||||
|
if time.time() - start_time > timeout:
|
||||||
|
process.kill()
|
||||||
|
iso_now = datetime.datetime.now().isoformat()
|
||||||
|
err_msg = f"[{iso_now}] [ERROR] Command timed out after {timeout} seconds\n"
|
||||||
|
print(err_msg, end="")
|
||||||
|
f.write(err_msg)
|
||||||
|
return 124 # Timeout exit code
|
||||||
|
|
||||||
|
line = process.stdout.readline()
|
||||||
|
if not line and process.poll() is not None:
|
||||||
|
break
|
||||||
|
|
||||||
|
if line:
|
||||||
|
iso_now = datetime.datetime.now().isoformat()
|
||||||
|
# Determine level (simple heuristic)
|
||||||
|
level = "INFO"
|
||||||
|
if any(word in line.lower() for word in ["error", "fail", "fatal", "critical"]):
|
||||||
|
level = "ERROR"
|
||||||
|
|
||||||
|
log_line = f"[{iso_now}] [{level}] {line}"
|
||||||
print(log_line, end="")
|
print(log_line, end="")
|
||||||
f.write(log_line)
|
f.write(log_line)
|
||||||
f.flush()
|
f.flush()
|
||||||
process.wait()
|
|
||||||
return process.returncode
|
return process.returncode
|
||||||
|
except Exception as e:
|
||||||
|
iso_now = datetime.datetime.now().isoformat()
|
||||||
|
err_msg = f"[{iso_now}] [ERROR] Exception during execution: {str(e)}\n"
|
||||||
|
print(err_msg, end="")
|
||||||
|
f.write(err_msg)
|
||||||
|
return 1
|
||||||
|
|
||||||
def run_worker():
|
def run_worker():
|
||||||
print("Worker started...")
|
print("Worker started...")
|
||||||
@@ -73,19 +115,14 @@ def run_worker():
|
|||||||
queue_log = os.path.join(queue_dir, "queue_log.txt")
|
queue_log = os.path.join(queue_dir, "queue_log.txt")
|
||||||
|
|
||||||
# 0- Clone repository if not exists
|
# 0- Clone repository if not exists
|
||||||
print("Cloning repository...")
|
|
||||||
clone_cmd = "./TPF/gitea_repo_controller.sh clone"
|
clone_cmd = "./TPF/gitea_repo_controller.sh clone"
|
||||||
run_command_with_logging(clone_cmd, queue_log)
|
run_command_with_logging(clone_cmd, queue_log)
|
||||||
|
|
||||||
# 0.1- Checkout branch
|
# 0.1- Checkout branch
|
||||||
print(f"Checking out branch: {queue.source}")
|
|
||||||
checkout_cmd = f"./TPF/gitea_repo_controller.sh checkout {queue.source}"
|
checkout_cmd = f"./TPF/gitea_repo_controller.sh checkout {queue.source}"
|
||||||
run_command_with_logging(checkout_cmd, queue_log)
|
run_command_with_logging(checkout_cmd, queue_log)
|
||||||
|
|
||||||
# 1-5 Build software
|
# 1-5 Build software
|
||||||
print("Building software...")
|
|
||||||
# We need to source the IDF export script and then build.
|
|
||||||
# Using a single shell command to maintain environment.
|
|
||||||
# Explicitly use /bin/bash to avoid shell mismatch
|
# Explicitly use /bin/bash to avoid shell mismatch
|
||||||
build_cmd = f"/bin/bash -c 'source $HOME/esp/esp-idf/export.sh && cd TPF/Sensor_hub_repo && idf.py build && idf.py qemu'"
|
build_cmd = f"/bin/bash -c 'source $HOME/esp/esp-idf/export.sh && cd TPF/Sensor_hub_repo && idf.py build && idf.py qemu'"
|
||||||
run_command_with_logging(build_cmd, queue_log)
|
run_command_with_logging(build_cmd, queue_log)
|
||||||
@@ -99,7 +136,6 @@ def run_worker():
|
|||||||
if queue.status == "Aborted":
|
if queue.status == "Aborted":
|
||||||
break
|
break
|
||||||
|
|
||||||
print(f"Running task: {task.id}")
|
|
||||||
task.status = "Running"
|
task.status = "Running"
|
||||||
update_json_status(queue.id, task.id, "Running")
|
update_json_status(queue.id, task.id, "Running")
|
||||||
db.commit()
|
db.commit()
|
||||||
@@ -107,10 +143,8 @@ def run_worker():
|
|||||||
try:
|
try:
|
||||||
# Run scenario_execution.py queue_id scenario_path task_id
|
# Run scenario_execution.py queue_id scenario_path task_id
|
||||||
script_path = "./TPF/scenario_execution.py"
|
script_path = "./TPF/scenario_execution.py"
|
||||||
# Use the same python environment
|
|
||||||
cmd = f"python3 {script_path} {queue.id} {task.scenario_path} {task.id}"
|
cmd = f"python3 {script_path} {queue.id} {task.scenario_path} {task.id}"
|
||||||
|
|
||||||
# We want to capture this in the task log too
|
|
||||||
task_dir = os.path.join(queue_dir, task.id)
|
task_dir = os.path.join(queue_dir, task.id)
|
||||||
os.makedirs(task_dir, exist_ok=True)
|
os.makedirs(task_dir, exist_ok=True)
|
||||||
|
|
||||||
@@ -130,7 +164,6 @@ def run_worker():
|
|||||||
update_json_status(queue.id, task.id, task.status, task.result)
|
update_json_status(queue.id, task.id, task.status, task.result)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error running task {task.id}: {e}")
|
|
||||||
task.status = "Error"
|
task.status = "Error"
|
||||||
update_json_status(queue.id, task.id, "Error")
|
update_json_status(queue.id, task.id, "Error")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user