Files
ASF_tools_legacy/asf-cloud-server/testarena_1/app/routes/jobs.py

288 lines
11 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
from flask_login import login_required, current_user
from app.models import Job
from app import db
import json
import subprocess
import os
# Blueprint that groups the job routes defined in this module; every route
# registered on it is served under the "/jobs" URL prefix.
jobs_bp = Blueprint('jobs', __name__, url_prefix='/jobs')
@jobs_bp.route('/submit')
@login_required
def submit():
    """Render the first page of the job submission flow for a logged-in user."""
    template_name = 'jobs/submit.html'
    return render_template(template_name)
@jobs_bp.route('/debug/ssh-test')
@login_required
def debug_ssh_test():
    """Debug endpoint to test SSH connectivity.

    Admin-only. Runs ``echo SSH_CONNECTION_SUCCESS`` on the host configured
    through the SSH_HOST / SSH_USER / SSH_PORT / SSH_PASSWORD environment
    variables and reports the outcome.

    Returns:
        A JSON response with the SSH settings used (the password itself is
        never included), the command that was executed, its return code,
        stdout and stderr, and a ``success`` flag. Non-admin users get a
        403 JSON error. Any exception (including a timeout) is reported as
        ``{'error': ..., 'success': False}``.
    """
    # Local import: shlex is only needed here to render the command for display.
    import shlex

    if not current_user.is_admin:
        return jsonify({'error': 'Admin access required'}), 403

    try:
        ssh_password = os.environ.get('SSH_PASSWORD', 'default_password')
        ssh_host = os.environ.get('SSH_HOST', 'remote_host')
        ssh_user = os.environ.get('SSH_USER', 'asf')
        ssh_port = os.environ.get('SSH_PORT', '22')

        # Build an argument vector instead of a shell string so that no value
        # is ever re-parsed by a local shell. "sshpass -e" reads the password
        # from the SSHPASS environment variable, which keeps it off the
        # command line (and therefore out of the process list and of the
        # "command_executed" field below).
        argv = [
            'sshpass', '-e',
            'ssh',
            '-p', ssh_port,
            '-o', 'StrictHostKeyChecking=no',
            '-o', 'UserKnownHostsFile=/dev/null',
            f'{ssh_user}@{ssh_host}',
            'echo SSH_CONNECTION_SUCCESS',
        ]
        result = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=30,
            env={**os.environ, 'SSHPASS': ssh_password},
        )

        return jsonify({
            'ssh_config': {
                'user': ssh_user,
                'host': ssh_host,
                'port': ssh_port,
                'password_set': bool(ssh_password and ssh_password != 'default_password')
            },
            'command_executed': ' '.join(shlex.quote(part) for part in argv),
            'test_result': {
                'returncode': result.returncode,
                'stdout': result.stdout,
                'stderr': result.stderr
            },
            'success': result.returncode == 0 and 'SSH_CONNECTION_SUCCESS' in result.stdout
        })
    except Exception as e:
        # Top-level boundary of a diagnostic endpoint: report the failure to
        # the caller instead of raising.
        return jsonify({
            'error': str(e),
            'success': False
        })
def _run_repo_controller(*script_args, timeout=60):
    """Run ``./TPF/gitea_repo_controller.sh`` on the configured SSH host.

    Args:
        *script_args: Arguments passed to the remote script. Each one is
            shell-quoted before being sent, because ssh hands the command
            string to a shell on the remote side.
        timeout: Seconds to wait before ``subprocess.TimeoutExpired`` is raised.

    Returns:
        The ``subprocess.CompletedProcess`` with text stdout/stderr captured.
    """
    # Local import: shlex is only needed by this helper.
    import shlex

    ssh_password = os.environ.get('SSH_PASSWORD', 'default_password')
    ssh_host = os.environ.get('SSH_HOST', 'remote_host')
    ssh_user = os.environ.get('SSH_USER', 'asf')
    ssh_port = os.environ.get('SSH_PORT', '22')

    remote_command = ' '.join(
        ['./TPF/gitea_repo_controller.sh'] + [shlex.quote(arg) for arg in script_args]
    )
    # Argument vector + shell=False: nothing is interpreted by a local shell.
    # "sshpass -e" takes the password from the SSHPASS environment variable,
    # so it never appears on the command line or in the debug output.
    argv = [
        'sshpass', '-e',
        'ssh',
        '-p', ssh_port,
        '-o', 'StrictHostKeyChecking=no',
        '-o', 'UserKnownHostsFile=/dev/null',
        f'{ssh_user}@{ssh_host}',
        remote_command,
    ]
    print(f"[DEBUG] Executing: {' '.join(shlex.quote(part) for part in argv)}")
    return subprocess.run(
        argv,
        capture_output=True,
        text=True,
        timeout=timeout,
        env={**os.environ, 'SSHPASS': ssh_password},
    )


