Files
testarena/app/routes/jobs.py
2026-01-04 18:12:22 +01:00

571 lines
24 KiB
Python

import json
import os
import random
import re
import shlex
import string
import subprocess
import traceback

import requests
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
from flask_login import login_required, current_user

from app.models import Job
from app import db
def generate_remote_id(length=6):
    """Return a string of ``length`` random decimal digits.

    Uses the ``random`` module, so the result is not suitable as a secret.
    """
    picked = random.choices(string.digits, k=length)
    return ''.join(picked)
# Blueprint for every job route in this module: URLs live under /jobs and
# endpoint names are prefixed with "jobs." (e.g. url_for('jobs.submit')).
jobs_bp = Blueprint(name='jobs', import_name=__name__, url_prefix='/jobs')
@jobs_bp.route('/submit')
@login_required
def submit():
    """Render the job-submission start page (``jobs/submit.html``)."""
    template_name = 'jobs/submit.html'
    return render_template(template_name)
@jobs_bp.route('/debug/test-step2')
@login_required
def debug_test_step2():
    """Render the step-2 scenario-selection template with fixed mock data.

    Admin-only debug endpoint. Returns the rendered page on success, a JSON
    403 for non-admin users, or a JSON 500 describing the rendering error.
    """
    if not current_user.is_admin:
        return jsonify({'error': 'Admin access required'}), 403

    # Mock data similar to what we expect from the scenario scan:
    # layer -> component -> [scenario names], and scenario name -> XML path.
    organized_data = {
        "drivers": {
            "diag_protocol_stack": ["diag_protocol_stack_init_test"]
        },
        "application_layer": {
            "data_pool": ["data_pool_init_test"],
            "actuator_manager": ["actuator_manager_init_test2", "actuator_manager_init_test"],
            "event_system": ["event_system_init_test"]
        }
    }
    scenario_map = {
        "diag_protocol_stack_init_test": "drivers/diag_protocol_stack/test/diag_protocol_stack_init_test.test_scenario.xml",
        "data_pool_init_test": "application_layer/DP_stack/data_pool/test/data_pool_init_test.test_scenario.xml",
        "actuator_manager_init_test2": "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test2.test_scenario.xml",
        "actuator_manager_init_test": "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test.test_scenario.xml",
        "event_system_init_test": "application_layer/business_stack/event_system/test/event_system_init_test.test_scenario.xml"
    }

    try:
        return render_template('jobs/submit_step2.html',
                               branch_name='test_branch',
                               organized_data=organized_data,
                               scenario_map=scenario_map)
    except Exception as e:
        # Use an error status so callers can tell a failed render apart from
        # a successful one. Jinja raises TemplateNotFound for a missing
        # template, so derive "template_exists" from that instead of
        # always claiming True.
        return jsonify({
            'error': str(e),
            'type': type(e).__name__,
            'template_exists': type(e).__name__ != 'TemplateNotFound'
        }), 500
@jobs_bp.route('/debug/ssh-test')
@login_required
def debug_ssh_test():
    """Check SSH connectivity to the configured remote host.

    Admin-only debug endpoint. Runs ``echo SSH_CONNECTION_SUCCESS`` on the
    host described by the SSH_HOST / SSH_USER / SSH_PORT / SSH_PASSWORD
    environment variables and reports the configuration (without the
    password), the command, and its result as JSON. Errors are returned
    as a JSON 500.
    """
    if not current_user.is_admin:
        return jsonify({'error': 'Admin access required'}), 403
    try:
        ssh_password = os.environ.get('SSH_PASSWORD', 'default_password')
        ssh_host = os.environ.get('SSH_HOST', 'remote_host')
        ssh_user = os.environ.get('SSH_USER', 'asf')
        ssh_port = os.environ.get('SSH_PORT', '22')

        # Argument list instead of a shell string, so configuration values
        # are never shell-parsed locally. `sshpass -e` reads the password
        # from the SSHPASS environment variable, which keeps it off the
        # command line (visible in the process list) and immune to quoting.
        test_argv = [
            'sshpass', '-e',
            'ssh', '-p', ssh_port,
            '-o', 'StrictHostKeyChecking=no',
            '-o', 'UserKnownHostsFile=/dev/null',
            f'{ssh_user}@{ssh_host}',
            'echo SSH_CONNECTION_SUCCESS',
        ]
        result = subprocess.run(
            test_argv,
            capture_output=True,
            text=True,
            timeout=30,
            env={**os.environ, 'SSHPASS': ssh_password},
        )
        return jsonify({
            'ssh_config': {
                'user': ssh_user,
                'host': ssh_host,
                'port': ssh_port,
                'password_set': bool(ssh_password and ssh_password != 'default_password')
            },
            # The password is not part of the command, so nothing needs masking.
            'command_executed': shlex.join(test_argv),
            'test_result': {
                'returncode': result.returncode,
                'stdout': result.stdout,
                'stderr': result.stderr
            },
            'success': result.returncode == 0 and 'SSH_CONNECTION_SUCCESS' in result.stdout
        })
    except Exception as e:
        return jsonify({
            'error': str(e),
            'success': False
        }), 500
