"""Flask web application for an AzerothCore promotion/CDK redeem site.

The application lets registered users generate an advertisement text for
one of their game characters, verifies that the text was posted at a given
URL and hands out a redeem code (CDK) once per character.  Administrators
can additionally manage game accounts, inspect characters and configure the
connections to the game's Auth and Characters databases.
"""

import hashlib
import hmac
import os
import random
import re
import string
from functools import wraps
from urllib.parse import quote_plus, urlparse

import pymysql
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from flask import (
    Flask,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

from config import AD_TEMPLATE, NAME_TYPES, SERVER_INFO, VERIFICATION_CONFIG

# Use PyMySQL as the MySQL driver.
pymysql.install_as_MySQLdb()
load_dotenv()

app = Flask(__name__)

# Flask secret key.
# NOTE(review): the fallback value is only suitable for development; set
# SECRET_KEY in the environment for any real deployment.
app.secret_key = os.getenv('SECRET_KEY', 'dev_key_123')

# Application database configuration.
database_url = os.getenv('DATABASE_URL')
app.config['SQLALCHEMY_DATABASE_URI'] = database_url
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Seconds to wait for a user-supplied post URL before giving up.
REQUEST_TIMEOUT = 10

# Session keys written by admin_login and removed on logout.
SESSION_KEYS = ('admin_id', 'admin_username', 'admin_role')

# Table names come from admin-editable configuration and are interpolated
# into SQL, so only plain (optionally schema-qualified) identifiers pass.
TABLE_NAME_RE = re.compile(r'[A-Za-z0-9_$]+(\.[A-Za-z0-9_$]+)?')

# Column lists used when reading the game's characters table.
CHAR_BASIC_COLUMNS = "guid, account, name, level, race, class"
CHAR_DETAIL_COLUMNS = (
    "guid, account, name, level, race, class, gender, "
    "CASE WHEN online > 0 THEN '在线' ELSE '离线' END as online_status, "
    "money/10000 as gold, totaltime/3600 as hours_played, map"
)
ACCOUNT_LIST_COLUMNS = (
    "id, username, email, last_login, expansion, "
    "CASE WHEN online > 0 THEN '在线' ELSE '离线' END as status"
)

# Keys persisted by the /admin/db_config form.
DB_CONFIG_KEYS = (
    'auth_host', 'auth_port', 'auth_user', 'auth_pass', 'auth_db',
    'auth_account_table',
    'char_host', 'char_port', 'char_user', 'char_pass', 'char_db',
    'char_table',
)

# SRP6 group parameters used by generate_srp6_verifier.
SRP6_N = int(
    '894B645E89E1535BBDAD5B8B290650530801B18EBFBF5E8FAB3C82872A3E9BB7', 16
)
SRP6_G = 7


def create_database():
    """Create the application database named in DATABASE_URL if missing.

    Failures are reported on stdout and never raised, so start-up continues
    even when the database server is unreachable.
    """
    if not database_url:
        print("创建数据库失败:未设置 DATABASE_URL")
        return
    try:
        # Split the URL into "server" part and database name; any "?query"
        # suffix is not part of the database name.
        url_parts = database_url.split('/')
        db_name = url_parts[-1].split('?', 1)[0]
        base_url = '/'.join(url_parts[:-1])

        # Connect without selecting a database and create it.
        engine = create_engine(base_url, poolclass=NullPool)
        quoted_name = '`' + db_name.replace('`', '``') + '`'
        with engine.connect() as conn:
            conn.execute(f"CREATE DATABASE IF NOT EXISTS {quoted_name}")
        print(f"数据库 {db_name} 创建成功")
    except Exception as e:
        print(f"创建数据库失败:{str(e)}")


db = SQLAlchemy(app)


class CDKCode(db.Model):
    """A redeem code that can be handed out to a character."""

    __tablename__ = '兑换码'
    ID = db.Column(db.Integer, primary_key=True)
    兑换码 = db.Column(db.String(20), unique=True, nullable=False)
    奖励模板ID = db.Column(db.Integer, nullable=False)


class UsedCDK(db.Model):
    """Record of a redeem code that has been given to a character."""

    __tablename__ = 'used_cdks'
    id = db.Column(db.Integer, primary_key=True)
    character_name = db.Column(db.String(50), nullable=False)
    post_url = db.Column(db.String(255), nullable=False)
    cdk = db.Column(db.String(20), nullable=False)
    used_at = db.Column(db.DateTime, server_default=db.func.now())


class Admin(db.Model):
    """A site login; `role` is either 'admin' or 'user'."""

    __tablename__ = 'admins'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(50), unique=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)
    role = db.Column(db.String(20), default='user')  # 'admin' or 'user'
    created_at = db.Column(db.DateTime, server_default=db.func.now())

    def set_password(self, password):
        """Store the SHA-256 hex digest of *password*.

        NOTE(review): the digest is unsalted; moving to a salted KDF needs a
        migration of the stored hashes and is therefore not done here.
        """
        self.password_hash = hashlib.sha256(password.encode()).hexdigest()

    def check_password(self, password):
        """Return True when *password* matches the stored hash."""
        if not password or not self.password_hash:
            return False
        candidate = hashlib.sha256(password.encode()).hexdigest()
        # Constant-time comparison of the two hex digests.
        return hmac.compare_digest(self.password_hash, candidate)

    def is_admin(self):
        """Return True when this login has the 'admin' role."""
        return self.role == 'admin'


