Files
ASF_tools_legacy/asf-cloud-server/testarena_1/app/routes/jobs.py

183 lines
6.4 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
from flask_login import login_required, current_user
from app.models import Job
from app import db
import json
import subprocess
import os
# Blueprint that groups the job submission / inspection views below; every
# route registered on it is served under the '/jobs' URL prefix.
jobs_bp = Blueprint(
    'jobs',
    __name__,
    url_prefix='/jobs',
)
@jobs_bp.route('/submit')
@login_required
def submit():
    """Render the job submission entry page (``jobs/submit.html``).

    Only reachable by an authenticated user because of ``login_required``.
    """
    template_name = 'jobs/submit.html'
    return render_template(template_name)
def _repo_controller_argv(ssh_user, ssh_host, action, *action_args):
    """Build the argv for running ``gitea_repo_controller.sh`` over SSH.

    The command is returned as a list so it can be executed without a local
    shell.  ``ssh`` hands its final argument to the remote shell, so every
    element of *action_args* is escaped with ``shlex.quote`` to keep
    user-supplied text from being interpreted there.

    ``sshpass -e`` takes the password from the ``SSHPASS`` environment
    variable; the caller must place it in the subprocess environment.

    Args:
        ssh_user: Remote account name.
        ssh_host: Remote host name or address.
        action: Sub-command of the controller script (e.g. ``'clone'``).
        *action_args: Extra arguments for the sub-command (quoted here).

    Returns:
        A list of strings suitable for ``subprocess.run(..., shell=False)``.
    """
    import shlex  # function-scope import keeps this block self-contained

    remote_command = ' '.join(
        ['./TPF/gitea_repo_controller.sh', action]
        + [shlex.quote(arg) for arg in action_args]
    )
    return [
        'sshpass', '-e',
        # NOTE(review): host key checking is disabled, as in the original
        # command; consider pinning the host key instead.
        'ssh', '-o', 'StrictHostKeyChecking=no',
        f'{ssh_user}@{ssh_host}',
        remote_command,
    ]


@jobs_bp.route('/submit/step1', methods=['POST'])
@login_required
def submit_step1():
    """Validate that the submitted branch can be checked out on the remote.

    Reads ``branch_name`` from the POSTed form, runs the remote controller
    script's ``clone`` and then ``checkout <branch>`` over SSH, and reports
    the outcome.

    Connection settings come from the ``SSH_PASSWORD``, ``SSH_HOST`` and
    ``SSH_USER`` environment variables (with the fallbacks shown below).

    Returns:
        * A redirect to ``jobs.submit`` (with a flashed error) when no branch
          name was posted.
        * Otherwise a JSON response with ``success``; on success it carries
          ``scenarios`` and ``message``, on failure ``error`` and ``output``.
    """
    branch_name = request.form.get('branch_name')
    if not branch_name:
        flash('Branch name is required', 'error')
        return redirect(url_for('jobs.submit'))

    # A leading '-' would be parsed as an option by the remote tooling rather
    # than as a branch name, so treat such input as a branch that is absent.
    if branch_name.startswith('-'):
        return jsonify({
            'success': False,
            'error': f'Branch "{branch_name}" not found on remote. Please push the branch first.',
            'output': ''
        })

    # Validate branch exists on remote
    try:
        # Get SSH settings from environment variables
        ssh_password = os.environ.get('SSH_PASSWORD', 'default_password')
        ssh_host = os.environ.get('SSH_HOST', 'remote_host')
        ssh_user = os.environ.get('SSH_USER', 'asf')

        # Hand the password to sshpass through the environment instead of
        # the command line, where it would show up in process listings.
        ssh_env = dict(os.environ, SSHPASS=ssh_password)

        # First, clone the repository.  The result is deliberately not
        # inspected (as before): only the checkout decides success.
        subprocess.run(
            _repo_controller_argv(ssh_user, ssh_host, 'clone'),
            env=ssh_env,
            capture_output=True,
            text=True,
            timeout=60,
        )

        # Then, checkout the branch
        checkout_result = subprocess.run(
            _repo_controller_argv(ssh_user, ssh_host, 'checkout', branch_name),
            env=ssh_env,
            capture_output=True,
            text=True,
            timeout=60,
        )

        # Check if checkout was successful (no "fatal:" in output)
        if (
            "fatal:" in checkout_result.stdout
            or "fatal:" in checkout_result.stderr
            or checkout_result.returncode != 0
        ):
            return jsonify({
                'success': False,
                'error': f'Branch "{branch_name}" not found on remote. Please push the branch first.',
                'output': checkout_result.stdout + checkout_result.stderr
            })

        # If successful, get available scenarios (mock for now)
        scenarios = [
            'Scenario_1_Basic_Test',
            'Scenario_2_Advanced_Test',
            'Scenario_3_Integration_Test',
            'Scenario_4_Performance_Test',
            'Scenario_5_Security_Test'
        ]
        return jsonify({
            'success': True,
            'scenarios': scenarios,
            'message': f'Branch "{branch_name}" validated successfully'
        })
    except subprocess.TimeoutExpired:
        return jsonify({
            'success': False,
            'error': 'Branch validation timed out. Please try again.',
            'output': ''
        })
    except Exception as e:
        # Request boundary: report any other failure (e.g. sshpass missing)
        # to the client instead of raising a server error.
        return jsonify({
            'success': False,
            'error': f'Error validating branch: {str(e)}',
            'output': ''
        })