# Sequences git forbids in ref names (subset of git-check-ref-format):
# ASCII control characters, space, ~ ^ : ? * [ \, "..", and "@{".
_INVALID_BRANCH_NAME = re.compile(r'[\x00-\x20\x7f~^:?*\[\\]|\.\.|@\{')


def _branch_name_error(branch_name):
    """Return why ``branch_name`` is unusable as a git branch, or None if it is fine."""
    if branch_name.startswith('-'):
        # Would be parsed as an option by the remote script / git.
        return 'Branch name must not start with "-"'
    if _INVALID_BRANCH_NAME.search(branch_name):
        return 'Branch name contains characters that are not allowed in git branch names'
    return None


def _run_on_remote(remote_cmd, timeout):
    """Run ``remote_cmd`` on the SSH host from the environment; return the CompletedProcess.

    Locally no shell is involved, and the password is passed through the
    SSHPASS environment variable (``sshpass -e``) instead of the command
    line. ``remote_cmd`` IS interpreted by the remote login shell, so callers
    must ``shlex.quote`` any untrusted part of it.
    Raises subprocess.TimeoutExpired when ``timeout`` seconds elapse.
    """
    ssh_password = os.environ.get('SSH_PASSWORD', 'default_password')
    ssh_host = os.environ.get('SSH_HOST', 'remote_host')
    ssh_user = os.environ.get('SSH_USER', 'asf')
    ssh_port = os.environ.get('SSH_PORT', '22')
    argv = [
        'sshpass', '-e',
        'ssh', '-p', ssh_port,
        '-o', 'StrictHostKeyChecking=no',
        '-o', 'UserKnownHostsFile=/dev/null',
        f'{ssh_user}@{ssh_host}',
        remote_cmd,
    ]
    # The password is not in argv, so the command can be logged verbatim.
    print(f"[DEBUG] Executing command: {shlex.join(argv)}")
    return subprocess.run(
        argv,
        capture_output=True,
        text=True,
        timeout=timeout,
        env={**os.environ, 'SSHPASS': ssh_password},
    )


def _first_fatal_line(*streams):
    """Return the first line containing 'fatal:' (case-insensitive) in ``streams``, or None."""
    for stream in streams:
        for line in stream.splitlines():
            if 'fatal:' in line.lower():
                return line.strip()
    return None


def _fallback_scenarios():
    """Return fresh ``(organized_data, scenario_map)`` mock data used when the scan output is unparseable."""
    organized_data = {
        "drivers": {
            "diag_protocol_stack": ["diag_protocol_stack_init_test"]
        },
        "application_layer": {
            "data_pool": ["data_pool_init_test"],
            "actuator_manager": ["actuator_manager_init_test2", "actuator_manager_init_test"],
            "event_system": ["event_system_init_test"]
        }
    }
    scenario_map = {
        "diag_protocol_stack_init_test": "drivers/diag_protocol_stack/test/diag_protocol_stack_init_test.test_scenario.xml",
        "data_pool_init_test": "application_layer/DP_stack/data_pool/test/data_pool_init_test.test_scenario.xml",
        "actuator_manager_init_test2": "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test2.test_scenario.xml",
        "actuator_manager_init_test": "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test.test_scenario.xml",
        "event_system_init_test": "application_layer/business_stack/event_system/test/event_system_init_test.test_scenario.xml"
    }
    return organized_data, scenario_map