class DatabaseConfig(db.Model):
    """A single key/value setting for the game database connections."""

    __tablename__ = 'database_configs'
    id = db.Column(db.Integer, primary_key=True)
    config_name = db.Column(db.String(50), nullable=False)
    config_value = db.Column(db.String(255), nullable=False)
    updated_at = db.Column(
        db.DateTime, server_default=db.func.now(), onupdate=db.func.now()
    )


class GameDataUnavailable(Exception):
    """Raised when the game databases cannot answer an ownership lookup."""


def clear_login_session():
    """Remove every login-related key from the session."""
    for key in SESSION_KEYS:
        session.pop(key, None)


def admin_required(f):
    """Decorator: allow the view only for logged-in users with admin role."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'admin_id' not in session:
            flash('请先登录', 'danger')
            return redirect(url_for('admin_login'))

        admin = Admin.query.get(session['admin_id'])
        if admin is None:
            # The account was deleted while the session was still alive.
            clear_login_session()
            flash('请先登录', 'danger')
            return redirect(url_for('admin_login'))
        if not admin.is_admin():
            flash('您没有权限访问此页面', 'danger')
            return redirect(url_for('user_dashboard'))
        return f(*args, **kwargs)
    return decorated_function


def login_required(f):
    """Decorator: allow the view for any logged-in user (admin or not)."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'admin_id' not in session:
            flash('请先登录', 'danger')
            return redirect(url_for('admin_login'))
        if Admin.query.get(session['admin_id']) is None:
            # Stale session: the views below rely on the user row existing.
            clear_login_session()
            flash('请先登录', 'danger')
            return redirect(url_for('admin_login'))
        return f(*args, **kwargs)
    return decorated_function


def verify_ad_content(content, character_name):
    """Return True when *content* contains the advertisement for a character.

    The exact template (whitespace ignored) is tried first.  If that fails,
    a looser check requires every configured keyword, every configured
    regex pattern and the character name to appear in the content.
    """
    # Collapse all runs of whitespace in the scraped content.
    content = ' '.join(content.split())

    # Build the advertisement text we expect to find.
    expected_ad = AD_TEMPLATE.format(
        server_name=SERVER_INFO["server_name"],
        name_type=NAME_TYPES["character"],
        name=character_name,
        server_features=SERVER_INFO["server_features"],
        website=SERVER_INFO["website"]
    )
    expected_ad = ' '.join(expected_ad.split())

    # Strict check: compare with all whitespace removed.
    core_content = re.sub(r'\s+', '', content)
    core_expected = re.sub(r'\s+', '', expected_ad)
    if core_expected in core_content:
        return True

    # Loose check: keywords, patterns and the character name must appear.
    for keyword in VERIFICATION_CONFIG['required_keywords']:
        if keyword not in content:
            return False
    for pattern in VERIFICATION_CONFIG['required_patterns']:
        if not re.search(pattern, content, re.DOTALL):
            return False
    if character_name not in content:
        return False
    return True


