init
This commit is contained in:
98
testarena_app/worker.py
Normal file
98
testarena_app/worker.py
Normal file
@@ -0,0 +1,98 @@
|
||||
import time
|
||||
import subprocess
|
||||
import json
|
||||
import os
|
||||
from sqlalchemy.orm import Session
|
||||
from . import models, database
|
||||
|
||||
# Base directory for data
|
||||
BASE_DATA_DIR = "/home/asf/testarena"
|
||||
if os.name == 'nt':
|
||||
BASE_DATA_DIR = "d:/ASF - course/ASF_01/ASF_tools/asf-pc-server/testarena_pc_backend/testarena_data"
|
||||
|
||||
def update_json_status(queue_id, task_id, status, result=None):
|
||||
queue_dir = os.path.join(BASE_DATA_DIR, queue_id)
|
||||
status_file = os.path.join(queue_dir, "queue_status.json")
|
||||
if os.path.exists(status_file):
|
||||
with open(status_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
|
||||
if task_id:
|
||||
data["tasks"][task_id] = status
|
||||
else:
|
||||
data["status"] = status
|
||||
|
||||
if result:
|
||||
data["results"] = data.get("results", {})
|
||||
data["results"][task_id] = result
|
||||
|
||||
with open(status_file, 'w') as f:
|
||||
json.dump(data, f, indent=4)
|
||||
|
||||
def run_worker():
|
||||
print("Worker started...")
|
||||
while True:
|
||||
db = database.SessionLocal()
|
||||
try:
|
||||
# Get next waiting queue
|
||||
queue = db.query(models.Queue).filter(models.Queue.status == "Waiting").order_by(models.Queue.created_at).first()
|
||||
|
||||
if queue:
|
||||
print(f"Processing queue: {queue.id}")
|
||||
queue.status = "Running"
|
||||
update_json_status(queue.id, None, "Running")
|
||||
db.commit()
|
||||
|
||||
tasks = db.query(models.Task).filter(models.Task.queue_id == queue.id, models.Task.status == "Waiting").all()
|
||||
|
||||
for task in tasks:
|
||||
# Check if queue was aborted mid-way
|
||||
db.refresh(queue)
|
||||
if queue.status == "Aborted":
|
||||
break
|
||||
|
||||
print(f"Running task: {task.id}")
|
||||
task.status = "Running"
|
||||
update_json_status(queue.id, task.id, "Running")
|
||||
db.commit()
|
||||
|
||||
try:
|
||||
# Run tpf_execution.py [queue_id, scenario_path, task_id]
|
||||
# Assuming tpf_execution.py is in the parent directory or accessible
|
||||
script_path = "tpf_execution.py"
|
||||
# For testing, let's assume it's in the same dir as the app or parent
|
||||
cmd = ["python", script_path, queue.id, task.scenario_path, task.id]
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||
|
||||
# Parse result if it returns json
|
||||
try:
|
||||
execution_result = json.loads(result.stdout)
|
||||
except:
|
||||
execution_result = {"output": result.stdout, "error": result.stderr}
|
||||
|
||||
task.status = "Finished"
|
||||
task.result = execution_result
|
||||
update_json_status(queue.id, task.id, "Finished", execution_result)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error running task {task.id}: {e}")
|
||||
task.status = "Error"
|
||||
update_json_status(queue.id, task.id, "Error")
|
||||
|
||||
db.commit()
|
||||
|
||||
if queue.status != "Aborted":
|
||||
queue.status = "Finished"
|
||||
update_json_status(queue.id, None, "Finished")
|
||||
db.commit()
|
||||
|
||||
time.sleep(5) # Poll every 5 seconds
|
||||
except Exception as e:
|
||||
print(f"Worker error: {e}")
|
||||
time.sleep(10)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_worker()
|
||||
Reference in New Issue
Block a user