@jobs_bp.route('/submit/step1', methods=['POST'])
@login_required
def submit_step1():
    """Check out ``branch_name`` on the remote host and list its test scenarios.

    Runs the remote clone, checkout and scenario-scan commands over SSH.
    Always responds with JSON: on success ``organized_data`` and
    ``scenario_map`` from the scan; on failure ``success: False`` with an
    ``error`` message. ``output`` carries the raw command output, and
    ``debug`` carries return codes.
    """
    branch_name = request.form.get('branch_name')
    if not branch_name:
        return jsonify({
            'success': False,
            'error': 'Branch name is required',
            'output': ''
        })

    # branch_name comes straight from the client and ends up in a remote
    # shell command: reject names git would refuse anyway, and quote it below.
    name_error = _branch_name_error(branch_name)
    if name_error:
        return jsonify({
            'success': False,
            'error': name_error,
            'output': ''
        })

    try:
        print(f"[DEBUG] Starting branch validation for: {branch_name}")

        # First, clone the repository
        clone_result = _run_on_remote('./TPF/gitea_repo_controller.sh clone', timeout=60)
        print(f"[DEBUG] Clone result - Return code: {clone_result.returncode}")
        print(f"[DEBUG] Clone stdout: {clone_result.stdout}")
        print(f"[DEBUG] Clone stderr: {clone_result.stderr}")

        # Then, checkout the branch
        checkout_result = _run_on_remote(
            f'./TPF/gitea_repo_controller.sh checkout {shlex.quote(branch_name)}',
            timeout=60,
        )
        print(f"[DEBUG] Checkout result - Return code: {checkout_result.returncode}")
        print(f"[DEBUG] Checkout stdout: {checkout_result.stdout}")
        print(f"[DEBUG] Checkout stderr: {checkout_result.stderr}")

        full_output = (
            f"CLONE OUTPUT:\n{clone_result.stdout}\n{clone_result.stderr}\n\n"
            f"CHECKOUT OUTPUT:\n{checkout_result.stdout}\n{checkout_result.stderr}"
        )

        # A non-zero exit code or any "fatal:" line (git's error prefix)
        # means the checkout failed.
        fatal_line = _first_fatal_line(checkout_result.stdout, checkout_result.stderr)
        checkout_failed = checkout_result.returncode != 0 or fatal_line is not None
        print(f"[DEBUG] Checkout failed: {checkout_failed}")

        if checkout_failed:
            error_msg = f'Branch "{branch_name}" not found on remote. Please push the branch first.'
            if fatal_line:
                error_msg += f" Error: {fatal_line}"
            return jsonify({
                'success': False,
                'error': error_msg,
                'output': full_output,
                'debug': {
                    'clone_returncode': clone_result.returncode,
                    'checkout_returncode': checkout_result.returncode,
                    'branch_name': branch_name
                }
            })

        # If successful, run scenario scan to get available scenarios
        print("[DEBUG] Running scenario scan...")
        scan_result = _run_on_remote('python3 TPF/Sensor_hub_repo/Tools/TPF/scenario_scan.py', timeout=120)
        print(f"[DEBUG] Scan result - Return code: {scan_result.returncode}")
        print(f"[DEBUG] Scan stdout: {scan_result.stdout}")
        print(f"[DEBUG] Scan stderr: {scan_result.stderr}")

        scan_fallback = False
        try:
            scan_data = json.loads(scan_result.stdout)
            organized_data = scan_data.get('organized_data', {})
            scenario_map = scan_data.get('scenario_map', {})
            print(f"[DEBUG] Found {len(scenario_map)} scenarios in {len(organized_data)} categories")
        except json.JSONDecodeError as e:
            print(f"[ERROR] Failed to parse scenario scan JSON: {e}")
            print(f"[ERROR] Raw output: {scan_result.stdout}")
            # NOTE(review): this offers hard-coded scenarios that may not
            # exist on the branch; the fallback is now flagged in the
            # response's debug data so clients can detect it.
            organized_data, scenario_map = _fallback_scenarios()
            scan_fallback = True

        print(f"[DEBUG] Branch validation successful for: {branch_name}")
        return jsonify({
            'success': True,
            'organized_data': organized_data,
            'scenario_map': scenario_map,
            'message': f'Branch "{branch_name}" validated successfully',
            'output': full_output,
            'debug': {
                'clone_returncode': clone_result.returncode,
                'checkout_returncode': checkout_result.returncode,
                'scan_returncode': scan_result.returncode,
                'scan_fallback': scan_fallback,
                'branch_name': branch_name,
                'scenarios_found': len(scenario_map)
            }
        })
    except subprocess.TimeoutExpired as e:
        error_msg = 'Branch validation timed out. Please try again.'
        print(f"[ERROR] {error_msg}")
        return jsonify({
            'success': False,
            'error': error_msg,
            # Report the timeout that actually fired (60 s for clone and
            # checkout, 120 s for the scan).
            'output': f'Command timed out after {e.timeout:g} seconds'
        })
    except Exception as e:
        error_msg = f'Error validating branch: {str(e)}'
        print(f"[ERROR] {error_msg}")
        return jsonify({
            'success': False,
            'error': error_msg,
            'output': f'Exception: {str(e)}'
        })