def fetch_post_soup(post_url):
    """Download *post_url* and return it parsed as BeautifulSoup.

    Raises:
        ValueError: if the URL is not http/https.
        requests.RequestException: on network errors or timeout.
    """
    # The URL is user input: refuse non-web schemes and never wait forever.
    # NOTE(review): this does not stop requests to internal hosts.
    scheme = urlparse(post_url).scheme.lower()
    if scheme not in ('http', 'https'):
        raise ValueError('仅支持 http/https 链接')
    response = requests.get(post_url, timeout=REQUEST_TIMEOUT)
    return BeautifulSoup(response.text, 'html.parser')


def page_contains_ad(soup, character_name):
    """Return True if the page, or one of its larger blocks, has the ad."""
    post_content = soup.get_text()

    # Debug output.
    print(f"获取到的内容长度: {len(post_content)}")
    print(f"内容预览: {post_content[:200]}...")

    if verify_ad_content(post_content, character_name):
        return True

    # Fall back to checking individual content blocks.
    for block in soup.find_all(['div', 'p', 'article', 'section']):
        block_text = block.get_text()
        if len(block_text) > 100:  # only reasonably long blocks
            print(f"尝试验证区块: {block_text[:100]}...")
            if verify_ad_content(block_text, character_name):
                return True
    return False


def claim_unused_cdk(character_name, post_url):
    """Assign an unused CDK to *character_name* and return the code.

    Returns None when no unused code is left.  The usage record is committed
    before returning; a failed commit is rolled back and re-raised.

    NOTE(review): two concurrent requests can still pick the same code,
    because nothing in the schema makes `used_cdks.cdk` unique.
    """
    cdk = CDKCode.query.filter(
        ~CDKCode.兑换码.in_(db.session.query(UsedCDK.cdk))
    ).first()
    if not cdk:
        return None

    code = cdk.兑换码
    db.session.add(
        UsedCDK(character_name=character_name, post_url=post_url, cdk=code)
    )
    try:
        db.session.commit()
    except Exception:
        db.session.rollback()
        raise
    return code


def get_db_config(config_name, default=None):
    """Return the stored value for *config_name*, or *default* if unset."""
    config = DatabaseConfig.query.filter_by(config_name=config_name).first()
    if config:
        return config.config_value
    return default


def get_table_name(config_name, default):
    """Return a configured table name that is safe to interpolate into SQL.

    An empty stored value falls back to *default*.

    Raises:
        ValueError: if the configured value is not a plain identifier.
    """
    name = get_db_config(config_name, default) or default
    if not TABLE_NAME_RE.fullmatch(name):
        raise ValueError(f'非法的表名配置 {config_name}: {name!r}')
    return name


def build_mysql_url(user, password, host, port, db_name):
    """Build a mysql+pymysql URL, escaping user name and password."""
    return (
        f"mysql+pymysql://{quote_plus(str(user))}:{quote_plus(str(password))}"
        f"@{host}:{port}/{db_name}"
    )


def open_game_db_connection(prefix, default_db, label):
    """Open a connection using the `<prefix>_*` settings.

    Returns the open connection, or None (after printing the error) when the
    connection cannot be established.
    """
    host = get_db_config(f'{prefix}_host', '127.0.0.1')
    port = get_db_config(f'{prefix}_port', '3306')
    user = get_db_config(f'{prefix}_user', 'acore')
    password = get_db_config(f'{prefix}_pass', 'acore')
    db_name = get_db_config(f'{prefix}_db', default_db)

    try:
        # NullPool: a fresh engine is built per call, so closing the
        # connection must really close it instead of parking it in a pool
        # that nobody will ever reuse.
        engine = create_engine(
            build_mysql_url(user, password, host, port, db_name),
            poolclass=NullPool,
        )
        return engine.connect()
    except Exception as e:
        print(f"连接{label}数据库失败: {str(e)}")
        return None


def get_auth_db_connection():
    """Connect to the AzerothCore Auth database; None on failure."""
    return open_game_db_connection('auth', 'acore_auth', 'Auth')


def get_char_db_connection():
    """Connect to the AzerothCore Characters database; None on failure."""
    return open_game_db_connection('char', 'acore_characters', 'Characters')


def get_race_name(race_id):
    """Return the display name for a race id, or '未知(<id>)'."""
    races = {
        1: '人类', 2: '兽人', 3: '矮人', 4: '暗夜精灵', 5: '亡灵',
        6: '牛头人', 7: '侏儒', 8: '巨魔', 9: '地精', 10: '血精灵',
        11: '德莱尼', 22: '狼人', 24: '熊猫人', 25: '熊猫人', 26: '熊猫人'
    }
    return races.get(race_id, f'未知({race_id})')