@jobs_bp.route('/submit/step2-validated', methods=['POST'])
@login_required
def submit_step2_validated():
    """Render the scenario selection page for an already validated branch.

    Reads ``branch_name`` and a JSON-encoded ``scenarios`` value from the
    POSTed form.  When ``scenarios`` is missing or is not valid JSON, an
    error is flashed and the user is redirected to ``jobs.submit``.

    Returns:
        The rendered ``jobs/submit_step2.html`` template, or a redirect to
        ``jobs.submit`` on invalid input.
    """
    branch_name = request.form.get('branch_name')
    scenarios_json = request.form.get('scenarios')
    try:
        scenarios = json.loads(scenarios_json)
    except (TypeError, ValueError):
        # TypeError: the field was absent (None); ValueError covers
        # json.JSONDecodeError for malformed text.  Anything else (e.g.
        # KeyboardInterrupt) is no longer swallowed.
        flash('Invalid scenarios data', 'error')
        return redirect(url_for('jobs.submit'))
    return render_template('jobs/submit_step2.html', branch_name=branch_name, scenarios=scenarios)
# NOTE(review): this view previously had no route decorator, so it was never
# registered on the blueprint and could not be reached.  The path below
# follows the naming of the sibling '/submit/stepN' routes -- confirm it
# against the form action used by jobs/submit_step2.html.
@jobs_bp.route('/submit/step2', methods=['POST'])
@login_required
def submit_step2():
    """Take the selected scenarios and render the next submission step.

    Reads ``branch_name`` and the list of ``scenarios`` values from the
    POSTed form.  When no scenario was selected, an error is flashed and the
    user is redirected to ``jobs.submit``.

    Returns:
        The rendered ``jobs/submit_step3.html`` template, or a redirect to
        ``jobs.submit`` when the selection is empty.
    """
    branch_name = request.form.get('branch_name')
    selected_scenarios = request.form.getlist('scenarios')
    if not selected_scenarios:
        flash('Please select at least one scenario', 'error')
        return redirect(url_for('jobs.submit'))
    return render_template(
        'jobs/submit_step3.html',
        branch_name=branch_name,
        scenarios=selected_scenarios,
    )
@jobs_bp.route('/submit/step3', methods=['POST'])
@login_required
def submit_step3():
    """Forward the choices made so far to the ``submit_step4`` template.

    The ``branch_name``, ``scenarios`` and ``environment`` form fields are
    passed to the template unchanged (``None`` for any field not posted).
    """
    carried_fields = ('branch_name', 'scenarios', 'environment')
    template_context = {
        field: request.form.get(field) for field in carried_fields
    }
    return render_template('jobs/submit_step4.html', **template_context)
@jobs_bp.route('/submit/final', methods=['POST'])
@login_required
def submit_final():
    """Persist a new ``Job`` built from the submitted form.

    The job is stored for the current user with status ``'in_progress'``,
    a success message is flashed, and the user is redirected to
    ``dashboard.index``.  The two checkbox fields count as enabled only when
    their posted value is exactly ``'on'``.
    """
    form = request.form

    def checkbox_enabled(field_name):
        # True only when the field was posted with the value 'on'.
        return form.get(field_name) == 'on'

    job_fields = {
        'branch_name': form.get('branch_name'),
        'scenarios': form.get('scenarios'),
        'environment': form.get('environment'),
        'test_mode': form.get('test_mode'),
        'keep_devbenches': checkbox_enabled('keep_devbenches'),
        'reuse_results': checkbox_enabled('reuse_results'),
    }

    new_job = Job(user_id=current_user.id, status='in_progress', **job_fields)
    db.session.add(new_job)
    db.session.commit()

    # TODO: Start test execution in background
    flash('Test job submitted successfully', 'success')
    return redirect(url_for('dashboard.index'))
@jobs_bp.route('/<int:job_id>')
@login_required
def view_job(job_id):
    """Return the details of one job as JSON.

    The job is looked up with ``get_or_404``.  A user who is neither an
    admin nor the job's owner gets a flashed 'Access denied' message and a
    redirect to ``dashboard.index`` instead of the JSON payload.

    Args:
        job_id: Integer id taken from the URL.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    job = Job.query.get_or_404(job_id)

    may_view = current_user.is_admin or job.user_id == current_user.id
    if not may_view:
        flash('Access denied', 'error')
        return redirect(url_for('dashboard.index'))

    # completed_at may be unset, so it is only formatted when present.
    if job.completed_at:
        completed_text = job.completed_at.strftime(timestamp_format)
    else:
        completed_text = None

    payload = {
        'id': job.id,
        'submitter': job.submitter.username,
        'branch_name': job.branch_name,
        'scenarios': job.scenarios,
        'environment': job.environment,
        'test_mode': job.test_mode,
        'status': job.status,
        'submitted_at': job.submitted_at.strftime(timestamp_format),
        'completed_at': completed_text,
        'duration': job.duration,
        'keep_devbenches': job.keep_devbenches,
        'reuse_results': job.reuse_results,
        'results_path': job.results_path,
    }
    return jsonify(payload)
@jobs_bp.route('/<int:job_id>/abort', methods=['POST'])
@login_required
def abort_job(job_id):
    """Mark an in-progress job as aborted.

    Args:
        job_id: Integer id taken from the URL.

    Returns:
        ``{'success': True}`` when the status was changed; a 403 JSON error
        when the user is neither an admin nor the job's owner; a 400 JSON
        error when the job is not currently ``'in_progress'``.
    """
    job = Job.query.get_or_404(job_id)

    is_owner = job.user_id == current_user.id
    if not (current_user.is_admin or is_owner):
        return jsonify({'error': 'Access denied'}), 403

    if job.status != 'in_progress':
        return jsonify({'error': 'Job is not in progress'}), 400

    job.status = 'aborted'
    db.session.commit()
    # TODO: Kill the running process
    return jsonify({'success': True})