@jobs_bp.route('/submit/step2', methods=['POST'])
@login_required
def submit_step2():
    """Render the review page for the scenarios selected in step 2.

    Reads form fields ``branch_name``, ``scenario_map`` (JSON object) and
    ``selected_scenarios`` (JSON array). If any of them is missing or
    malformed, or no scenario is selected, flashes an error and redirects
    to the submit page.
    """
    branch_name = request.form.get('branch_name')
    scenario_map_json = request.form.get('scenario_map')
    selected_scenarios_json = request.form.get('selected_scenarios')

    if not branch_name:
        flash('Branch name is required', 'error')
        return redirect(url_for('jobs.submit'))

    try:
        scenario_map = json.loads(scenario_map_json) if scenario_map_json else {}
        selected_scenarios = json.loads(selected_scenarios_json) if selected_scenarios_json else []
    except json.JSONDecodeError as e:
        print(f"[ERROR] JSON Decode Error in submit_step2: {e}")
        print(f"[DEBUG] scenario_map_json: {scenario_map_json}")
        print(f"[DEBUG] selected_scenarios_json: {selected_scenarios_json}")
        flash('Invalid scenario data', 'error')
        return redirect(url_for('jobs.submit'))

    # Well-formed JSON of the wrong shape is as unusable as malformed JSON.
    if not isinstance(scenario_map, dict) or not isinstance(selected_scenarios, list):
        flash('Invalid scenario data', 'error')
        return redirect(url_for('jobs.submit'))

    if not selected_scenarios:
        flash('Please select at least one scenario', 'error')
        return redirect(url_for('jobs.submit'))

    return render_template('jobs/submit_review.html',
                           branch_name=branch_name,
                           scenarios=selected_scenarios,
                           scenario_map=scenario_map)
@jobs_bp.route('/submit/step2-validated', methods=['POST'])
@login_required
def submit_step2_validated():
    """Render the scenario-selection page for a branch validated in step 1.

    Expects form fields ``branch_name``, ``organized_data`` (JSON object) and
    ``scenario_map`` (JSON object). Missing or malformed input, and any
    unexpected error, result in a flashed message and a redirect to the
    submit page.
    """
    try:
        branch_name = request.form.get('branch_name')
        organized_data_json = request.form.get('organized_data')
        scenario_map_json = request.form.get('scenario_map')
        print(f"[DEBUG] Step2 - Branch: {branch_name}")
        print(f"[DEBUG] Step2 - Organized data length: {len(organized_data_json) if organized_data_json else 0}")
        print(f"[DEBUG] Step2 - Scenario map length: {len(scenario_map_json) if scenario_map_json else 0}")

        if not branch_name:
            print("[ERROR] Step2 - No branch name provided")
            flash('Branch name is required', 'error')
            return redirect(url_for('jobs.submit'))

        try:
            organized_data = json.loads(organized_data_json) if organized_data_json else {}
            scenario_map = json.loads(scenario_map_json) if scenario_map_json else {}
        except json.JSONDecodeError as e:
            print(f"[ERROR] Step2 - JSON decode error: {e}")
            print(f"[DEBUG] organized_data_json: {organized_data_json}")
            print(f"[DEBUG] scenario_map_json: {scenario_map_json}")
            flash('Invalid scenario data', 'error')
            return redirect(url_for('jobs.submit'))

        # Valid JSON of the wrong shape (e.g. a list) cannot be rendered; treat
        # it like malformed input instead of failing on .keys() below.
        if not isinstance(organized_data, dict) or not isinstance(scenario_map, dict):
            print("[ERROR] Step2 - organized_data and scenario_map must be JSON objects")
            flash('Invalid scenario data', 'error')
            return redirect(url_for('jobs.submit'))

        print(f"[DEBUG] Step2 - Parsed organized_data keys: {list(organized_data.keys())}")
        print(f"[DEBUG] Step2 - Parsed scenario_map count: {len(scenario_map)}")
        print(f"[DEBUG] Step2 - Rendering template with {len(organized_data)} layers")
        return render_template('jobs/submit_step2.html',
                               branch_name=branch_name,
                               organized_data=organized_data,
                               scenario_map=scenario_map)
    except Exception as e:
        print(f"[ERROR] Step2 - Unexpected error: {str(e)}")
        print(f"[ERROR] Step2 - Error type: {type(e).__name__}")
        # `traceback` is imported at module level; no local import needed.
        print(f"[ERROR] Step2 - Traceback: {traceback.format_exc()}")
        flash(f'Error loading scenario selection: {str(e)}', 'error')
        return redirect(url_for('jobs.submit'))