def get_class_name(class_id):
    """Return the display name for a class id, or '未知(<id>)'."""
    classes = {
        1: '战士', 2: '圣骑士', 3: '猎人', 4: '盗贼', 5: '牧师',
        6: '死亡骑士', 7: '萨满', 8: '法师', 9: '术士', 10: '武僧',
        11: '德鲁伊'
    }
    return classes.get(class_id, f'未知({class_id})')


def get_user_account_ids(username):
    """Return the ids of the game accounts whose username is *username*.

    Returns an empty list when the Auth database is unreachable.  Query
    errors propagate to the caller.
    """
    conn = get_auth_db_connection()
    if not conn:
        return []
    try:
        account_table = get_table_name('auth_account_table', 'account')
        result = conn.execute(
            f"SELECT id FROM {account_table} WHERE username = %s",
            (username,),
        )
        return [row[0] for row in result]
    finally:
        conn.close()


def fetch_characters(columns, account_ids=None, include_gender=False):
    """Read characters and add `race_text` / `class_text` to each.

    Args:
        columns: SQL column list to select (must include race and class).
        account_ids: restrict to these accounts, ordered by level; when None
            the 100 newest characters of all accounts are returned.
        include_gender: also add `gender_text` (needs the gender column).

    Returns an empty list when the Characters database is unreachable.
    Query errors propagate to the caller.
    """
    conn = get_char_db_connection()
    if not conn:
        return []
    try:
        char_table = get_table_name('char_table', 'characters')
        if account_ids is None:
            result = conn.execute(
                f"SELECT {columns} FROM {char_table} "
                f"ORDER BY guid DESC LIMIT 100"
            )
        else:
            placeholders = ', '.join(['%s'] * len(account_ids))
            result = conn.execute(
                f"SELECT {columns} FROM {char_table} "
                f"WHERE account IN ({placeholders}) ORDER BY level DESC",
                list(account_ids),
            )
        characters = [dict(row) for row in result]
    finally:
        conn.close()

    # Add human-readable race / class (/ gender) descriptions.
    for character in characters:
        character['race_text'] = get_race_name(character['race'])
        character['class_text'] = get_class_name(character['class'])
        if include_gender:
            character['gender_text'] = (
                '女' if character['gender'] == 1 else '男'
            )
    return characters


def load_user_characters(username, columns):
    """Return the characters of *username*, flashing any lookup error."""
    try:
        account_ids = get_user_account_ids(username)
    except Exception as e:
        flash(f'获取游戏账号失败: {str(e)}', 'danger')
        return []
    if not account_ids:
        return []
    try:
        return fetch_characters(columns, account_ids)
    except Exception as e:
        flash(f'获取角色列表失败: {str(e)}', 'danger')
        return []


def find_user_character(username, character_name, columns):
    """Return the row of *character_name* if *username* owns it, else None.

    Raises:
        GameDataUnavailable: if the user has no game account or the
            Characters database cannot be reached.
    """
    account_ids = get_user_account_ids(username)
    if not account_ids:
        raise GameDataUnavailable('no game account found')
    char_conn = get_char_db_connection()
    if not char_conn:
        raise GameDataUnavailable('characters database unreachable')
    try:
        char_table = get_table_name('char_table', 'characters')
        placeholders = ', '.join(['%s'] * len(account_ids))
        result = char_conn.execute(
            f"SELECT {columns} FROM {char_table} "
            f"WHERE name = %s AND account IN ({placeholders})",
            [character_name] + account_ids,
        )
        return result.fetchone()
    finally:
        char_conn.close()


@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


@app.route('/generate')
@login_required
def generate():
    """Show the ad generator with the current user's characters."""
    # Character pre-selected through the URL, if any.
    character_name = request.args.get('character_name')

    user = Admin.query.get(session.get('admin_id'))
    characters = load_user_characters(
        user.username, CHAR_BASIC_COLUMNS + ", gender"
    )

    # Without a character there is nothing to advertise.
    if not characters:
        flash('您需要先在游戏中创建角色才能生成宣传内容', 'warning')

    return render_template(
        'generate.html',
        server_info=SERVER_INFO,
        characters=characters,
        selected_character=character_name,
    )


