CDK_Change/app.py

1080 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, jsonify, flash, redirect, url_for, session
from flask_sqlalchemy import SQLAlchemy
import requests
from bs4 import BeautifulSoup
import os
from dotenv import load_dotenv
import random
import string
import pymysql
import re
import hashlib
from functools import wraps
from config import AD_TEMPLATE, SERVER_INFO, VERIFICATION_CONFIG, NAME_TYPES
from sqlalchemy import create_engine
# Make PyMySQL importable under the MySQLdb name so it serves as the MySQL driver.
pymysql.install_as_MySQLdb()

# Load settings from a .env file into the process environment.
load_dotenv()

app = Flask(__name__)

# Flask secret key. The literal fallback is a development value; set
# SECRET_KEY in the environment for production.
app.secret_key = os.environ.get('SECRET_KEY', 'dev_key_123')

# Database configuration.
database_url = os.environ.get('DATABASE_URL')
app.config.update(
    SQLALCHEMY_DATABASE_URI=database_url,
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
# Create the application database if it does not exist yet.
def create_database():
    """Create the database named at the end of ``database_url`` when missing.

    The URL is split at its last ``/``: the left part is used to connect to
    the server without selecting a database, the right part is the database
    name.  Failures are reported with ``print`` and never raised.
    """
    if not database_url:
        print("创建数据库失败:DATABASE_URL 未设置")
        return

    engine = None
    try:
        # Split the database URL into "server" and "database name" parts.
        base_url, _, tail = database_url.rpartition('/')
        # A "?charset=..." style suffix belongs to the URL, not to the name.
        db_name = tail.split('?', 1)[0]
        # Backtick-quote the identifier so unusual names cannot break the SQL.
        quoted_name = '`' + db_name.replace('`', '``') + '`'

        # Engine without a database selected.
        engine = create_engine(base_url)
        with engine.connect() as conn:
            conn.execute(f"CREATE DATABASE IF NOT EXISTS {quoted_name}")
        print(f"数据库 {db_name} 创建成功")
    except Exception as e:
        print(f"创建数据库失败:{str(e)}")
    finally:
        # This engine is only needed for the bootstrap statement above.
        if engine is not None:
            engine.dispose()
# Flask-SQLAlchemy handle bound to the app; the models in this file derive from db.Model.
db = SQLAlchemy(app)
# CDK (redeem code) model.
class CDKCode(db.Model):
    """One redeem code row in the ``兑换码`` table."""

    __tablename__ = '兑换码'

    # Integer primary key.
    ID = db.Column(db.Integer, primary_key=True)
    # The code string itself: required, unique, at most 20 characters.
    兑换码 = db.Column(db.String(20), nullable=False, unique=True)
    # Required integer; by its name, the id of the reward template for the code.
    奖励模板ID = db.Column(db.Integer, nullable=False)
# Record of CDKs that have already been handed out.
class UsedCDK(db.Model):
    """One row per issued code: which character got it, for which post, and when."""

    __tablename__ = 'used_cdks'

    id = db.Column(db.Integer, primary_key=True)
    # Name of the character the code was issued to.
    character_name = db.Column(db.String(50), nullable=False)
    # URL of the post submitted with the claim.
    post_url = db.Column(db.String(255), nullable=False)
    # The issued code string.
    cdk = db.Column(db.String(20), nullable=False)
    # Timestamp filled in by the database server on insert.
    used_at = db.Column(db.DateTime, server_default=db.func.now())
# Administrator / user account model.
class Admin(db.Model):
    """Web login account; ``role`` separates administrators from ordinary users.

    New passwords are stored as salted PBKDF2-HMAC-SHA256 records of the form
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>`` (118 characters,
    which fits the 128-character column).  Rows that still hold a bare
    SHA-256 hex digest, the format written previously, keep verifying.
    """

    __tablename__ = 'admins'

    # Format tag and work factor used for newly written password hashes.
    _HASH_SCHEME = 'pbkdf2_sha256'
    _HASH_ITERATIONS = 260000

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(50), unique=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)
    role = db.Column(db.String(20), default='user')  # 'admin' or 'user'
    created_at = db.Column(db.DateTime, server_default=db.func.now())

    def set_password(self, password):
        """Hash *password* with a fresh random salt and store the record."""
        salt = os.urandom(16)
        digest = hashlib.pbkdf2_hmac(
            'sha256', password.encode(), salt, self._HASH_ITERATIONS
        )
        self.password_hash = '$'.join(
            (self._HASH_SCHEME, str(self._HASH_ITERATIONS), salt.hex(), digest.hex())
        )

    def check_password(self, password):
        """Return True when *password* matches the stored hash.

        Non-string input (for example a missing form field) and malformed
        stored records yield False instead of raising.
        """
        import hmac

        stored = self.password_hash
        if not isinstance(password, str) or not stored:
            return False

        if stored.startswith(self._HASH_SCHEME + '$'):
            try:
                _, iterations, salt_hex, digest_hex = stored.split('$')
                expected = bytes.fromhex(digest_hex)
                candidate = hashlib.pbkdf2_hmac(
                    'sha256', password.encode(), bytes.fromhex(salt_hex), int(iterations)
                )
            except ValueError:
                return False
            return hmac.compare_digest(candidate, expected)

        # Legacy rows: unsalted SHA-256 hex digest.
        legacy = hashlib.sha256(password.encode()).hexdigest()
        return hmac.compare_digest(legacy.encode(), stored.encode())

    def is_admin(self):
        """Return True when this account's role is ``'admin'``."""
        return self.role == 'admin'