@jobs_bp.route('/submit/final', methods=['POST'])
@login_required
def submit_final():
    """Create a Job for the reviewed scenarios and enqueue it on the remote runner.

    Form fields: ``branch_name``, ``scenarios`` (JSON array of scenario
    names), ``scenario_map`` (JSON object name -> scenario path),
    ``environment`` (default 'staging') and ``test_mode`` (default
    'simulator'). The job's ID doubles as the remote queue ID; each scenario
    gets the task ID ``<job_id>_<n>``. The job is saved even if the remote
    queue cannot be reached, in which case a warning is flashed instead of
    the success message. Always redirects.
    """
    branch_name = request.form.get('branch_name')
    scenarios_json = request.form.get('scenarios')
    scenario_map_json = request.form.get('scenario_map')
    environment = request.form.get('environment', 'staging')
    test_mode = request.form.get('test_mode', 'simulator')

    if not branch_name:
        flash('Branch name is required', 'error')
        return redirect(url_for('jobs.submit'))

    try:
        scenarios = json.loads(scenarios_json) if scenarios_json else []
        scenario_map = json.loads(scenario_map_json) if scenario_map_json else {}
    except json.JSONDecodeError as e:
        print(f"[ERROR] JSON Decode Error in submit_final: {e}")
        flash('Invalid scenario data', 'error')
        return redirect(url_for('jobs.submit'))

    # Scenario names are used as dict keys and looked up in scenario_map, so
    # both must have the expected JSON shape.
    if (not isinstance(scenarios, list)
            or not all(isinstance(s, str) for s in scenarios)
            or not isinstance(scenario_map, dict)):
        flash('Invalid scenario data', 'error')
        return redirect(url_for('jobs.submit'))

    # A job without tasks could never reach a finished state.
    if not scenarios:
        flash('Please select at least one scenario', 'error')
        return redirect(url_for('jobs.submit'))

    job = Job(
        user_id=current_user.id,
        branch_name=branch_name,
        scenarios=json.dumps(scenarios),
        environment=environment,
        test_mode=test_mode,
        status='waiting'
    )
    db.session.add(job)
    # flush() assigns job.id without committing, so the job and its remote
    # IDs are persisted together in a single commit.
    db.session.flush()

    # Use Job ID as Remote Queue ID
    remote_queue_id = str(job.id)
    remote_task_ids = {s: f"{job.id}_{i + 1}" for i, s in enumerate(scenarios)}
    job.remote_queue_id = remote_queue_id
    job.remote_task_ids = json.dumps(remote_task_ids)
    db.session.commit()

    # Format: {"source": "branch", "345": ["staging", {"task1": "path"}]}
    payload = {
        "source": branch_name,
        remote_queue_id: [
            environment,
            {remote_task_ids[s]: scenario_map.get(s, s) for s in scenarios}
        ]
    }

    remote_url = "http://asf-server.duckdns.org:8080/api/queue"
    try:
        response = requests.post(remote_url, json=payload, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"[ERROR] Failed to trigger remote queue: {e}")
        flash(f'Warning: Job saved but failed to trigger remote queue: {str(e)}', 'warning')
    else:
        # Log the raw body: a non-JSON reply must not be mistaken for a
        # failure to enqueue.
        print(f"[DEBUG] Remote queue triggered: {response.text}")
        flash('Test job submitted successfully', 'success')
    return redirect(url_for('dashboard.index'))