@app.route('/create-ad', methods=['POST'])
@login_required
def create_ad():
    """Return (as JSON) the ad text for one of the user's characters."""
    character_name = request.form.get('character_name')
    if not character_name:
        return jsonify({'success': False, 'message': '请选择角色'})

    # The character must belong to the logged-in user.
    user = Admin.query.get(session.get('admin_id'))
    try:
        character = find_user_character(
            user.username, character_name, CHAR_BASIC_COLUMNS
        )
    except Exception as e:
        # Covers GameDataUnavailable as well as database errors; a JSON
        # endpoint should answer with JSON rather than an HTTP 500 page.
        print(f"验证角色信息时出错: {str(e)}")
        return jsonify({'success': False, 'message': '无法验证角色信息'})

    if not character:
        return jsonify(
            {'success': False, 'message': '您选择的角色不存在或不属于您'}
        )

    race_text = get_race_name(character['race'])
    class_text = get_class_name(character['class'])

    ad_content = AD_TEMPLATE.format(
        server_name=SERVER_INFO['server_name'],
        name_type=f"推荐{race_text}{class_text}",
        name=character_name,
        server_features=SERVER_INFO['server_features'],
        website=SERVER_INFO['website']
    )
    return jsonify({'success': True, 'ad_content': ad_content})


@app.route('/verify', methods=['POST'])
def verify_post():
    """Verify a posted ad and answer (as JSON) with a CDK on success."""
    character_name = request.form.get('character_name')
    post_url = request.form.get('post_url')

    if not character_name or not post_url:
        return jsonify({'success': False, 'message': '请填写所有必要信息'})

    # Each character may receive a CDK only once.
    used = UsedCDK.query.filter_by(character_name=character_name).first()
    if used:
        return jsonify({'success': False, 'message': '该角色已经领取过CDK'})

    try:
        soup = fetch_post_soup(post_url)
        print(f"URL: {post_url}")

        if not page_contains_ad(soup, character_name):
            return jsonify({
                'success': False,
                'message': '未找到有效的宣传内容,请确保完整复制宣传模板或检查发帖网站兼容性'
            })

        code = claim_unused_cdk(character_name, post_url)
        if not code:
            return jsonify(
                {'success': False, 'message': 'CDK已经用完,请联系管理员'}
            )

        return jsonify({'success': True, 'message': '验证成功!', 'cdk': code})
    except Exception as e:
        print(f"验证过程中出错: {str(e)}")
        return jsonify({'success': False, 'message': f'验证失败:{str(e)}'})


@app.route('/redeem', methods=['GET', 'POST'])
@login_required
def redeem():
    """Redeem page: list the user's characters and process redeem requests."""
    user = Admin.query.get(session.get('admin_id'))

    if request.method == 'POST':
        character_name = request.form.get('character_name')
        post_url = request.form.get('link')

        if not character_name or not post_url:
            flash('请填写所有必要信息', 'danger')
            return redirect(url_for('redeem'))

        # The character must belong to the logged-in user.
        try:
            character = find_user_character(
                user.username, character_name, "guid, account, name"
            )
        except GameDataUnavailable:
            character = None
        except Exception as e:
            flash(f'验证失败: {str(e)}', 'danger')
            return redirect(url_for('redeem'))
        if not character:
            flash('您选择的角色不存在或不属于您', 'danger')
            return redirect(url_for('redeem'))

        # Each character may receive a CDK only once.
        used = UsedCDK.query.filter_by(character_name=character_name).first()
        if used:
            flash(
                f'该角色已经领取过CDK: {used.cdk},领取时间: {used.used_at}',
                'danger',
            )
            return redirect(url_for('redeem'))

        # Fetch the post and verify the advertisement.
        try:
            soup = fetch_post_soup(post_url)
            if not verify_ad_content(soup.get_text(), character_name):
                # Only a short message is flashed: flashes live in the
                # cookie session, which a full scraped page would overflow.
                flash('未找到有效的宣传内容,请确保完整复制宣传模板2', 'danger')
                return redirect(url_for('redeem'))

            code = claim_unused_cdk(character_name, post_url)
            if not code:
                flash('CDK已经用完,请联系管理员', 'danger')
                return redirect(url_for('redeem'))

            flash(
                f'恭喜!您的角色 {character_name} 已成功兑换CDK: {code}',
                'success',
            )
        except Exception as e:
            flash(f'验证失败: {str(e)}', 'danger')

        return redirect(url_for('redeem'))

    # GET: list the user's characters together with their redeem status.
    characters = load_user_characters(user.username, CHAR_BASIC_COLUMNS)
    for character in characters:
        used = UsedCDK.query.filter_by(
            character_name=character['name']
        ).first()
        character['has_cdk'] = used is not None
        if used:
            character['cdk'] = used.cdk
            character['redeem_time'] = used.used_at

    return render_template(
        'redeem.html', server_info=SERVER_INFO, characters=characters
    )


