update test arena
This commit is contained in:
@@ -2,6 +2,10 @@ from flask import Flask
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_login import LoginManager
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import requests
|
||||
import json
|
||||
|
||||
db = SQLAlchemy()
|
||||
login_manager = LoginManager()
|
||||
@@ -73,5 +77,23 @@ def create_app():
|
||||
except Exception as e:
|
||||
# Admin user might already exist, rollback and continue
|
||||
db.session.rollback()
|
||||
|
||||
|
||||
# Start background polling thread
|
||||
def poll_jobs():
|
||||
with app.app_context():
|
||||
from app.models import Job
|
||||
from app.routes.jobs import update_job_status_internal
|
||||
while True:
|
||||
try:
|
||||
# Poll all jobs that are not finished
|
||||
unfinished_jobs = Job.query.filter(Job.status.in_(['waiting', 'in_progress'])).all()
|
||||
for job in unfinished_jobs:
|
||||
update_job_status_internal(job)
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Background polling error: {e}")
|
||||
time.sleep(20)
|
||||
|
||||
polling_thread = threading.Thread(target=poll_jobs, daemon=True)
|
||||
polling_thread.start()
|
||||
|
||||
return app
|
||||
|
||||
@@ -7,9 +7,26 @@ dashboard_bp = Blueprint('dashboard', __name__, url_prefix='/dashboard')
|
||||
@dashboard_bp.route('/')
|
||||
@login_required
|
||||
def index():
|
||||
if current_user.is_admin:
|
||||
jobs = Job.query.order_by(Job.submitted_at.desc()).all()
|
||||
else:
|
||||
jobs = Job.query.filter_by(user_id=current_user.id).order_by(Job.submitted_at.desc()).all()
|
||||
username_query = request.args.get('username')
|
||||
job_id_query = request.args.get('job_id')
|
||||
|
||||
return render_template('dashboard/index.html', jobs=jobs)
|
||||
query = Job.query
|
||||
|
||||
# Global search by Job ID
|
||||
if job_id_query:
|
||||
query = query.filter(Job.id == job_id_query)
|
||||
# Global search by Username
|
||||
elif username_query:
|
||||
from app.models import User
|
||||
query = query.join(User).filter(User.username.ilike(f'%{username_query}%'))
|
||||
# Default view
|
||||
else:
|
||||
if not current_user.is_admin:
|
||||
query = query.filter(Job.user_id == current_user.id)
|
||||
|
||||
jobs = query.order_by(Job.submitted_at.desc()).all()
|
||||
|
||||
return render_template('dashboard/index.html',
|
||||
jobs=jobs,
|
||||
username_query=username_query,
|
||||
job_id_query=job_id_query)
|
||||
|
||||
@@ -344,17 +344,32 @@ def submit_final():
|
||||
scenario_map = json.loads(scenario_map_json) if scenario_map_json else {}
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"[ERROR] JSON Decode Error in submit_final: {e}")
|
||||
print(f"[DEBUG] scenarios_json: {scenarios_json}")
|
||||
print(f"[DEBUG] scenario_map_json: {scenario_map_json}")
|
||||
flash('Invalid scenario data', 'error')
|
||||
return redirect(url_for('jobs.submit'))
|
||||
|
||||
# Generate Remote IDs
|
||||
remote_queue_id = generate_remote_id()
|
||||
remote_task_ids = {s: generate_remote_id() for s in scenarios}
|
||||
# Create Job record first to get the ID
|
||||
job = Job(
|
||||
user_id=current_user.id,
|
||||
branch_name=branch_name,
|
||||
scenarios=json.dumps(scenarios),
|
||||
environment=environment,
|
||||
test_mode=test_mode,
|
||||
status='waiting'
|
||||
)
|
||||
db.session.add(job)
|
||||
db.session.commit()
|
||||
|
||||
# Use Job ID as Remote Queue ID
|
||||
remote_queue_id = str(job.id)
|
||||
remote_task_ids = {s: f"{job.id}_{i+1}" for i, s in enumerate(scenarios)}
|
||||
|
||||
# Update job with remote IDs
|
||||
job.remote_queue_id = remote_queue_id
|
||||
job.remote_task_ids = json.dumps(remote_task_ids)
|
||||
db.session.commit()
|
||||
|
||||
# Prepare Remote Payload
|
||||
# Format: {"source": "branch", "queue_id": ["staging", {"task_id": "path"}]}
|
||||
# Format: {"source": "branch", "345": ["staging", {"task1": "path"}]}
|
||||
payload = {
|
||||
"source": branch_name,
|
||||
remote_queue_id: [
|
||||
@@ -373,20 +388,6 @@ def submit_final():
|
||||
print(f"[ERROR] Failed to trigger remote queue: {e}")
|
||||
flash(f'Warning: Job saved but failed to trigger remote queue: {str(e)}', 'warning')
|
||||
|
||||
job = Job(
|
||||
user_id=current_user.id,
|
||||
branch_name=branch_name,
|
||||
scenarios=json.dumps(scenarios),
|
||||
environment=environment,
|
||||
test_mode=test_mode,
|
||||
status='waiting', # Start with waiting (sand watch)
|
||||
remote_queue_id=remote_queue_id,
|
||||
remote_task_ids=json.dumps(remote_task_ids)
|
||||
)
|
||||
|
||||
db.session.add(job)
|
||||
db.session.commit()
|
||||
|
||||
flash('Test job submitted successfully', 'success')
|
||||
return redirect(url_for('dashboard.index'))
|
||||
|
||||
@@ -419,84 +420,138 @@ def view_job(job_id):
|
||||
@jobs_bp.route('/<int:job_id>/abort', methods=['POST'])
|
||||
@login_required
|
||||
def abort_job(job_id):
|
||||
return jsonify({'error': 'Abort functionality is not implemented yet'}), 400
|
||||
job = Job.query.get_or_404(job_id)
|
||||
|
||||
if not current_user.is_admin and job.user_id != current_user.id:
|
||||
return jsonify({'error': 'Access denied'}), 403
|
||||
|
||||
try:
|
||||
remote_url = f"http://asf-server.duckdns.org:8080/api/abort/{job.remote_queue_id}"
|
||||
response = requests.post(remote_url, timeout=10)
|
||||
response.raise_for_status()
|
||||
|
||||
job.status = 'aborted'
|
||||
db.session.commit()
|
||||
return jsonify({'success': True, 'message': 'Job aborted successfully'})
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Failed to abort remote job: {e}")
|
||||
return jsonify({'error': f'Failed to abort remote job: {str(e)}'}), 500
|
||||
|
||||
@jobs_bp.route('/<int:job_id>/delete', methods=['POST', 'DELETE'])
|
||||
@login_required
|
||||
def delete_job(job_id):
|
||||
job = Job.query.get_or_404(job_id)
|
||||
|
||||
if not current_user.is_admin and job.user_id != current_user.id:
|
||||
return jsonify({'error': 'Access denied'}), 403
|
||||
|
||||
try:
|
||||
# 1. Try to delete from remote
|
||||
remote_url = f"http://asf-server.duckdns.org:8080/api/delete/{job.remote_queue_id}"
|
||||
try:
|
||||
response = requests.delete(remote_url, timeout=10)
|
||||
# We don't necessarily fail if remote delete fails (e.g. already gone)
|
||||
print(f"[DEBUG] Remote delete response: {response.status_code}")
|
||||
except Exception as re:
|
||||
print(f"[WARNING] Remote delete failed: {re}")
|
||||
|
||||
# 2. Delete from local DB
|
||||
db.session.delete(job)
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({'success': True, 'message': 'Job deleted successfully'})
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Failed to delete job: {e}")
|
||||
return jsonify({'error': f'Failed to delete job: {str(e)}'}), 500
|
||||
|
||||
def update_job_status_internal(job):
|
||||
"""Internal function to poll remote status and update local job record."""
|
||||
if job.status in ['passed', 'failed', 'aborted']:
|
||||
return
|
||||
|
||||
try:
|
||||
# 1. Check Queue Status
|
||||
status_url = f"http://asf-server.duckdns.org:8080/api/status/{job.remote_queue_id}"
|
||||
q_resp = requests.get(status_url, timeout=5)
|
||||
if q_resp.status_code == 200:
|
||||
q_data = q_resp.json()
|
||||
remote_status = q_data.get('status', '').lower()
|
||||
|
||||
if remote_status == 'running':
|
||||
job.status = 'in_progress'
|
||||
elif remote_status == 'waiting':
|
||||
job.status = 'waiting'
|
||||
elif remote_status == 'aborted':
|
||||
job.status = 'aborted'
|
||||
|
||||
# 2. Fetch Queue Logs if running or finished
|
||||
if remote_status in ['running', 'finished', 'aborted']:
|
||||
log_url = f"http://asf-server.duckdns.org:8080/results/{job.remote_queue_id}/queue_log.txt"
|
||||
l_resp = requests.get(log_url, timeout=5)
|
||||
if l_resp.status_code == 200:
|
||||
job.queue_log = l_resp.text
|
||||
|
||||
# 3. Check Task Statuses and Results
|
||||
task_ids = json.loads(job.remote_task_ids) if job.remote_task_ids else {}
|
||||
results = json.loads(job.remote_results) if job.remote_results else {}
|
||||
|
||||
all_finished = True
|
||||
any_failed = False
|
||||
|
||||
if not task_ids:
|
||||
all_finished = False # Should not happen if job is valid
|
||||
|
||||
for scenario, task_id in task_ids.items():
|
||||
# If we don't have result yet, check it
|
||||
if scenario not in results:
|
||||
# Check task status
|
||||
t_status_url = f"http://asf-server.duckdns.org:8080/api/status/{task_id}"
|
||||
t_resp = requests.get(t_status_url, timeout=2)
|
||||
if t_resp.status_code == 200:
|
||||
t_data = t_resp.json()
|
||||
if t_data.get('status') == 'Finished':
|
||||
# Fetch results
|
||||
res_url = f"http://asf-server.duckdns.org:8080/results/{job.remote_queue_id}/{task_id}/final_summary.json"
|
||||
r_resp = requests.get(res_url, timeout=2)
|
||||
if r_resp.status_code == 200:
|
||||
r_data = r_resp.json()
|
||||
for key, val in r_data.items():
|
||||
if key.upper() == scenario.upper():
|
||||
results[scenario] = val
|
||||
if val[0] == 'FAIL':
|
||||
any_failed = True
|
||||
break
|
||||
else:
|
||||
all_finished = False
|
||||
elif t_data.get('status') == 'Aborted':
|
||||
results[scenario] = ['ABORTED', '#']
|
||||
all_finished = True # Or handle as aborted
|
||||
else:
|
||||
all_finished = False
|
||||
else:
|
||||
all_finished = False
|
||||
else:
|
||||
if results[scenario][0] == 'FAIL':
|
||||
any_failed = True
|
||||
|
||||
if all_finished and task_ids:
|
||||
job.status = 'failed' if any_failed else 'passed'
|
||||
from app import db
|
||||
job.completed_at = db.session.query(db.func.now()).scalar()
|
||||
|
||||
job.remote_results = json.dumps(results)
|
||||
from app import db
|
||||
db.session.commit()
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Status polling failed for job {job.id}: {e}")
|
||||
|
||||
@jobs_bp.route('/<int:job_id>/status')
|
||||
@login_required
|
||||
def get_job_status(job_id):
|
||||
job = Job.query.get_or_404(job_id)
|
||||
update_job_status_internal(job)
|
||||
|
||||
if job.status not in ['passed', 'failed', 'aborted']:
|
||||
# Poll remote API
|
||||
try:
|
||||
# 1. Check Queue Status
|
||||
status_url = f"http://asf-server.duckdns.org:8080/api/status/{job.remote_queue_id}"
|
||||
q_resp = requests.get(status_url, timeout=5)
|
||||
if q_resp.status_code == 200:
|
||||
q_data = q_resp.json()
|
||||
remote_status = q_data.get('status', '').lower()
|
||||
|
||||
if remote_status == 'running':
|
||||
job.status = 'in_progress'
|
||||
elif remote_status == 'waiting':
|
||||
job.status = 'waiting'
|
||||
|
||||
# 2. Fetch Queue Logs if running
|
||||
if remote_status == 'running':
|
||||
log_url = f"http://asf-server.duckdns.org:8080/results/{job.remote_queue_id}/queue_log.txt"
|
||||
l_resp = requests.get(log_url, timeout=5)
|
||||
if l_resp.status_code == 200:
|
||||
job.queue_log = l_resp.text
|
||||
|
||||
# 3. Check Task Statuses and Results
|
||||
task_ids = json.loads(job.remote_task_ids) if job.remote_task_ids else {}
|
||||
results = json.loads(job.remote_results) if job.remote_results else {}
|
||||
|
||||
all_finished = True
|
||||
any_failed = False
|
||||
|
||||
for scenario, task_id in task_ids.items():
|
||||
# If we don't have result yet, check it
|
||||
if scenario not in results:
|
||||
# Check task status
|
||||
t_status_url = f"http://asf-server.duckdns.org:8080/api/status/{task_id}"
|
||||
t_resp = requests.get(t_status_url, timeout=2)
|
||||
if t_resp.status_code == 200:
|
||||
t_data = t_resp.json()
|
||||
if t_data.get('status') == 'Finished':
|
||||
# Fetch results
|
||||
res_url = f"http://asf-server.duckdns.org:8080/results/{job.remote_queue_id}/{task_id}/final_summary.json"
|
||||
r_resp = requests.get(res_url, timeout=2)
|
||||
if r_resp.status_code == 200:
|
||||
r_data = r_resp.json()
|
||||
# User says: {"SCENARIO_NAME": ["PASS/FAIL", "link"]}
|
||||
# We need to find the key that matches our scenario (case insensitive maybe?)
|
||||
for key, val in r_data.items():
|
||||
if key.upper() == scenario.upper():
|
||||
results[scenario] = val
|
||||
if val[0] == 'FAIL':
|
||||
any_failed = True
|
||||
break
|
||||
else:
|
||||
all_finished = False
|
||||
else:
|
||||
all_finished = False
|
||||
else:
|
||||
all_finished = False
|
||||
else:
|
||||
if results[scenario][0] == 'FAIL':
|
||||
any_failed = True
|
||||
|
||||
if all_finished and task_ids:
|
||||
job.status = 'failed' if any_failed else 'passed'
|
||||
job.completed_at = db.session.query(db.func.now()).scalar()
|
||||
|
||||
job.remote_results = json.dumps(results)
|
||||
db.session.commit()
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Status polling failed: {e}")
|
||||
|
||||
return jsonify({
|
||||
'status': job.status,
|
||||
'status_icon': job.get_status_icon(),
|
||||
|
||||
Reference in New Issue
Block a user