@jobs_bp.route('/submit/step1', methods=['POST'])
@login_required
def submit_step1():
    """Validate that the submitted branch can be checked out on the remote host.

    Reads ``branch_name`` from the POSTed form, runs the remote repository
    controller script with ``clone`` and then ``checkout <branch_name>``, and
    treats the checkout as failed when it exits non-zero or prints a line
    containing "fatal:" (case-insensitive).

    Returns:
        A JSON response. On success: ``success``, a (currently mocked) list of
        ``scenarios``, a ``message``, the combined command ``output`` and a
        ``debug`` dict. On failure: ``success`` False, an ``error`` message and
        the ``output`` (plus ``debug`` when the checkout itself failed).
    """
    branch_name = (request.form.get('branch_name') or '').strip()
    if not branch_name:
        return jsonify({
            'success': False,
            'error': 'Branch name is required',
            'output': ''
        })
    if branch_name.startswith('-'):
        # A leading dash would be read as a command-line option by the remote
        # script / git, and git does not allow such branch names anyway.
        return jsonify({
            'success': False,
            'error': f'Invalid branch name "{branch_name}": branch names cannot start with "-".',
            'output': ''
        })

    # Validate branch exists on remote
    try:
        print(f"[DEBUG] Starting branch validation for: {branch_name}")

        # First, clone the repository. Its exit status is reported in the
        # response but, as before, does not decide success on its own.
        clone_result = _run_repo_controller('clone')
        print(f"[DEBUG] Clone result - Return code: {clone_result.returncode}")
        print(f"[DEBUG] Clone stdout: {clone_result.stdout}")
        print(f"[DEBUG] Clone stderr: {clone_result.stderr}")

        # Then, checkout the branch
        checkout_result = _run_repo_controller('checkout', branch_name)
        print(f"[DEBUG] Checkout result - Return code: {checkout_result.returncode}")
        print(f"[DEBUG] Checkout stdout: {checkout_result.stdout}")
        print(f"[DEBUG] Checkout stderr: {checkout_result.stderr}")

        # Combine all output for analysis
        full_output = f"CLONE OUTPUT:\n{clone_result.stdout}\n{clone_result.stderr}\n\nCHECKOUT OUTPUT:\n{checkout_result.stdout}\n{checkout_result.stderr}"
        debug_info = {
            'clone_returncode': clone_result.returncode,
            'checkout_returncode': checkout_result.returncode,
            'branch_name': branch_name
        }

        # Collect "fatal:" lines once, case-insensitively, stdout before
        # stderr, so detection and the reported detail always agree.
        checkout_lines = checkout_result.stdout.splitlines() + checkout_result.stderr.splitlines()
        fatal_lines = [line.strip() for line in checkout_lines if 'fatal:' in line.lower()]
        checkout_failed = bool(fatal_lines) or checkout_result.returncode != 0
        print(f"[DEBUG] Checkout failed: {checkout_failed}")

        if checkout_failed:
            error_msg = f'Branch "{branch_name}" not found on remote. Please push the branch first.'
            if fatal_lines:
                error_msg += f" Error: {fatal_lines[0]}"
            return jsonify({
                'success': False,
                'error': error_msg,
                'output': full_output,
                'debug': debug_info
            })

        # If successful, get available scenarios (mock for now)
        scenarios = [
            'Scenario_1_Basic_Test',
            'Scenario_2_Advanced_Test',
            'Scenario_3_Integration_Test',
            'Scenario_4_Performance_Test',
            'Scenario_5_Security_Test'
        ]
        print(f"[DEBUG] Branch validation successful for: {branch_name}")
        return jsonify({
            'success': True,
            'scenarios': scenarios,
            'message': f'Branch "{branch_name}" validated successfully',
            'output': full_output,
            'debug': debug_info
        })
    except subprocess.TimeoutExpired:
        error_msg = 'Branch validation timed out. Please try again.'
        print(f"[ERROR] {error_msg}")
        return jsonify({
            'success': False,
            'error': error_msg,
            'output': 'Command timed out after 60 seconds'
        })
    except Exception as e:
        # Route boundary: report unexpected failures as JSON instead of a 500.
        error_msg = f'Error validating branch: {str(e)}'
        print(f"[ERROR] {error_msg}")
        return jsonify({
            'success': False,
            'error': error_msg,
            'output': f'Exception: {str(e)}'
        })
@jobs_bp.route('/submit/step2-validated', methods=['POST'])
@login_required
def submit_step2_validated():
    """Render the scenario selection page for an already validated branch.

    Reads ``branch_name`` and a JSON-encoded ``scenarios`` value from the
    POSTed form. If ``scenarios`` is missing or is not valid JSON, an error is
    flashed and the user is redirected back to the first submission page.

    Returns:
        The rendered ``jobs/submit_step2.html`` template, or a redirect to
        ``jobs.submit`` when the scenarios data cannot be decoded.
    """
    branch_name = request.form.get('branch_name')
    scenarios_json = request.form.get('scenarios')
    try:
        scenarios = json.loads(scenarios_json)
    except (TypeError, ValueError):
        # TypeError: field missing (None); ValueError: malformed JSON
        # (json.JSONDecodeError is a ValueError subclass). Anything else is a
        # genuine bug and is allowed to propagate instead of being swallowed.
        flash('Invalid scenarios data', 'error')
        return redirect(url_for('jobs.submit'))
    return render_template('jobs/submit_step2.html', branch_name=branch_name, scenarios=scenarios)