@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Log a user in and redirect to the dashboard matching the role."""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')

        admin = Admin.query.filter_by(username=username).first()
        if admin and admin.check_password(password):
            session['admin_id'] = admin.id
            session['admin_username'] = admin.username
            session['admin_role'] = admin.role
            flash('登录成功', 'success')

            # Admins and regular users have different dashboards.
            if admin.is_admin():
                return redirect(url_for('admin_dashboard'))
            return redirect(url_for('user_dashboard'))

        flash('用户名或密码错误', 'danger')

    return render_template('admin_login.html')


@app.route('/admin/logout')
def admin_logout():
    """Clear the login session and return to the login page."""
    clear_login_session()
    flash('已退出登录', 'success')
    return redirect(url_for('admin_login'))


@app.route('/admin/dashboard')
@admin_required
def admin_dashboard():
    """Render the administrator dashboard."""
    return render_template('admin_dashboard.html')


@app.route('/user/dashboard')
@login_required
def user_dashboard():
    """Render the dashboard with the user's game accounts and characters."""
    user = Admin.query.get(session.get('admin_id'))

    # Game accounts: the site username is assumed to equal the game
    # account username.
    game_accounts = []
    conn = get_auth_db_connection()
    if conn:
        try:
            account_table = get_table_name('auth_account_table', 'account')
            result = conn.execute(
                f"SELECT {ACCOUNT_LIST_COLUMNS} FROM {account_table} "
                f"WHERE username = %s",
                (user.username,),
            )
            game_accounts = [dict(row) for row in result]
        except Exception as e:
            flash(f'获取游戏账号失败: {str(e)}', 'danger')
        finally:
            conn.close()

    # Characters of those accounts.
    characters = []
    if game_accounts:
        try:
            characters = fetch_characters(
                CHAR_DETAIL_COLUMNS,
                [acc['id'] for acc in game_accounts],
                include_gender=True,
            )
        except Exception as e:
            flash(f'获取角色列表失败: {str(e)}', 'danger')

    return render_template(
        'user_dashboard.html',
        user=user,
        game_accounts=game_accounts,
        characters=characters,
    )


def generate_srp6_verifier(username, password):
    """Return `(salt, verifier)` for an SRP6 account row.

    Computes x = SHA1(salt + SHA1("username:password")) read as a
    little-endian integer and verifier = g^x mod N, both returned as 32
    little-endian bytes.  Callers upper-case the credentials beforehand.
    """
    # The salt must be unpredictable, so use the OS random source.
    salt = os.urandom(32)

    inner = hashlib.sha1(f"{username}:{password}".encode()).digest()
    x = int.from_bytes(hashlib.sha1(salt + inner).digest(), 'little')
    verifier = pow(SRP6_G, x, SRP6_N).to_bytes(32, 'little')
    return salt, verifier


def create_game_account(username, password):
    """Create a game account; return `(success, message)`."""
    username = username.upper()
    password = password.upper()
    salt, verifier = generate_srp6_verifier(username, password)

    conn = get_auth_db_connection()
    if not conn:
        # Always return a 2-tuple: callers unpack the result.
        return False, "无法连接到Auth数据库"
    try:
        account_table = get_table_name('auth_account_table', 'account')
        sql = f"""
            INSERT INTO {account_table}
                (username, salt, verifier, expansion, joindate)
            VALUES (%s, %s, %s, %s, NOW())
        """
        conn.execute(sql, (username, salt, verifier, 2))
        return True, "账号创建成功"
    except Exception as e:
        return False, f"数据库错误: {str(e)}"
    finally:
        conn.close()