@jobs_bp.route('/<int:job_id>')
@login_required
def view_job(job_id):
    """Return the details of job ``job_id`` as JSON.

    Only the submitter or an admin may view a job; anyone else gets a JSON
    403, the same response the abort and delete endpoints give. Returns 404
    if the job does not exist.
    """
    job = Job.query.get_or_404(job_id)
    if not current_user.is_admin and job.user_id != current_user.id:
        # JSON rather than a redirect: this endpoint's consumers expect JSON.
        return jsonify({'error': 'Access denied'}), 403

    def fmt(ts):
        # Timestamps are rendered as 'YYYY-MM-DD HH:MM:SS'; unset ones as null.
        return ts.strftime('%Y-%m-%d %H:%M:%S') if ts else None

    return jsonify({
        'id': job.id,
        'submitter': job.submitter.username,
        'branch_name': job.branch_name,
        'scenarios': job.scenarios,
        'environment': job.environment,
        'test_mode': job.test_mode,
        'status': job.status,
        'submitted_at': fmt(job.submitted_at),
        'completed_at': fmt(job.completed_at),
        'duration': job.duration,
        'remote_queue_id': job.remote_queue_id,
        'remote_task_ids': job.remote_task_ids,
        'remote_results': job.remote_results,
        'queue_log': job.queue_log
    })
@jobs_bp.route('/<int:job_id>/abort', methods=['POST'])
@login_required
def abort_job(job_id):
    """Abort job ``job_id`` on the remote runner and mark it 'aborted' locally.

    Returns JSON: 403 if the user is neither the submitter nor an admin,
    409 if the job already passed or failed, 500 if the remote abort or the
    database update fails.
    """
    job = Job.query.get_or_404(job_id)
    if not current_user.is_admin and job.user_id != current_user.id:
        return jsonify({'error': 'Access denied'}), 403

    # A completed job has nothing left to abort; without this guard its
    # passed/failed result would be overwritten with 'aborted'.
    if job.status in ('passed', 'failed'):
        return jsonify({'error': f'Job already {job.status}; cannot abort'}), 409

    try:
        remote_url = f"http://asf-server.duckdns.org:8080/api/abort/{job.remote_queue_id}"
        response = requests.post(remote_url, timeout=10)
        response.raise_for_status()
        job.status = 'aborted'
        db.session.commit()
        return jsonify({'success': True, 'message': 'Job aborted successfully'})
    except Exception as e:
        # Leave the session usable for the rest of the request if the
        # commit is what failed.
        db.session.rollback()
        print(f"[ERROR] Failed to abort remote job: {e}")
        return jsonify({'error': f'Failed to abort remote job: {str(e)}'}), 500
@jobs_bp.route('/<int:job_id>/delete', methods=['POST', 'DELETE'])
@login_required
def delete_job(job_id):
    """Delete job ``job_id`` remotely (best effort) and from the local database.

    Returns JSON: 403 if the user is neither the submitter nor an admin,
    500 if the local delete fails. A failed remote delete is only logged.
    """
    job = Job.query.get_or_404(job_id)
    if not current_user.is_admin and job.user_id != current_user.id:
        return jsonify({'error': 'Access denied'}), 403

    # 1. Best-effort remote delete: the remote copy may already be gone,
    #    which must not block removing the local record.
    if job.remote_queue_id:
        remote_url = f"http://asf-server.duckdns.org:8080/api/delete/{job.remote_queue_id}"
        try:
            response = requests.delete(remote_url, timeout=10)
            print(f"[DEBUG] Remote delete response: {response.status_code}")
        except Exception as remote_err:
            print(f"[WARNING] Remote delete failed: {remote_err}")

    # 2. Delete from local DB
    try:
        db.session.delete(job)
        db.session.commit()
        return jsonify({'success': True, 'message': 'Job deleted successfully'})
    except Exception as e:
        # Discard the failed transaction so the session stays usable.
        db.session.rollback()
        print(f"[ERROR] Failed to delete job: {e}")
        return jsonify({'error': f'Failed to delete job: {str(e)}'}), 500