# Database configuration model.
class DatabaseConfig(db.Model):
    """Name/value setting row in ``database_configs``, looked up by ``config_name``."""

    __tablename__ = 'database_configs'

    id = db.Column(db.Integer, primary_key=True)
    # Setting name.
    config_name = db.Column(db.String(50), nullable=False)
    # Setting value, stored as text.
    config_value = db.Column(db.String(255), nullable=False)
    # Set on insert and refreshed on every update.
    updated_at = db.Column(
        db.DateTime,
        server_default=db.func.now(),
        onupdate=db.func.now(),
    )
# Decorator: the view requires a logged-in administrator.
def admin_required(f):
    """Wrap view *f* so only a logged-in account with the admin role reaches it.

    Without a session login the visitor is redirected to the login page; a
    logged-in account that is missing or not an admin is redirected to the
    user dashboard.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if 'admin_id' in session:
            # Confirm the account still exists and carries the admin role.
            account = Admin.query.get(session['admin_id'])
            if account and account.is_admin():
                return f(*args, **kwargs)
            flash('您没有权限访问此页面', 'danger')
            return redirect(url_for('user_dashboard'))
        flash('请先登录', 'danger')
        return redirect(url_for('admin_login'))
    return wrapper
# Decorator: the view requires any logged-in account (admin or ordinary user).
def login_required(f):
    """Wrap view *f* so visitors without a session login go to the login page."""
    @wraps(f)
    def wrapper(*args, **kwargs):
        if 'admin_id' in session:
            return f(*args, **kwargs)
        flash('请先登录', 'danger')
        return redirect(url_for('admin_login'))
    return wrapper
def verify_ad_content(content, character_name):
    """Return True when *content* carries the promotional text for *character_name*.

    Two checks are tried in order:

    1. Strict: the ad rendered from ``AD_TEMPLATE`` occurs in *content* once
       all whitespace is removed from both.
    2. Lenient: every keyword and every regex pattern listed in
       ``VERIFICATION_CONFIG`` is found in the whitespace-normalised content,
       and so is the character name.
    """
    # Collapse every whitespace run in the post to a single space.
    content = ' '.join(content.split())

    # Render the expected ad and normalise it the same way.
    rendered = AD_TEMPLATE.format(
        server_name=SERVER_INFO["server_name"],
        name_type=NAME_TYPES["character"],
        name=character_name,
        server_features=SERVER_INFO["server_features"],
        website=SERVER_INFO["website"]
    )
    expected_ad = ' '.join(rendered.split())

    # Strict check: compare with all whitespace stripped out.
    squeezed_content = re.sub(r'\s+', '', content)
    squeezed_expected = re.sub(r'\s+', '', expected_ad)
    if squeezed_expected in squeezed_content:
        return True

    # Lenient fallback when the exact text is not present.
    if not all(keyword in content for keyword in VERIFICATION_CONFIG['required_keywords']):
        return False
    if not all(
        re.search(pattern, content, re.DOTALL)
        for pattern in VERIFICATION_CONFIG['required_patterns']
    ):
        return False
    return character_name in content
@app.route('/')
def index():
    """Render the landing page."""
    page = render_template('index.html')
    return page
@app.route('/generate')
@login_required
def generate():
    """Show the ad-generation page with the logged-in user's characters.

    Game accounts are matched to the web account by username in the auth
    database; their characters are then read from the characters database.
    Lookup failures are flashed and the page is still rendered.
    """
    # Optional character preselected through the query string.
    selected_name = request.args.get('character_name')

    # Web account behind the current session.
    user = Admin.query.get(session.get('admin_id'))

    characters = []
    auth_conn = get_auth_db_connection()
    if auth_conn:
        try:
            account_table = get_db_config('auth_account_table', 'account')
            # Game accounts whose username equals the web username.
            rows = auth_conn.execute(
                f"SELECT id FROM {account_table} WHERE username = %s",
                (user.username,),
            )
            account_ids = [row[0] for row in rows]

            char_conn = get_char_db_connection() if account_ids else None
            if char_conn:
                try:
                    char_table = get_db_config('char_table', 'characters')
                    placeholders = ', '.join(['%s'] * len(account_ids))
                    query = f"""
                        SELECT guid, account, name, level, race, class, gender
                        FROM {char_table}
                        WHERE account IN ({placeholders})
                        ORDER BY level DESC
                    """
                    characters = list(map(dict, char_conn.execute(query, account_ids)))
                    # Attach readable race and class names.
                    for entry in characters:
                        entry.update(
                            race_text=get_race_name(entry['race']),
                            class_text=get_class_name(entry['class']),
                        )
                except Exception as e:
                    flash(f'获取角色列表失败: {str(e)}', 'danger')
                finally:
                    char_conn.close()
        except Exception as e:
            flash(f'获取游戏账号失败: {str(e)}', 'danger')
        finally:
            auth_conn.close()

    # Without any character there is nothing to generate an ad for.
    if not characters:
        flash('您需要先在游戏中创建角色才能生成宣传内容', 'warning')

    return render_template(
        'generate.html',
        server_info=SERVER_INFO,
        characters=characters,
        selected_character=selected_name,
    )
@app.route('/create-ad', methods=['POST'])
@login_required
def create_ad():
    """Return the promotional text for one of the caller's characters as JSON.

    The posted ``character_name`` must belong to a game account whose
    username equals the logged-in web account's username.  Every failure,
    including database errors and a session that points at a deleted
    account, is answered with ``{'success': False, ...}`` rather than an
    HTTP 500, so the client can always parse the response.
    """
    character_name = request.form.get('character_name')
    if not character_name:
        return jsonify({'success': False, 'message': '请选择角色'})

    # Web account behind the current session.
    user = Admin.query.get(session.get('admin_id'))
    if user is None:
        return jsonify({'success': False, 'message': '无法验证角色信息'})

    try:
        # Game account ids matching the web username.
        account_ids = []
        conn = get_auth_db_connection()
        if conn:
            try:
                account_table = get_db_config('auth_account_table', 'account')
                query = f"SELECT id FROM {account_table} WHERE username = %s"
                result = conn.execute(query, (user.username,))
                account_ids = [row[0] for row in result]
            finally:
                conn.close()

        # Check that the character belongs to one of those accounts.
        if account_ids:
            char_conn = get_char_db_connection()
            if char_conn:
                try:
                    char_table = get_db_config('char_table', 'characters')
                    placeholders = ', '.join(['%s'] * len(account_ids))
                    query = f"""
                        SELECT guid, account, name, level, race, class
                        FROM {char_table}
                        WHERE name = %s AND account IN ({placeholders})
                    """
                    params = [character_name] + account_ids
                    character = char_conn.execute(query, params).fetchone()
                    if not character:
                        return jsonify({'success': False, 'message': '您选择的角色不存在或不属于您'})

                    # Readable race and class names for the ad text.
                    race_text = get_race_name(character['race'])
                    class_text = get_class_name(character['class'])

                    ad_content = AD_TEMPLATE.format(
                        server_name=SERVER_INFO['server_name'],
                        name_type=f"推荐{race_text}{class_text}",
                        name=character_name,
                        server_features=SERVER_INFO['server_features'],
                        website=SERVER_INFO['website']
                    )
                    return jsonify({
                        'success': True,
                        'ad_content': ad_content
                    })
                finally:
                    char_conn.close()
    except Exception as e:
        # Keep details in the server log; the client gets the generic failure.
        print(f"生成宣传内容时出错: {str(e)}")

    return jsonify({'success': False, 'message': '无法验证角色信息'})
@app.route('/verify', methods=['POST'])
def verify_post():
    """Verify a promotional post and hand out an unused CDK as JSON.

    Reads ``character_name`` and ``post_url`` from the form, refuses
    characters that already hold a code, downloads the post, checks it with
    ``verify_ad_content`` (whole page first, then individual text blocks) and
    records the issued code in ``UsedCDK``.

    NOTE(review): unlike ``/redeem`` this endpoint has no login requirement
    and does not check that the character belongs to the caller; confirm
    that this is intended.
    """
    from urllib.parse import urlparse

    character_name = request.form.get('character_name')
    post_url = request.form.get('post_url')
    if not character_name or not post_url:
        return jsonify({'success': False, 'message': '请填写所有必要信息'})

    # One code per character.
    used = UsedCDK.query.filter_by(character_name=character_name).first()
    if used:
        return jsonify({'success': False, 'message': '该角色已经领取过CDK'})

    try:
        # The URL comes from the client: only fetch plain web links.
        if urlparse(post_url).scheme not in ('http', 'https'):
            return jsonify({'success': False, 'message': '帖子链接无效,请提供 http 或 https 链接'})

        # Download the post; the timeout stops a slow host from hanging the worker.
        response = requests.get(post_url, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        post_content = soup.get_text()

        # Debug output.
        print(f"URL: {post_url}")
        print(f"获取到的内容长度: {len(post_content)}")
        print(f"内容预览: {post_content[:200]}...")

        if not verify_ad_content(post_content, character_name):
            # Whole-page text failed: try the individual content blocks.
            post_divs = soup.find_all(['div', 'p', 'article', 'section'])
            for div in post_divs:
                div_text = div.get_text()
                if len(div_text) > 100:  # only blocks of a plausible length
                    print(f"尝试验证区块: {div_text[:100]}...")
                    if verify_ad_content(div_text, character_name):
                        post_content = div_text
                        break
            else:
                # No block passed verification.
                return jsonify({'success': False, 'message': '未找到有效的宣传内容,请确保完整复制宣传模板或检查发帖网站兼容性'})

        # First code that has not been issued yet.
        cdk = CDKCode.query.filter(
            ~CDKCode.兑换码.in_(
                db.session.query(UsedCDK.cdk)
            )
        ).first()
        if not cdk:
            return jsonify({'success': False, 'message': 'CDK已经用完请联系管理员'})

        # Record the hand-out.
        used_cdk = UsedCDK(
            character_name=character_name,
            post_url=post_url,
            cdk=cdk.兑换码
        )
        db.session.add(used_cdk)
        db.session.commit()
        return jsonify({
            'success': True,
            'message': '验证成功!',
            'cdk': cdk.兑换码
        })
    except Exception as e:
        # Drop any half-finished insert so the session stays usable.
        db.session.rollback()
        print(f"验证过程中出错: {str(e)}")
        return jsonify({'success': False, 'message': f'验证失败:{str(e)}'})
def _redeem_account_ids(username):
    """Return ids of auth-database accounts whose username equals *username*.

    Returns an empty list when no auth connection is available; query errors
    propagate to the caller.
    """
    conn = get_auth_db_connection()
    if not conn:
        return []
    try:
        account_table = get_db_config('auth_account_table', 'account')
        query = f"SELECT id FROM {account_table} WHERE username = %s"
        return [row[0] for row in conn.execute(query, (username,))]
    finally:
        conn.close()


def _redeem_owns_character(account_ids, character_name):
    """Return True when *character_name* belongs to one of *account_ids*."""
    if not account_ids:
        return False
    char_conn = get_char_db_connection()
    if not char_conn:
        return False
    try:
        char_table = get_db_config('char_table', 'characters')
        placeholders = ', '.join(['%s'] * len(account_ids))
        query = f"""
            SELECT guid, account, name
            FROM {char_table}
            WHERE name = %s AND account IN ({placeholders})
        """
        params = [character_name] + account_ids
        return char_conn.execute(query, params).fetchone() is not None
    finally:
        char_conn.close()


def _redeem_list_characters(account_ids):
    """Return the characters of *account_ids* as dicts with display fields added.

    Each dict gets ``race_text``, ``class_text`` and ``has_cdk``; characters
    that already hold a code also get ``cdk`` and ``redeem_time``.
    """
    if not account_ids:
        return []
    char_conn = get_char_db_connection()
    if not char_conn:
        return []
    try:
        char_table = get_db_config('char_table', 'characters')
        placeholders = ', '.join(['%s'] * len(account_ids))
        query = f"""
            SELECT guid, account, name, level, race, class
            FROM {char_table}
            WHERE account IN ({placeholders})
            ORDER BY level DESC
        """
        characters = [dict(row) for row in char_conn.execute(query, account_ids)]
        for character in characters:
            character['race_text'] = get_race_name(character['race'])
            character['class_text'] = get_class_name(character['class'])
            # Has this character already redeemed a code?
            used = UsedCDK.query.filter_by(character_name=character['name']).first()
            character['has_cdk'] = used is not None
            if used:
                character['cdk'] = used.cdk
                character['redeem_time'] = used.used_at
        return characters
    finally:
        char_conn.close()


@app.route('/redeem', methods=['GET', 'POST'])
@login_required
def redeem():
    """Redeem page: list the user's characters (GET) or claim a CDK (POST).

    On POST the character must belong to the logged-in user, must not hold a
    code yet, and the submitted post (form field ``link``) must pass
    ``verify_ad_content``.  Every outcome is flashed and followed by a
    redirect back to this page.  Database failures on either path are flashed
    instead of surfacing as HTTP 500.
    """
    from urllib.parse import urlparse

    # Web account behind the current session.
    user = Admin.query.get(session.get('admin_id'))

    if request.method == 'POST':
        character_name = request.form.get('character_name')
        post_url = request.form.get('link')
        if not character_name or not post_url:
            flash('请填写所有必要信息', 'danger')
            return redirect(url_for('redeem'))

        # The character must belong to the current user.
        try:
            account_ids = _redeem_account_ids(user.username)
            character_exists = _redeem_owns_character(account_ids, character_name)
        except Exception as e:
            flash(f'验证失败: {str(e)}', 'danger')
            return redirect(url_for('redeem'))
        if not character_exists:
            flash('您选择的角色不存在或不属于您', 'danger')
            return redirect(url_for('redeem'))

        # One code per character.
        used = UsedCDK.query.filter_by(character_name=character_name).first()
        if used:
            flash(f'该角色已经领取过CDK: {used.cdk},领取时间: {used.used_at}', 'danger')
            return redirect(url_for('redeem'))

        try:
            # The URL comes from the client: only fetch plain web links.
            if urlparse(post_url).scheme not in ('http', 'https'):
                flash('帖子链接无效,请提供 http 或 https 链接', 'danger')
                return redirect(url_for('redeem'))

            # Download the post; the timeout stops a slow host from hanging the worker.
            response = requests.get(post_url, timeout=10)
            post_content = BeautifulSoup(response.text, 'html.parser').get_text()

            if not verify_ad_content(post_content, character_name):
                # Flash only the short message: flashed text lives in the
                # session, so dumping the fetched page there is not viable.
                flash('未找到有效的宣传内容请确保完整复制宣传模板', 'danger')
                return redirect(url_for('redeem'))

            # First code that has not been issued yet.
            cdk = CDKCode.query.filter(
                ~CDKCode.兑换码.in_(
                    db.session.query(UsedCDK.cdk)
                )
            ).first()
            if not cdk:
                flash('CDK已经用完请联系管理员', 'danger')
                return redirect(url_for('redeem'))

            # Record the hand-out.
            used_cdk = UsedCDK(
                character_name=character_name,
                post_url=post_url,
                cdk=cdk.兑换码
            )
            db.session.add(used_cdk)
            db.session.commit()
            flash(f'恭喜!您的角色 {character_name} 已成功兑换CDK: {cdk.兑换码}', 'success')
        except Exception as e:
            # Drop any half-finished insert so the session stays usable.
            db.session.rollback()
            flash(f'验证失败: {str(e)}', 'danger')
        return redirect(url_for('redeem'))

    # GET: list the current user's characters.
    characters = []
    try:
        characters = _redeem_list_characters(_redeem_account_ids(user.username))
    except Exception as e:
        flash(f'获取角色列表失败: {str(e)}', 'danger')
    return render_template('redeem.html', server_info=SERVER_INFO, characters=characters)
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Login form for administrators and ordinary users.

    On success the account id, username and role are stored in the session
    and the visitor is sent to the dashboard matching the role.  A missing
    username or password field is treated as a failed login instead of
    reaching the password check with ``None``.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')

        admin = None
        if username and password:
            admin = Admin.query.filter_by(username=username).first()

        if admin and admin.check_password(password):
            session['admin_id'] = admin.id
            session['admin_username'] = admin.username
            session['admin_role'] = admin.role
            flash('登录成功', 'success')
            # Each role has its own dashboard.
            if admin.is_admin():
                return redirect(url_for('admin_dashboard'))
            return redirect(url_for('user_dashboard'))

        flash('用户名或密码错误', 'danger')
    return render_template('admin_login.html')
@app.route('/admin/logout')
def admin_logout():
    """Remove the login keys from the session and return to the login page."""
    for key in ('admin_id', 'admin_username', 'admin_role'):
        session.pop(key, None)
    flash('已退出登录', 'success')
    return redirect(url_for('admin_login'))
@app.route('/admin/dashboard')
@admin_required
def admin_dashboard():
    """Render the administrator dashboard."""
    page = render_template('admin_dashboard.html')
    return page
@app.route('/user/dashboard')
@login_required
def user_dashboard():
    """Render the dashboard with the user's game accounts and characters.

    Game accounts are those whose username equals the web account's
    username; characters are the ones owned by those accounts.  Lookup
    failures are flashed and the page is still rendered.
    """
    # Web account behind the current session.
    user = Admin.query.get(session.get('admin_id'))

    # Game accounts of the user.
    game_accounts = []
    conn = get_auth_db_connection()
    if conn:
        try:
            account_table = get_db_config('auth_account_table', 'account')
            # Assumes the web username equals the game account username.
            query = f"""
                SELECT
                    id, username, email,
                    last_login, expansion,
                    CASE WHEN online > 0 THEN '在线' ELSE '离线' END as status
                FROM {account_table}
                WHERE username = %s
            """
            result = conn.execute(query, (user.username,))
            game_accounts = [dict(row) for row in result]
        except Exception as e:
            flash(f'获取游戏账号失败: {str(e)}', 'danger')
        finally:
            conn.close()

    # Characters of those accounts.
    characters = []
    if game_accounts:
        conn = get_char_db_connection()
        if conn:
            try:
                char_table = get_db_config('char_table', 'characters')
                account_ids = [acc['id'] for acc in game_accounts]
                placeholders = ', '.join(['%s'] * len(account_ids))
                query = f"""
                    SELECT
                        guid, account, name, level, race, class, gender,
                        CASE WHEN online > 0 THEN '在线' ELSE '离线' END as online_status,
                        money/10000 as gold,
                        totaltime/3600 as hours_played,
                        map
                    FROM {char_table}
                    WHERE account IN ({placeholders})
                    ORDER BY level DESC
                """
                result = conn.execute(query, account_ids)
                characters = [dict(row) for row in result]
                # Attach readable race, class and gender.
                for character in characters:
                    character['race_text'] = get_race_name(character['race'])
                    character['class_text'] = get_class_name(character['class'])
                    # Both branches used to be the empty string, so the label
                    # carried no information.
                    # NOTE(review): assumes gender 1 = female, 0 = male - confirm.
                    character['gender_text'] = '女' if character['gender'] == 1 else '男'
            except Exception as e:
                flash(f'获取角色列表失败: {str(e)}', 'danger')
            finally:
                conn.close()

    return render_template('user_dashboard.html', user=user, game_accounts=game_accounts, characters=characters)
def generate_srp6_verifier(username, password, salt=None):
    """Compute an SRP6 ``(salt, verifier)`` pair for a game account.

    The verifier is ``g^x mod N`` with
    ``x = SHA1(salt + SHA1("username:password"))`` read as a little-endian
    integer, encoded as 32 little-endian bytes.  *username* and *password*
    are hashed exactly as passed; the caller in this file upper-cases both
    first.

    Args:
        username: account name as it should be hashed.
        password: password as it should be hashed.
        salt: optional 32-byte salt; a fresh one is drawn from the operating
            system's CSPRNG when omitted.

    Returns:
        Tuple ``(salt, verifier)`` of two 32-byte ``bytes`` objects.
    """
    import hashlib
    import secrets

    N = int('894B645E89E1535BBDAD5B8B290650530801B18EBFBF5E8FAB3C82872A3E9BB7', 16)
    g = 7

    # Random salt from a cryptographically secure source.
    if salt is None:
        salt = secrets.token_bytes(32)

    # Compute the verifier.
    identity_hash = hashlib.sha1(f"{username}:{password}".encode()).digest()
    x = int.from_bytes(hashlib.sha1(salt + identity_hash).digest(), 'little')
    verifier = pow(g, x, N).to_bytes(32, 'little')
    return salt, verifier
def create_game_account(username, password):
    """Insert a game account into the auth database.

    Username and password are upper-cased before the SRP6 salt and verifier
    are computed.  The target table comes from the ``auth_account_table``
    setting (default ``account``), as in the other auth queries of this file.

    Returns:
        ``(True, message)`` on success, ``(False, message)`` on any failure,
        including when no auth connection is available, so callers can
        always unpack the result.
    """
    username = username.upper()
    password = password.upper()
    salt, verifier = generate_srp6_verifier(username, password)

    conn = get_auth_db_connection()
    if not conn:
        return False, "无法连接到Auth数据库"

    try:
        account_table = get_db_config('auth_account_table', 'account')
        sql = f"""
            INSERT INTO {account_table}
            (username, salt, verifier, expansion, joindate)
            VALUES (%s, %s, %s, %s, NOW())
        """
        conn.execute(sql, (username, salt, verifier, 2))
        return True, "账号创建成功"
    except Exception as e:
        return False, f"数据库错误: {str(e)}"
    finally:
        conn.close()
# Route handler for account registration.
@app.route('/admin/register', methods=['GET', 'POST'])
def admin_register():
    """Registration form.

    While no administrator exists the new account gets the admin role.
    Afterwards new accounts get the user role and a game account with the
    same credentials is created first; if that fails nothing is saved.
    """
    has_admin = Admin.query.filter_by(role='admin').first() is not None

    def show_form():
        return render_template('admin_register.html', is_admin_exists=has_admin)

    if request.method != 'POST':
        return show_form()

    username = request.form.get('username')
    password = request.form.get('password')
    confirm_password = request.form.get('confirm_password')

    # Validation: every failure flashes a message and shows the form again.
    if not username or not password:
        flash('请填写所有必填字段', 'danger')
        return show_form()
    if password != confirm_password:
        flash('两次输入的密码不一致', 'danger')
        return show_form()
    if Admin.query.filter_by(username=username).first():
        flash('用户名已存在', 'danger')
        return show_form()

    admin = Admin(username=username)
    admin.set_password(password)

    if has_admin:
        admin.role = 'user'
        # Ordinary users also get a game account with the same credentials.
        success, message = create_game_account(username, password)
        if not success:
            flash(f'创建游戏账号失败: {message}', 'danger')
            return show_form()
        flash('用户账号创建成功,同时已创建游戏账号', 'success')
    else:
        admin.role = 'admin'
        flash('管理员账号创建成功,请登录', 'success')

    db.session.add(admin)
    db.session.commit()
    return redirect(url_for('admin_login'))
@app.route('/admin/create_admin', methods=['GET', 'POST'])
@admin_required
def create_admin():
    """Admin-only form that creates a web account with the chosen role."""
    if request.method != 'POST':
        return render_template('create_admin.html')

    username = request.form.get('username')
    password = request.form.get('password')
    role = request.form.get('role', 'user')

    if not username or not password:
        flash('请填写所有必填字段', 'danger')
        return render_template('create_admin.html')
    if Admin.query.filter_by(username=username).first():
        flash('用户名已存在', 'danger')
        return render_template('create_admin.html')

    account = Admin(username=username, role=role)
    account.set_password(password)
    db.session.add(account)
    db.session.commit()

    role_label = "管理员" if role == "admin" else "用户"
    flash(f'{role_label}账号创建成功', 'success')
    return redirect(url_for('admin_dashboard'))
@app.route('/admin/db_config', methods=['GET', 'POST'])
@admin_required
def db_config():
    """Show and save the connection settings for the auth and characters databases.

    POST upserts one ``DatabaseConfig`` row per submitted field.  A field
    that is absent from the form is left unchanged, because
    ``config_value`` is NOT NULL and storing ``None`` would make the commit
    fail.  GET renders the stored settings.
    """
    config_keys = (
        'auth_host', 'auth_port', 'auth_user', 'auth_pass', 'auth_db',
        'auth_account_table',
        'char_host', 'char_port', 'char_user', 'char_pass', 'char_db',
        'char_table',
    )

    if request.method == 'POST':
        # Update existing rows or create new ones.
        for key in config_keys:
            value = request.form.get(key)
            if value is None:
                continue
            config = DatabaseConfig.query.filter_by(config_name=key).first()
            if config:
                config.config_value = value
            else:
                db.session.add(DatabaseConfig(config_name=key, config_value=value))
        db.session.commit()
        flash('数据库配置已保存', 'success')
        return redirect(url_for('db_config'))

    # Load the stored settings for the form.
    configs = {
        config.config_name: config.config_value
        for config in DatabaseConfig.query.all()
    }
    return render_template('db_config.html', configs=configs)
@app.route('/admin/test_connection', methods=['POST'])
@admin_required
def test_connection():
    """Try to connect with the submitted settings and report the result as JSON.

    The form field ``type`` selects the ``auth_*`` fields when it equals
    ``'auth'`` and the ``char_*`` fields otherwise.  User name and password
    are percent-encoded so characters such as ``@``, ``:`` or ``/`` do not
    corrupt the connection URL, and the temporary engine is disposed
    afterwards.
    """
    from urllib.parse import quote_plus

    prefix = 'auth' if request.form.get('type') == 'auth' else 'char'
    host = request.form.get(f'{prefix}_host')
    port = request.form.get(f'{prefix}_port')
    user = request.form.get(f'{prefix}_user')
    password = request.form.get(f'{prefix}_pass')
    db_name = request.form.get(f'{prefix}_db')

    engine = None
    try:
        connection_string = (
            f"mysql+pymysql://{quote_plus(user or '')}:{quote_plus(password or '')}"
            f"@{host}:{port}/{db_name}"
        )
        engine = create_engine(connection_string)
        connection = engine.connect()
        connection.close()
        return jsonify({'success': True, 'message': '连接成功'})
    except Exception as e:
        return jsonify({'success': False, 'message': f'连接失败: {str(e)}'})
    finally:
        # Release the pooled connection of this one-off engine.
        if engine is not None:
            engine.dispose()
# Create an administrator account before the first request when none exists.
# NOTE(review): this function reuses the name of the /admin/create_admin view
# defined earlier in this file and rebinds the module-level name; both
# registrations happen at decoration time - confirm the shadowing is intended.
@app.before_first_request
def create_admin():
    """Create the tables and, if no admin exists, a default ``admin`` account.

    The initial password is read from the ``DEFAULT_ADMIN_PASSWORD``
    environment variable and falls back to ``admin``.  If the username
    ``admin`` is already taken by a non-admin account nothing is created,
    because inserting it again would violate the unique constraint.
    """
    db.create_all()
    if Admin.query.filter_by(role='admin').count() != 0:
        return

    if Admin.query.filter_by(username='admin').first():
        print("未创建默认管理员账号: 用户名 admin 已被占用")
        return

    default_password = os.getenv('DEFAULT_ADMIN_PASSWORD', 'admin')
    admin = Admin(username='admin', role='admin')
    admin.set_password(default_password)
    db.session.add(admin)
    db.session.commit()
    if default_password == 'admin':
        print("已创建默认管理员账号: admin/admin")
    else:
        # Do not echo a password supplied through the environment.
        print("已创建默认管理员账号: admin")
# Read one stored database setting.
def get_db_config(config_name, default=None):
    """Return the stored value for *config_name*, or *default* when no row exists."""
    row = DatabaseConfig.query.filter_by(config_name=config_name).first()
    return row.config_value if row else default
# Connect to the AzerothCore auth database.
def get_auth_db_connection():
    """Open a connection to the auth database, or return None on failure.

    Settings come from ``get_db_config`` with local defaults.  User name and
    password are percent-encoded so special characters cannot corrupt the
    URL.  The engine is created with ``NullPool`` because a new engine is
    built on every call: without a pool, the caller's ``close()`` really
    closes the connection instead of parking it in a pool that nobody
    reuses.
    """
    from urllib.parse import quote_plus
    from sqlalchemy.pool import NullPool

    host = get_db_config('auth_host', '127.0.0.1')
    port = get_db_config('auth_port', '3306')
    user = get_db_config('auth_user', 'acore')
    password = get_db_config('auth_pass', 'acore')
    db_name = get_db_config('auth_db', 'acore_auth')
    try:
        connection_string = (
            f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{db_name}"
        )
        engine = create_engine(connection_string, poolclass=NullPool)
        return engine.connect()
    except Exception as e:
        print(f"连接Auth数据库失败: {str(e)}")
        return None
# Connect to the AzerothCore characters database.
def get_char_db_connection():
    """Open a connection to the characters database, or return None on failure.

    Settings come from ``get_db_config`` with local defaults.  User name and
    password are percent-encoded so special characters cannot corrupt the
    URL.  The engine is created with ``NullPool`` because a new engine is
    built on every call: without a pool, the caller's ``close()`` really
    closes the connection instead of parking it in a pool that nobody
    reuses.
    """
    from urllib.parse import quote_plus
    from sqlalchemy.pool import NullPool

    host = get_db_config('char_host', '127.0.0.1')
    port = get_db_config('char_port', '3306')
    user = get_db_config('char_user', 'acore')
    password = get_db_config('char_pass', 'acore')
    db_name = get_db_config('char_db', 'acore_characters')
    try:
        connection_string = (
            f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{db_name}"
        )
        engine = create_engine(connection_string, poolclass=NullPool)
        return engine.connect()
    except Exception as e:
        print(f"连接Characters数据库失败: {str(e)}")
        return None
@app.route('/admin/account_management', methods=['GET', 'POST'])
@admin_required
def account_management():
    """Admin page listing game accounts and creating new ones.

    POST with ``action=create_account`` inserts an account into the auth
    database.  Salt and verifier come from ``generate_srp6_verifier`` and
    the username is stored upper-cased, matching ``create_game_account``.
    GET lists the 100 most recently created accounts.
    """
    if request.method == 'POST':
        action = request.form.get('action')
        if action == 'create_account':
            username = request.form.get('username')
            password = request.form.get('password')
            email = request.form.get('email', '')
            if not username or not password:
                flash('用户名和密码不能为空', 'danger')
                return redirect(url_for('account_management'))

            # Connect to the auth database.
            conn = get_auth_db_connection()
            if not conn:
                flash('无法连接到Auth数据库', 'danger')
                return redirect(url_for('account_management'))

            try:
                account_table = get_db_config('auth_account_table', 'account')
                game_username = username.upper()

                # Refuse duplicates.
                result = conn.execute(
                    f"SELECT id FROM {account_table} WHERE username = %s",
                    (game_username,),
                )
                if result.fetchone():
                    flash('账号已存在', 'danger')
                    return redirect(url_for('account_management'))

                # SRP6 salt and 32-byte verifier over the upper-cased credentials.
                salt, verifier = generate_srp6_verifier(game_username, password.upper())

                sql = f"""
                    INSERT INTO {account_table}
                    (username, salt, verifier, email, reg_mail, joindate)
                    VALUES (%s, %s, %s, %s, %s, NOW())
                """
                conn.execute(sql, (game_username, salt, verifier, email, email))
                flash('账号创建成功', 'success')
            except Exception as e:
                flash(f'创建账号失败: {str(e)}', 'danger')
            finally:
                conn.close()
            return redirect(url_for('account_management'))

    # Account list.
    accounts = []
    conn = get_auth_db_connection()
    if conn:
        try:
            account_table = get_db_config('auth_account_table', 'account')
            query = f"""
                SELECT id, username, email,
                    last_login, expansion,
                    CASE WHEN online > 0 THEN '在线' ELSE '离线' END as status
                FROM {account_table}
                ORDER BY id DESC
                LIMIT 100
            """
            result = conn.execute(query)
            accounts = [dict(row) for row in result]
        except Exception as e:
            flash(f'获取账号列表失败: {str(e)}', 'danger')
        finally:
            conn.close()
    return render_template('account_management.html', accounts=accounts)
@app.route('/admin/character_management')
@admin_required
def character_management():
    """Admin page listing the 100 characters with the highest guid.

    Each character dict gets readable ``race_text``, ``class_text`` and
    ``gender_text`` entries.  A lookup failure is flashed and the page is
    rendered with whatever was loaded.
    """
    characters = []
    conn = get_char_db_connection()
    if conn:
        try:
            char_table = get_db_config('char_table', 'characters')
            query = f"""
                SELECT
                    guid, account, name, level, race, class, gender,
                    CASE WHEN online > 0 THEN '在线' ELSE '离线' END as online_status,
                    money/10000 as gold,
                    totaltime/3600 as hours_played,
                    map
                FROM {char_table}
                ORDER BY guid DESC
                LIMIT 100
            """
            result = conn.execute(query)
            characters = [dict(row) for row in result]
            # Attach readable race, class and gender.
            for character in characters:
                character['race_text'] = get_race_name(character['race'])
                character['class_text'] = get_class_name(character['class'])
                # Both branches used to be the empty string, so the label
                # carried no information.
                # NOTE(review): assumes gender 1 = female, 0 = male - confirm.
                character['gender_text'] = '女' if character['gender'] == 1 else '男'
        except Exception as e:
            flash(f'获取角色列表失败: {str(e)}', 'danger')
        finally:
            conn.close()
    return render_template('character_management.html', characters=characters)
@app.route('/admin/link_account', methods=['POST'])
@admin_required
def link_account():
    """Admin action: assign an unused CDK to a character without checking a post.

    Every outcome flashes one message and redirects back to the character
    management page.
    """
    character_name = request.form.get('character_name')
    post_url = request.form.get('post_url')
    if not (character_name and post_url):
        flash('请填写所有必要信息', 'danger')
        return redirect(url_for('character_management'))

    # The character must exist in the characters database.
    char_conn = get_char_db_connection()
    if not char_conn:
        flash('无法连接到Characters数据库', 'danger')
        return redirect(url_for('character_management'))

    try:
        char_table = get_db_config('char_table', 'characters')
        lookup = f"SELECT guid, account, name, level FROM {char_table} WHERE name = %s"
        character = char_conn.execute(lookup, (character_name,)).fetchone()
        if not character:
            flash('角色不存在', 'danger')
        else:
            # One code per character.
            used = UsedCDK.query.filter_by(character_name=character_name).first()
            if used:
                flash(f'该角色已经领取过CDK: {used.cdk},领取时间: {used.used_at}', 'warning')
            else:
                # First code that has not been issued yet.
                issued_codes = db.session.query(UsedCDK.cdk)
                cdk = CDKCode.query.filter(~CDKCode.兑换码.in_(issued_codes)).first()
                if not cdk:
                    flash('CDK已经用完请添加新的CDK', 'danger')
                else:
                    # Record the hand-out.
                    db.session.add(UsedCDK(
                        character_name=character_name,
                        post_url=post_url,
                        cdk=cdk.兑换码
                    ))
                    db.session.commit()
                    flash(f'已成功为角色 {character_name} (等级 {character["level"]}) 分配CDK: {cdk.兑换码}', 'success')
    except Exception as e:
        flash(f'操作失败: {str(e)}', 'danger')
    finally:
        char_conn.close()
    return redirect(url_for('character_management'))
# Map a race id to its display name.
def get_race_name(race_id):
    """Return the Chinese display name for *race_id*, or ``未知(<id>)`` if unlisted."""
    races = dict([
        (1, '人类'),
        (2, '兽人'),
        (3, '矮人'),
        (4, '暗夜精灵'),
        (5, '亡灵'),
        (6, '牛头人'),
        (7, '侏儒'),
        (8, '巨魔'),
        (9, '地精'),
        (10, '血精灵'),
        (11, '德莱尼'),
        (22, '狼人'),
    ])
    # Ids 24, 25 and 26 all map to the same name.
    races.update(dict.fromkeys((24, 25, 26), '熊猫人'))
    try:
        return races[race_id]
    except KeyError:
        return f'未知({race_id})'
# Map a class id to its display name.
def get_class_name(class_id):
    """Return the Chinese display name for *class_id*, or ``未知(<id>)`` if unlisted."""
    names = (
        '战士', '圣骑士', '猎人', '盗贼', '牧师', '死亡骑士',
        '萨满', '法师', '术士', '武僧', '德鲁伊',
    )
    # Ids run from 1 to 11 in the order listed above.
    classes = dict(enumerate(names, start=1))
    try:
        return classes[class_id]
    except KeyError:
        return f'未知({class_id})'
if __name__ == '__main__':
    # Create the database itself.
    create_database()
    with app.app_context():
        try:
            # Create all tables.
            db.create_all()
            print("数据库表创建成功")
        except Exception as e:
            print(f"数据库表创建失败:{str(e)}")
    # Debug mode stays on by default, as before; set FLASK_DEBUG to
    # 0/false/no/off to run without the debugger and reloader.
    debug_enabled = os.getenv('FLASK_DEBUG', '1').strip().lower() not in ('0', 'false', 'no', 'off')
    app.run(debug=debug_enabled)