@app.route('/admin/register', methods=['GET', 'POST'])
def admin_register():
    """Register a site login.

    The first registered login becomes the administrator; every later one
    is a regular user for whom a game account is created as well.
    """
    has_admin = Admin.query.filter_by(role='admin').first() is not None

    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')

        if not username or not password:
            flash('请填写所有必填字段', 'danger')
        elif password != confirm_password:
            flash('两次输入的密码不一致', 'danger')
        elif Admin.query.filter_by(username=username).first():
            flash('用户名已存在', 'danger')
        else:
            admin = Admin(username=username)
            admin.set_password(password)

            if not has_admin:
                admin.role = 'admin'
                success_message = '管理员账号创建成功,请登录'
            else:
                admin.role = 'user'
                success, message = create_game_account(username, password)
                if not success:
                    flash(f'创建游戏账号失败: {message}', 'danger')
                    return render_template(
                        'admin_register.html', is_admin_exists=has_admin
                    )
                success_message = '用户账号创建成功,同时已创建游戏账号'

            db.session.add(admin)
            db.session.commit()
            flash(success_message, 'success')
            return redirect(url_for('admin_login'))

    return render_template('admin_register.html', is_admin_exists=has_admin)


@app.route('/admin/create_admin', methods=['GET', 'POST'])
@admin_required
def create_admin():
    """Let an administrator create another site login."""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        role = request.form.get('role', 'user')
        if role not in ('admin', 'user'):
            # Only the two roles the rest of the application understands.
            role = 'user'

        if not username or not password:
            flash('请填写所有必填字段', 'danger')
        elif Admin.query.filter_by(username=username).first():
            flash('用户名已存在', 'danger')
        else:
            admin = Admin(username=username, role=role)
            admin.set_password(password)
            db.session.add(admin)
            db.session.commit()
            flash(
                f'{"管理员" if role == "admin" else "用户"}账号创建成功',
                'success',
            )
            return redirect(url_for('admin_dashboard'))

    return render_template('create_admin.html')


@app.route('/admin/db_config', methods=['GET', 'POST'])
@admin_required
def db_config():
    """Show and store the game database connection settings."""
    if request.method == 'POST':
        for key in DB_CONFIG_KEYS:
            value = request.form.get(key)
            if value is None:
                # Field not submitted: config_value is NOT NULL, so keep
                # whatever is stored instead of failing the whole save.
                continue
            config = DatabaseConfig.query.filter_by(config_name=key).first()
            if config:
                config.config_value = value
            else:
                db.session.add(
                    DatabaseConfig(config_name=key, config_value=value)
                )

        db.session.commit()
        flash('数据库配置已保存', 'success')
        return redirect(url_for('db_config'))

    # Existing settings for the form.
    configs = {
        config.config_name: config.config_value
        for config in DatabaseConfig.query.all()
    }
    return render_template('db_config.html', configs=configs)


@app.route('/admin/test_connection', methods=['POST'])
@admin_required
def test_connection():
    """Try the submitted connection settings and report the result as JSON."""
    prefix = 'auth' if request.form.get('type') == 'auth' else 'char'
    host = request.form.get(f'{prefix}_host')
    port = request.form.get(f'{prefix}_port')
    user = request.form.get(f'{prefix}_user')
    password = request.form.get(f'{prefix}_pass')
    db_name = request.form.get(f'{prefix}_db')

    try:
        engine = create_engine(
            build_mysql_url(user, password, host, port, db_name),
            poolclass=NullPool,
        )
        with engine.connect():
            pass
        return jsonify({'success': True, 'message': '连接成功'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'连接失败: {str(e)}'})


# Create the tables and a default administrator on the first request.
# NOTE(review): `before_first_request` is not available in recent Flask
# releases — confirm the pinned Flask version before upgrading.
@app.before_first_request
def create_default_admin():
    """Create all tables and, if no admin exists, the default admin login."""
    db.create_all()
    if Admin.query.filter_by(role='admin').count() == 0:
        # NOTE(review): well-known default credentials; change the password
        # immediately after the first login.
        admin = Admin(username='admin', role='admin')
        admin.set_password('admin')
        db.session.add(admin)
        db.session.commit()
        print("已创建默认管理员账号: admin/admin")