# NOTE(review): this view previously had no @jobs_bp.route decorator, so it
# was never registered and could not be reached. The path below follows the
# naming of the sibling step routes — confirm it against the form action used
# in jobs/submit_step2.html.
@jobs_bp.route('/submit/step2', methods=['POST'])
@login_required
def submit_step2():
    """Accept the scenario selection and render the next submission step.

    Reads ``branch_name`` and the list of selected ``scenarios`` from the
    POSTed form. When no scenario was selected, an error is flashed and the
    user is redirected back to the first submission page.

    Returns:
        The rendered ``jobs/submit_step3.html`` template, or a redirect to
        ``jobs.submit`` when the selection is empty.
    """
    branch_name = request.form.get('branch_name')
    selected_scenarios = request.form.getlist('scenarios')
    if not selected_scenarios:
        flash('Please select at least one scenario', 'error')
        return redirect(url_for('jobs.submit'))
    return render_template('jobs/submit_step3.html',
                           branch_name=branch_name,
                           scenarios=selected_scenarios)
@jobs_bp.route('/submit/step3', methods=['POST'])
@login_required
def submit_step3():
    """Carry the values chosen so far into the step-4 page.

    The ``branch_name``, ``scenarios`` and ``environment`` form fields are
    read from the POST body and handed unchanged to
    ``jobs/submit_step4.html``; a missing field is passed as ``None``.
    """
    carried_fields = ('branch_name', 'scenarios', 'environment')
    context = {field: request.form.get(field) for field in carried_fields}
    return render_template('jobs/submit_step4.html', **context)
@jobs_bp.route('/submit/final', methods=['POST'])
@login_required
def submit_final():
    """Persist the submitted job and send the user to the dashboard.

    A ``Job`` row owned by the current user is created from the POSTed form
    fields with status ``'in_progress'``, committed, and a success message is
    flashed before redirecting to ``dashboard.index``. The two checkbox
    fields are stored as booleans that are True only when the form value is
    exactly ``'on'``.
    """
    form = request.form

    def checkbox_checked(field_name):
        # True only when the form value for the field is exactly 'on'.
        return form.get(field_name) == 'on'

    new_job = Job(
        user_id=current_user.id,
        branch_name=form.get('branch_name'),
        scenarios=form.get('scenarios'),
        environment=form.get('environment'),
        test_mode=form.get('test_mode'),
        keep_devbenches=checkbox_checked('keep_devbenches'),
        reuse_results=checkbox_checked('reuse_results'),
        status='in_progress',
    )
    db.session.add(new_job)
    db.session.commit()

    # TODO: Start test execution in background
    flash('Test job submitted successfully', 'success')
    return redirect(url_for('dashboard.index'))
@jobs_bp.route('/<int:job_id>')
@login_required
def view_job(job_id):
    """Return the details of one job as JSON.

    Responds with 404 when the job does not exist. Only admins and the user
    who owns the job may view it; anyone else gets an "Access denied" flash
    message and is redirected to ``dashboard.index``.
    """
    job = Job.query.get_or_404(job_id)

    allowed = current_user.is_admin or job.user_id == current_user.id
    if not allowed:
        flash('Access denied', 'error')
        return redirect(url_for('dashboard.index'))

    timestamp_format = '%Y-%m-%d %H:%M:%S'
    if job.completed_at:
        completed_at = job.completed_at.strftime(timestamp_format)
    else:
        # The job has no completion timestamp yet.
        completed_at = None

    payload = {
        'id': job.id,
        'submitter': job.submitter.username,
        'branch_name': job.branch_name,
        'scenarios': job.scenarios,
        'environment': job.environment,
        'test_mode': job.test_mode,
        'status': job.status,
        'submitted_at': job.submitted_at.strftime(timestamp_format),
        'completed_at': completed_at,
        'duration': job.duration,
        'keep_devbenches': job.keep_devbenches,
        'reuse_results': job.reuse_results,
        'results_path': job.results_path,
    }
    return jsonify(payload)
@jobs_bp.route('/<int:job_id>/abort', methods=['POST'])
@login_required
def abort_job(job_id):
    """Mark an in-progress job as aborted.

    Responds with 404 when the job does not exist, 403 when the caller is
    neither an admin nor the job's owner, and 400 when the job is not
    currently ``'in_progress'``. Otherwise the status is set to ``'aborted'``,
    committed, and ``{'success': True}`` is returned.
    """
    job = Job.query.get_or_404(job_id)

    # Guard clauses: reject unauthorized callers, then jobs that are not running.
    if not (current_user.is_admin or job.user_id == current_user.id):
        return jsonify({'error': 'Access denied'}), 403
    if job.status != 'in_progress':
        return jsonify({'error': 'Job is not in progress'}), 400

    job.status = 'aborted'
    db.session.commit()
    # TODO: Kill the running process
    return jsonify({'success': True})