def _fetch_task_result(base_url, queue_id, scenario, task_id):
    """Poll one remote task and, once it has finished, fetch its summary.

    Returns ``(result, finished)``. ``result`` is a ``[status, report_link]``
    pair to record for ``scenario``, or None when there is nothing to record.
    ``finished`` is False while the task still needs polling.
    """
    t_resp = requests.get(f"{base_url}/api/status/{task_id}", timeout=2)
    if t_resp.status_code != 200:
        return None, False
    task_status = t_resp.json().get('status')
    if task_status == 'Aborted':
        return ['ABORTED', '#'], True
    if task_status == 'Error':
        return ['ERROR', '#'], True
    if task_status != 'Finished':
        return None, False

    r_resp = requests.get(f"{base_url}/results/{queue_id}/{task_id}/final_summary.json", timeout=2)
    if r_resp.status_code != 200:
        return None, False
    for key, val in r_resp.json().items():
        if key.upper() == scenario.upper():
            # val[0] is the status (PASS/FAIL/ERROR); the link val[1] used to
            # carry is replaced by the standardized execution report URL.
            report_link = f"{base_url}/results/{queue_id}/{task_id}/execution_report.html"
            return [val[0], report_link], True
    # NOTE(review): the summary has no entry for this scenario. As before, the
    # task counts as finished with no recorded result; confirm this is intended.
    return None, True


def update_job_status_internal(job):
    """Poll the remote queue for ``job`` and update its local record.

    Jobs already 'passed', 'failed' or 'aborted' are left untouched.
    Otherwise the remote queue status and queue log are fetched, every task
    without a recorded result is polled, and once all tasks are finished the
    job becomes 'failed' (any FAIL/ERROR result) or 'passed'. Changes are
    committed. Polling errors are logged and the session rolled back rather
    than raised.
    """
    if job.status in ['passed', 'failed', 'aborted']:
        return

    base_url = "http://asf-server.duckdns.org:8080"
    try:
        # 1. Check Queue Status
        q_resp = requests.get(f"{base_url}/api/status/{job.remote_queue_id}", timeout=5)
        if q_resp.status_code == 200:
            remote_status = q_resp.json().get('status', '').lower()
            if remote_status == 'running':
                job.status = 'in_progress'
            elif remote_status == 'waiting':
                job.status = 'waiting'
            elif remote_status == 'aborted':
                job.status = 'aborted'

            # 2. Fetch Queue Logs if running or finished
            if remote_status in ['running', 'finished', 'aborted']:
                l_resp = requests.get(f"{base_url}/results/{job.remote_queue_id}/queue_log.txt", timeout=5)
                if l_resp.status_code == 200:
                    job.queue_log = l_resp.text

        # 3. Check Task Statuses and Results
        task_ids = json.loads(job.remote_task_ids) if job.remote_task_ids else {}
        results = json.loads(job.remote_results) if job.remote_results else {}

        # A job without tasks can never finish (should not happen for a valid job).
        all_finished = bool(task_ids)
        any_failed = False
        for scenario, task_id in task_ids.items():
            if scenario not in results:
                result, finished = _fetch_task_result(base_url, job.remote_queue_id, scenario, task_id)
                if result is not None:
                    results[scenario] = result
                # Only ever clear the flag: a single finished (or aborted /
                # errored) task must not mark the job done while other
                # tasks are still running.
                if not finished:
                    all_finished = False
            if scenario in results and results[scenario][0] in ('FAIL', 'ERROR'):
                any_failed = True

        if all_finished:
            # If any task has FAIL or ERROR, the job status is 'failed'
            job.status = 'failed' if any_failed else 'passed'
            job.completed_at = db.session.query(db.func.now()).scalar()
        job.remote_results = json.dumps(results)
        db.session.commit()
    except Exception as e:
        # Best-effort polling: keep the session usable if a query or the
        # commit failed, and try again on the next poll.
        db.session.rollback()
        print(f"[ERROR] Status polling failed for job {job.id}: {e}")
@jobs_bp.route('/<int:job_id>/status')
@login_required
def get_job_status(job_id):
    """Refresh job ``job_id`` from the remote queue and return its status as JSON.

    Only the submitter or an admin may poll a job. Anyone else gets a JSON
    403: the response exposes results and the queue log, and the ownership
    rule is the one the job detail, abort and delete endpoints apply.
    """
    job = Job.query.get_or_404(job_id)
    if not current_user.is_admin and job.user_id != current_user.id:
        return jsonify({'error': 'Access denied'}), 403
    update_job_status_internal(job)
    return jsonify({
        'status': job.status,
        'status_icon': job.get_status_icon(),
        'remote_results': job.remote_results,
        'queue_log': job.queue_log
    })