@app.route('/admin/account_management', methods=['GET', 'POST'])
@admin_required
def account_management():
    """List game accounts and let an administrator create new ones."""
    if request.method == 'POST':
        action = request.form.get('action')

        if action == 'create_account':
            username = request.form.get('username')
            password = request.form.get('password')
            email = request.form.get('email', '')

            if not username or not password:
                flash('用户名和密码不能为空', 'danger')
                return redirect(url_for('account_management'))

            conn = get_auth_db_connection()
            if not conn:
                flash('无法连接到Auth数据库', 'danger')
                return redirect(url_for('account_management'))

            try:
                # Refuse duplicates.
                account_table = get_table_name(
                    'auth_account_table', 'account'
                )
                result = conn.execute(
                    f"SELECT id FROM {account_table} WHERE username = %s",
                    (username,),
                )
                if result.fetchone():
                    flash('账号已存在', 'danger')
                    return redirect(url_for('account_management'))

                # Use the real SRP6 verifier (g^x mod N), computed from
                # upper-cased credentials exactly like create_game_account;
                # a plain hash stored as verifier cannot authenticate.
                game_username = username.upper()
                salt, verifier = generate_srp6_verifier(
                    game_username, password.upper()
                )

                sql = f"""
                    INSERT INTO {account_table}
                        (username, salt, verifier, email, reg_mail, joindate)
                    VALUES (%s, %s, %s, %s, %s, NOW())
                """
                conn.execute(
                    sql, (game_username, salt, verifier, email, email)
                )
                flash('账号创建成功', 'success')
            except Exception as e:
                flash(f'创建账号失败: {str(e)}', 'danger')
            finally:
                conn.close()

            return redirect(url_for('account_management'))

    # Account list (newest 100).
    accounts = []
    conn = get_auth_db_connection()
    if conn:
        try:
            account_table = get_table_name('auth_account_table', 'account')
            result = conn.execute(
                f"SELECT {ACCOUNT_LIST_COLUMNS} FROM {account_table} "
                f"ORDER BY id DESC LIMIT 100"
            )
            accounts = [dict(row) for row in result]
        except Exception as e:
            flash(f'获取账号列表失败: {str(e)}', 'danger')
        finally:
            conn.close()

    return render_template('account_management.html', accounts=accounts)


@app.route('/admin/character_management')
@admin_required
def character_management():
    """List the 100 newest characters for administrators."""
    characters = []
    try:
        characters = fetch_characters(
            CHAR_DETAIL_COLUMNS, include_gender=True
        )
    except Exception as e:
        flash(f'获取角色列表失败: {str(e)}', 'danger')

    return render_template(
        'character_management.html', characters=characters
    )


@app.route('/admin/link_account', methods=['POST'])
@admin_required
def link_account():
    """Let an administrator assign a CDK to a character manually."""
    character_name = request.form.get('character_name')
    post_url = request.form.get('post_url')

    if not character_name or not post_url:
        flash('请填写所有必要信息', 'danger')
        return redirect(url_for('character_management'))

    char_conn = get_char_db_connection()
    if not char_conn:
        flash('无法连接到Characters数据库', 'danger')
        return redirect(url_for('character_management'))

    try:
        # The character must exist.
        char_table = get_table_name('char_table', 'characters')
        result = char_conn.execute(
            f"SELECT guid, account, name, level FROM {char_table} "
            f"WHERE name = %s",
            (character_name,),
        )
        character = result.fetchone()
        if not character:
            flash('角色不存在', 'danger')
            return redirect(url_for('character_management'))

        # Each character may receive a CDK only once.
        used = UsedCDK.query.filter_by(character_name=character_name).first()
        if used:
            flash(
                f'该角色已经领取过CDK: {used.cdk},领取时间: {used.used_at}',
                'warning',
            )
            return redirect(url_for('character_management'))

        code = claim_unused_cdk(character_name, post_url)
        if not code:
            flash('CDK已经用完,请添加新的CDK', 'danger')
            return redirect(url_for('character_management'))

        flash(
            f'已成功为角色 {character_name} (等级 {character["level"]}) '
            f'分配CDK: {code}',
            'success',
        )
    except Exception as e:
        flash(f'操作失败: {str(e)}', 'danger')
    finally:
        char_conn.close()

    return redirect(url_for('character_management'))


if __name__ == '__main__':
    # Create the database, then the tables, then start the dev server.
    create_database()

    with app.app_context():
        try:
            db.create_all()
            print("数据库表创建成功")
        except Exception as e:
            print(f"数据库表创建失败:{str(e)}")

    # NOTE(review): debug=True enables the interactive debugger; do not
    # expose this entry point publicly.
    app.run(debug=True)