from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify from flask_login import login_required, current_user from app.models import Job from app import db import json import subprocess import os import requests import random import string import traceback def generate_remote_id(length=6): return ''.join(random.choices(string.digits, k=length)) jobs_bp = Blueprint('jobs', __name__, url_prefix='/jobs') @jobs_bp.route('/submit') @login_required def submit(): return render_template('jobs/submit.html') @jobs_bp.route('/debug/test-step2') @login_required def debug_test_step2(): """Debug endpoint to test step2 template with mock data""" if not current_user.is_admin: return jsonify({'error': 'Admin access required'}), 403 # Mock data similar to what we expect from scenario scan organized_data = { "drivers": { "diag_protocol_stack": ["diag_protocol_stack_init_test"] }, "application_layer": { "data_pool": ["data_pool_init_test"], "actuator_manager": ["actuator_manager_init_test2", "actuator_manager_init_test"], "event_system": ["event_system_init_test"] } } scenario_map = { "diag_protocol_stack_init_test": "drivers/diag_protocol_stack/test/diag_protocol_stack_init_test.test_scenario.xml", "data_pool_init_test": "application_layer/DP_stack/data_pool/test/data_pool_init_test.test_scenario.xml", "actuator_manager_init_test2": "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test2.test_scenario.xml", "actuator_manager_init_test": "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test.test_scenario.xml", "event_system_init_test": "application_layer/business_stack/event_system/test/event_system_init_test.test_scenario.xml" } try: return render_template('jobs/submit_step2.html', branch_name='test_branch', organized_data=organized_data, scenario_map=scenario_map) except Exception as e: return jsonify({ 'error': str(e), 'type': type(e).__name__, 'template_exists': True }) @jobs_bp.route('/debug/ssh-test') @login_required def debug_ssh_test(): """Debug endpoint to test SSH connectivity""" if not current_user.is_admin: return jsonify({'error': 'Admin access required'}), 403 try: ssh_password = os.environ.get('SSH_PASSWORD', 'default_password') ssh_host = os.environ.get('SSH_HOST', 'remote_host') ssh_user = os.environ.get('SSH_USER', 'asf') ssh_port = os.environ.get('SSH_PORT', '22') # SSH options for better connectivity ssh_options = f"-p {ssh_port} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" # Test basic SSH connectivity test_cmd = f"sshpass -p '{ssh_password}' ssh {ssh_options} {ssh_user}@{ssh_host} 'echo SSH_CONNECTION_SUCCESS'" result = subprocess.run(test_cmd, shell=True, capture_output=True, text=True, timeout=30) return jsonify({ 'ssh_config': { 'user': ssh_user, 'host': ssh_host, 'port': ssh_port, 'password_set': bool(ssh_password and ssh_password != 'default_password') }, 'command_executed': test_cmd.replace(ssh_password, '***'), 'test_result': { 'returncode': result.returncode, 'stdout': result.stdout, 'stderr': result.stderr }, 'success': result.returncode == 0 and 'SSH_CONNECTION_SUCCESS' in result.stdout }) except Exception as e: return jsonify({ 'error': str(e), 'success': False }) @jobs_bp.route('/submit/step1', methods=['POST']) @login_required def submit_step1(): branch_name = request.form.get('branch_name') if not branch_name: return jsonify({ 'success': False, 'error': 'Branch name is required', 'output': '' }) # Validate branch exists on remote try: # Get SSH configuration from environment variables ssh_password = os.environ.get('SSH_PASSWORD', 'default_password') ssh_host = os.environ.get('SSH_HOST', 'remote_host') ssh_user = os.environ.get('SSH_USER', 'asf') ssh_port = os.environ.get('SSH_PORT', '22') print(f"[DEBUG] Starting branch validation for: {branch_name}") print(f"[DEBUG] SSH Config - User: {ssh_user}, Host: {ssh_host}, Port: {ssh_port}") # SSH options for better connectivity ssh_options = f"-p {ssh_port} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" # First, clone the repository clone_cmd = f"sshpass -p '{ssh_password}' ssh {ssh_options} {ssh_user}@{ssh_host} './TPF/gitea_repo_controller.sh clone'" print(f"[DEBUG] Executing clone command: {clone_cmd.replace(ssh_password, '***')}") clone_result = subprocess.run(clone_cmd, shell=True, capture_output=True, text=True, timeout=60) print(f"[DEBUG] Clone result - Return code: {clone_result.returncode}") print(f"[DEBUG] Clone stdout: {clone_result.stdout}") print(f"[DEBUG] Clone stderr: {clone_result.stderr}") # Then, checkout the branch checkout_cmd = f"sshpass -p '{ssh_password}' ssh {ssh_options} {ssh_user}@{ssh_host} './TPF/gitea_repo_controller.sh checkout {branch_name}'" print(f"[DEBUG] Executing checkout command: {checkout_cmd.replace(ssh_password, '***')}") checkout_result = subprocess.run(checkout_cmd, shell=True, capture_output=True, text=True, timeout=60) print(f"[DEBUG] Checkout result - Return code: {checkout_result.returncode}") print(f"[DEBUG] Checkout stdout: {checkout_result.stdout}") print(f"[DEBUG] Checkout stderr: {checkout_result.stderr}") # Combine all output for analysis full_output = f"CLONE OUTPUT:\n{clone_result.stdout}\n{clone_result.stderr}\n\nCHECKOUT OUTPUT:\n{checkout_result.stdout}\n{checkout_result.stderr}" # Check if checkout was successful # Look for "fatal:" in the output which indicates failure checkout_failed = ( "fatal:" in checkout_result.stdout.lower() or "fatal:" in checkout_result.stderr.lower() or checkout_result.returncode != 0 ) print(f"[DEBUG] Checkout failed: {checkout_failed}") if checkout_failed: error_msg = f'Branch "{branch_name}" not found on remote. Please push the branch first.' # Try to extract more specific error from output if "fatal:" in checkout_result.stdout: fatal_line = [line for line in checkout_result.stdout.split('\n') if 'fatal:' in line.lower()] if fatal_line: error_msg += f" Error: {fatal_line[0].strip()}" elif "fatal:" in checkout_result.stderr: fatal_line = [line for line in checkout_result.stderr.split('\n') if 'fatal:' in line.lower()] if fatal_line: error_msg += f" Error: {fatal_line[0].strip()}" return jsonify({ 'success': False, 'error': error_msg, 'output': full_output, 'debug': { 'clone_returncode': clone_result.returncode, 'checkout_returncode': checkout_result.returncode, 'branch_name': branch_name } }) # If successful, run scenario scan to get available scenarios print(f"[DEBUG] Running scenario scan...") scan_cmd = f"sshpass -p '{ssh_password}' ssh {ssh_options} {ssh_user}@{ssh_host} 'python3 TPF/Sensor_hub_repo/Tools/TPF/scenario_scan.py'" print(f"[DEBUG] Executing scan command: {scan_cmd.replace(ssh_password, '***')}") scan_result = subprocess.run(scan_cmd, shell=True, capture_output=True, text=True, timeout=120) print(f"[DEBUG] Scan result - Return code: {scan_result.returncode}") print(f"[DEBUG] Scan stdout: {scan_result.stdout}") print(f"[DEBUG] Scan stderr: {scan_result.stderr}") # Parse the JSON response from scenario scan try: scan_data = json.loads(scan_result.stdout) organized_data = scan_data.get('organized_data', {}) scenario_map = scan_data.get('scenario_map', {}) print(f"[DEBUG] Found {len(scenario_map)} scenarios in {len(organized_data)} categories") except json.JSONDecodeError as e: print(f"[ERROR] Failed to parse scenario scan JSON: {e}") print(f"[ERROR] Raw output: {scan_result.stdout}") # Fallback to mock data if scan fails organized_data = { "drivers": { "diag_protocol_stack": ["diag_protocol_stack_init_test"] }, "application_layer": { "data_pool": ["data_pool_init_test"], "actuator_manager": ["actuator_manager_init_test2", "actuator_manager_init_test"], "event_system": ["event_system_init_test"] } } scenario_map = { "diag_protocol_stack_init_test": "drivers/diag_protocol_stack/test/diag_protocol_stack_init_test.test_scenario.xml", "data_pool_init_test": "application_layer/DP_stack/data_pool/test/data_pool_init_test.test_scenario.xml", "actuator_manager_init_test2": "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test2.test_scenario.xml", "actuator_manager_init_test": "application_layer/business_stack/actuator_manager/test/actuator_manager_init_test.test_scenario.xml", "event_system_init_test": "application_layer/business_stack/event_system/test/event_system_init_test.test_scenario.xml" } print(f"[DEBUG] Branch validation successful for: {branch_name}") return jsonify({ 'success': True, 'organized_data': organized_data, 'scenario_map': scenario_map, 'message': f'Branch "{branch_name}" validated successfully', 'output': full_output, 'debug': { 'clone_returncode': clone_result.returncode, 'checkout_returncode': checkout_result.returncode, 'scan_returncode': scan_result.returncode if 'scan_result' in locals() else 'N/A', 'branch_name': branch_name, 'scenarios_found': len(scenario_map) } }) except subprocess.TimeoutExpired: error_msg = 'Branch validation timed out. Please try again.' print(f"[ERROR] {error_msg}") return jsonify({ 'success': False, 'error': error_msg, 'output': 'Command timed out after 60 seconds' }) except Exception as e: error_msg = f'Error validating branch: {str(e)}' print(f"[ERROR] {error_msg}") return jsonify({ 'success': False, 'error': error_msg, 'output': f'Exception: {str(e)}' }) @jobs_bp.route('/submit/step2', methods=['POST']) @login_required def submit_step2(): branch_name = request.form.get('branch_name') scenario_map_json = request.form.get('scenario_map') selected_scenarios_json = request.form.get('selected_scenarios') try: scenario_map = json.loads(scenario_map_json) if scenario_map_json else {} selected_scenarios = json.loads(selected_scenarios_json) if selected_scenarios_json else [] except json.JSONDecodeError as e: print(f"[ERROR] JSON Decode Error in submit_step2: {e}") print(f"[DEBUG] scenario_map_json: {scenario_map_json}") print(f"[DEBUG] selected_scenarios_json: {selected_scenarios_json}") flash('Invalid scenario data', 'error') return redirect(url_for('jobs.submit')) if not selected_scenarios: flash('Please select at least one scenario', 'error') return redirect(url_for('jobs.submit')) return render_template('jobs/submit_review.html', branch_name=branch_name, scenarios=selected_scenarios, scenario_map=scenario_map) @jobs_bp.route('/submit/step2-validated', methods=['POST']) @login_required def submit_step2_validated(): try: branch_name = request.form.get('branch_name') organized_data_json = request.form.get('organized_data') scenario_map_json = request.form.get('scenario_map') print(f"[DEBUG] Step2 - Branch: {branch_name}") print(f"[DEBUG] Step2 - Organized data length: {len(organized_data_json) if organized_data_json else 0}") print(f"[DEBUG] Step2 - Scenario map length: {len(scenario_map_json) if scenario_map_json else 0}") if not branch_name: print("[ERROR] Step2 - No branch name provided") flash('Branch name is required', 'error') return redirect(url_for('jobs.submit')) try: organized_data = json.loads(organized_data_json) if organized_data_json else {} scenario_map = json.loads(scenario_map_json) if scenario_map_json else {} print(f"[DEBUG] Step2 - Parsed organized_data keys: {list(organized_data.keys())}") print(f"[DEBUG] Step2 - Parsed scenario_map count: {len(scenario_map)}") except json.JSONDecodeError as e: print(f"[ERROR] Step2 - JSON decode error: {e}") print(f"[DEBUG] organized_data_json: {organized_data_json}") print(f"[DEBUG] scenario_map_json: {scenario_map_json}") flash('Invalid scenario data', 'error') return redirect(url_for('jobs.submit')) print(f"[DEBUG] Step2 - Rendering template with {len(organized_data)} layers") return render_template('jobs/submit_step2.html', branch_name=branch_name, organized_data=organized_data, scenario_map=scenario_map) except Exception as e: print(f"[ERROR] Step2 - Unexpected error: {str(e)}") print(f"[ERROR] Step2 - Error type: {type(e).__name__}") import traceback print(f"[ERROR] Step2 - Traceback: {traceback.format_exc()}") flash(f'Error loading scenario selection: {str(e)}', 'error') return redirect(url_for('jobs.submit')) @jobs_bp.route('/submit/final', methods=['POST']) @login_required def submit_final(): branch_name = request.form.get('branch_name') scenarios_json = request.form.get('scenarios') scenario_map_json = request.form.get('scenario_map') environment = request.form.get('environment', 'staging') test_mode = request.form.get('test_mode', 'simulator') try: scenarios = json.loads(scenarios_json) if scenarios_json else [] scenario_map = json.loads(scenario_map_json) if scenario_map_json else {} except json.JSONDecodeError as e: print(f"[ERROR] JSON Decode Error in submit_final: {e}") flash('Invalid scenario data', 'error') return redirect(url_for('jobs.submit')) # Create Job record first to get the ID job = Job( user_id=current_user.id, branch_name=branch_name, scenarios=json.dumps(scenarios), environment=environment, test_mode=test_mode, status='waiting' ) db.session.add(job) db.session.commit() # Use Job ID as Remote Queue ID remote_queue_id = str(job.id) remote_task_ids = {s: f"{job.id}_{i+1}" for i, s in enumerate(scenarios)} # Update job with remote IDs job.remote_queue_id = remote_queue_id job.remote_task_ids = json.dumps(remote_task_ids) db.session.commit() # Prepare Remote Payload # Format: {"source": "branch", "345": ["staging", {"task1": "path"}]} payload = { "source": branch_name, remote_queue_id: [ environment, {remote_task_ids[s]: scenario_map.get(s, s) for s in scenarios} ] } # Trigger Remote Queue try: remote_url = "http://asf-server.duckdns.org:8080/api/queue" response = requests.post(remote_url, json=payload, timeout=10) response.raise_for_status() print(f"[DEBUG] Remote queue triggered: {response.json()}") except Exception as e: print(f"[ERROR] Failed to trigger remote queue: {e}") flash(f'Warning: Job saved but failed to trigger remote queue: {str(e)}', 'warning') flash('Test job submitted successfully', 'success') return redirect(url_for('dashboard.index')) @jobs_bp.route('/') @login_required def view_job(job_id): job = Job.query.get_or_404(job_id) if not current_user.is_admin and job.user_id != current_user.id: flash('Access denied', 'error') return redirect(url_for('dashboard.index')) return jsonify({ 'id': job.id, 'submitter': job.submitter.username, 'branch_name': job.branch_name, 'scenarios': job.scenarios, 'environment': job.environment, 'test_mode': job.test_mode, 'status': job.status, 'submitted_at': job.submitted_at.strftime('%Y-%m-%d %H:%M:%S'), 'completed_at': job.completed_at.strftime('%Y-%m-%d %H:%M:%S') if job.completed_at else None, 'duration': job.duration, 'remote_queue_id': job.remote_queue_id, 'remote_task_ids': job.remote_task_ids, 'remote_results': job.remote_results, 'queue_log': job.queue_log }) @jobs_bp.route('//abort', methods=['POST']) @login_required def abort_job(job_id): job = Job.query.get_or_404(job_id) if not current_user.is_admin and job.user_id != current_user.id: return jsonify({'error': 'Access denied'}), 403 try: remote_url = f"http://asf-server.duckdns.org:8080/api/abort/{job.remote_queue_id}" response = requests.post(remote_url, timeout=10) response.raise_for_status() job.status = 'aborted' db.session.commit() return jsonify({'success': True, 'message': 'Job aborted successfully'}) except Exception as e: print(f"[ERROR] Failed to abort remote job: {e}") return jsonify({'error': f'Failed to abort remote job: {str(e)}'}), 500 @jobs_bp.route('//delete', methods=['POST', 'DELETE']) @login_required def delete_job(job_id): job = Job.query.get_or_404(job_id) if not current_user.is_admin and job.user_id != current_user.id: return jsonify({'error': 'Access denied'}), 403 try: # 1. Try to delete from remote remote_url = f"http://asf-server.duckdns.org:8080/api/delete/{job.remote_queue_id}" try: response = requests.delete(remote_url, timeout=10) # We don't necessarily fail if remote delete fails (e.g. already gone) print(f"[DEBUG] Remote delete response: {response.status_code}") except Exception as re: print(f"[WARNING] Remote delete failed: {re}") # 2. Delete from local DB db.session.delete(job) db.session.commit() return jsonify({'success': True, 'message': 'Job deleted successfully'}) except Exception as e: print(f"[ERROR] Failed to delete job: {e}") return jsonify({'error': f'Failed to delete job: {str(e)}'}), 500 def update_job_status_internal(job): """Internal function to poll remote status and update local job record.""" if job.status in ['passed', 'failed', 'aborted']: return try: # 1. Check Queue Status status_url = f"http://asf-server.duckdns.org:8080/api/status/{job.remote_queue_id}" q_resp = requests.get(status_url, timeout=5) if q_resp.status_code == 200: q_data = q_resp.json() remote_status = q_data.get('status', '').lower() if remote_status == 'running': job.status = 'in_progress' elif remote_status == 'waiting': job.status = 'waiting' elif remote_status == 'aborted': job.status = 'aborted' # 2. Fetch Queue Logs if running or finished if remote_status in ['running', 'finished', 'aborted']: log_url = f"http://asf-server.duckdns.org:8080/results/{job.remote_queue_id}/queue_log.txt" l_resp = requests.get(log_url, timeout=5) if l_resp.status_code == 200: job.queue_log = l_resp.text # 3. Check Task Statuses and Results task_ids = json.loads(job.remote_task_ids) if job.remote_task_ids else {} results = json.loads(job.remote_results) if job.remote_results else {} all_finished = True any_failed = False if not task_ids: all_finished = False # Should not happen if job is valid for scenario, task_id in task_ids.items(): # If we don't have result yet, check it if scenario not in results: # Check task status t_status_url = f"http://asf-server.duckdns.org:8080/api/status/{task_id}" t_resp = requests.get(t_status_url, timeout=2) if t_resp.status_code == 200: t_data = t_resp.json() if t_data.get('status') == 'Finished': # Fetch results res_url = f"http://asf-server.duckdns.org:8080/results/{job.remote_queue_id}/{task_id}/final_summary.json" r_resp = requests.get(res_url, timeout=2) if r_resp.status_code == 200: r_data = r_resp.json() for key, val in r_data.items(): if key.upper() == scenario.upper(): # Transform file:/// link to web link if isinstance(val, list) and len(val) > 1 and val[1].startswith('file:///'): val[1] = val[1].replace('file:///home/asf/testarena/', 'http://asf-server.duckdns.org:8080/results/') results[scenario] = val if val[0] == 'FAIL': any_failed = True break else: all_finished = False elif t_data.get('status') == 'Aborted': results[scenario] = ['ABORTED', '#'] all_finished = True elif t_data.get('status') == 'Error': results[scenario] = ['ERROR', '#'] any_failed = True all_finished = True else: all_finished = False else: all_finished = False else: if results[scenario][0] in ['FAIL', 'ERROR']: any_failed = True if all_finished and task_ids: # If any task has ERROR, the job status could be 'error' or 'failed' # User said "mark it in the dash board as error" has_error = any(r[0] == 'ERROR' for r in results.values()) if has_error: job.status = 'error' else: job.status = 'failed' if any_failed else 'passed' from app import db job.completed_at = db.session.query(db.func.now()).scalar() job.remote_results = json.dumps(results) from app import db db.session.commit() except Exception as e: print(f"[ERROR] Status polling failed for job {job.id}: {e}") @jobs_bp.route('//status') @login_required def get_job_status(job_id): job = Job.query.get_or_404(job_id) update_job_status_internal(job) return jsonify({ 'status': job.status, 'status_icon': job.get_status_icon(), 'remote_results': job.remote_results, 'queue_log': job.queue_log })