多线程重构
This commit is contained in:
parent
2b53e18957
commit
96f6c7641a
@ -1,90 +1,49 @@
|
|||||||
import json
|
|
||||||
import re
|
|
||||||
import time
|
|
||||||
import requests
|
|
||||||
import csv
|
import csv
|
||||||
from utility import encrypt, cap_recognize
|
import threading
|
||||||
def study(username,password):
|
import subprocess
|
||||||
# 返回1:成功
|
|
||||||
# 返回0:失败
|
|
||||||
tryTime = 0
|
|
||||||
url = ''
|
|
||||||
while tryTime < 4:
|
|
||||||
try:
|
|
||||||
bjySession = requests.session() # 创建会话
|
|
||||||
bjySession.timeout = 5 # 设置会话超时
|
|
||||||
bjySession.headers.update({"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', })
|
|
||||||
touch = bjySession.get(url="https://m.bjyouth.net/site/login")
|
|
||||||
capUrl = "https://m.bjyouth.net" + re.findall(r'src="(/site/captcha.+)" alt=', touch.text)[0]
|
|
||||||
capText = cap_recognize(bjySession.get(url=capUrl).content)
|
|
||||||
login_r = bjySession.post('https://m.bjyouth.net/site/login',
|
|
||||||
data={
|
|
||||||
'_csrf_mobile': bjySession.cookies.get_dict()['_csrf_mobile'],
|
|
||||||
'Login[password]': encrypt(password),
|
|
||||||
'Login[username]': encrypt(username),
|
|
||||||
'Login[verifyCode]': capText
|
|
||||||
})
|
|
||||||
if login_r.text == '8':
|
|
||||||
print('Login:识别的验证码错误')
|
|
||||||
continue
|
|
||||||
if 'fail' in login_r.text:
|
|
||||||
tryTime += 9
|
|
||||||
raise Exception('Login:账号密码错误')
|
|
||||||
print('登录成功')
|
|
||||||
r = json.loads(bjySession.get("https://m.bjyouth.net/dxx/course").text)
|
|
||||||
if 'newCourse' not in r:
|
|
||||||
print(r)
|
|
||||||
# newCourse滞后于course中的课程,所以这里用course中的最新课程
|
|
||||||
url = r['data']['data'][0]['url']
|
|
||||||
title = r['data']['data'][0]['title']
|
|
||||||
courseId = r['data']['data'][0]['id']
|
|
||||||
break
|
|
||||||
except:
|
|
||||||
time.sleep(3)
|
|
||||||
tryTime += 1
|
|
||||||
|
|
||||||
if not url:
|
def process_account(row, result_list):
|
||||||
print('登入失败,退出')
|
username, password = row[0], row[1]
|
||||||
return 0
|
|
||||||
|
|
||||||
info = bjySession.get('https://m.bjyouth.net/dxx/my').json()['data']
|
try:
|
||||||
name = info['name'].split('(')[0]
|
output_bytes = subprocess.check_output(["python", "one_account.py", username, password])
|
||||||
org = info['org'].split('(')[0]
|
output = output_bytes.decode('utf-8') # Use utf-8 encoding to decode the bytes
|
||||||
print(f'当前用户: {name} {org}')
|
except subprocess.CalledProcessError as e:
|
||||||
|
output = f"Error: {e.output.decode('utf-8')}" # Use utf-8 encoding to decode the error output
|
||||||
|
|
||||||
nOrgID = int(bjySession.get('https://m.bjyouth.net/dxx/is-league').text)
|
result_list.append((username, output)) # 将结果存入结果列表
|
||||||
|
|
||||||
learnedInfo = 'https://m.bjyouth.net/dxx/my-study?page=1&limit=15&year=' + time.strftime("%Y", time.localtime())
|
|
||||||
haveLearned = bjySession.get(learnedInfo).json()
|
|
||||||
|
|
||||||
if f"学习课程:《{title}》" in list(map(lambda x: x['text'], haveLearned['data'])):
|
|
||||||
print(f'{title} 在运行前已完成,退出')
|
|
||||||
return 1
|
|
||||||
study_url = f"https://m.bjyouth.net/dxx/check"
|
|
||||||
r = bjySession.post(study_url, json={"id": str(courseId), "org_id": int(nOrgID)}) # payload
|
|
||||||
if r.text:
|
|
||||||
print(f'Unexpected response: {r.text}')
|
|
||||||
return 0
|
|
||||||
|
|
||||||
haveLearned = bjySession.get(learnedInfo).json()
|
|
||||||
if f"学习课程:《{title}》" in list(map(lambda x: x['text'], haveLearned['data'])):
|
|
||||||
print(f'{title} 成功完成学习')
|
|
||||||
return 1
|
|
||||||
else:
|
|
||||||
print(f'完成{title}, 但未在已学习列表中找到, 请手动检查')
|
|
||||||
return 0
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# 读取account.csv配置文件,格式为:用户名,密码,备注
|
|
||||||
success = 0
|
success = 0
|
||||||
count = 0
|
count = 0
|
||||||
|
all_results = [] # 用于存储每个线程的结果列表
|
||||||
|
|
||||||
with open('account.csv', 'r', encoding='utf-8') as f:
|
with open('account.csv', 'r', encoding='utf-8') as f:
|
||||||
reader = csv.reader(f)
|
reader = csv.reader(f)
|
||||||
|
threads = []
|
||||||
|
|
||||||
for row in reader:
|
for row in reader:
|
||||||
count += 1
|
count += 1
|
||||||
print(f'用户{count}: {row[0]}')
|
print(f'用户{count}: {row[2]}({row[0]})')
|
||||||
if study(row[0], row[1]):
|
|
||||||
success += 1
|
|
||||||
print("\n")
|
|
||||||
print(f'共{count}个用户,成功{success}个,失败{count-success}个')
|
|
||||||
|
|
||||||
|
# 创建一个线程并启动,为每个线程创建独立的结果列表
|
||||||
|
results = []
|
||||||
|
thread = threading.Thread(target=process_account, args=(row, results))
|
||||||
|
threads.append(thread)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
# 将每个线程的结果列表存入总的结果列表
|
||||||
|
all_results.append(results)
|
||||||
|
|
||||||
|
# 等待所有线程执行完毕
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
print('\n所有用户运行完毕\n')
|
||||||
|
# 统计成功和失败的用户
|
||||||
|
for results in all_results:
|
||||||
|
for username, output in results:
|
||||||
|
if "Error" not in output:
|
||||||
|
success += 1
|
||||||
|
print(f'用户{username}运行结果:\n{output}')
|
||||||
|
|
||||||
|
print(f'共{count}个用户,成功{success}个,失败{count - success}个')
|
||||||
|
83
one_account.py
Normal file
83
one_account.py
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
import json
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
import requests
|
||||||
|
from utility import encrypt, cap_recognize
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
def study(username,password):
|
||||||
|
# 返回1:成功
|
||||||
|
# 返回0:失败
|
||||||
|
tryTime = 0
|
||||||
|
url = ''
|
||||||
|
while tryTime < 4:
|
||||||
|
try:
|
||||||
|
bjySession = requests.session() # 创建会话
|
||||||
|
bjySession.timeout = 5 # 设置会话超时
|
||||||
|
bjySession.headers.update({"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', })
|
||||||
|
touch = bjySession.get(url="https://m.bjyouth.net/site/login")
|
||||||
|
capUrl = "https://m.bjyouth.net" + re.findall(r'src="(/site/captcha.+)" alt=', touch.text)[0]
|
||||||
|
capText = cap_recognize(bjySession.get(url=capUrl).content)
|
||||||
|
login_r = bjySession.post('https://m.bjyouth.net/site/login',
|
||||||
|
data={
|
||||||
|
'_csrf_mobile': bjySession.cookies.get_dict()['_csrf_mobile'],
|
||||||
|
'Login[password]': encrypt(password),
|
||||||
|
'Login[username]': encrypt(username),
|
||||||
|
'Login[verifyCode]': capText
|
||||||
|
})
|
||||||
|
if login_r.text == '8':
|
||||||
|
print('Login:识别的验证码错误')
|
||||||
|
continue
|
||||||
|
if 'fail' in login_r.text:
|
||||||
|
tryTime += 9
|
||||||
|
raise Exception('Login:账号密码错误')
|
||||||
|
print('登录成功')
|
||||||
|
r = json.loads(bjySession.get("https://m.bjyouth.net/dxx/course").text)
|
||||||
|
if 'newCourse' not in r:
|
||||||
|
print(r)
|
||||||
|
# newCourse滞后于course中的课程,所以这里用course中的最新课程
|
||||||
|
url = r['data']['data'][0]['url']
|
||||||
|
title = r['data']['data'][0]['title']
|
||||||
|
courseId = r['data']['data'][0]['id']
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
time.sleep(3)
|
||||||
|
tryTime += 1
|
||||||
|
|
||||||
|
if not url:
|
||||||
|
print('登入失败,退出')
|
||||||
|
return 0
|
||||||
|
|
||||||
|
info = bjySession.get('https://m.bjyouth.net/dxx/my').json()['data']
|
||||||
|
name = info['name'].split('(')[0]
|
||||||
|
org = info['org'].split('(')[0]
|
||||||
|
print(f'当前用户: {name} {org}')
|
||||||
|
|
||||||
|
nOrgID = int(bjySession.get('https://m.bjyouth.net/dxx/is-league').text)
|
||||||
|
|
||||||
|
learnedInfo = 'https://m.bjyouth.net/dxx/my-study?page=1&limit=15&year=' + time.strftime("%Y", time.localtime())
|
||||||
|
haveLearned = bjySession.get(learnedInfo).json()
|
||||||
|
|
||||||
|
if f"学习课程:《{title}》" in list(map(lambda x: x['text'], haveLearned['data'])):
|
||||||
|
print(f'{title} 在运行前已完成,退出')
|
||||||
|
return 1
|
||||||
|
study_url = f"https://m.bjyouth.net/dxx/check"
|
||||||
|
r = bjySession.post(study_url, json={"id": str(courseId), "org_id": int(nOrgID)}) # payload
|
||||||
|
if r.text:
|
||||||
|
print(f'Unexpected response: {r.text}')
|
||||||
|
return 0
|
||||||
|
|
||||||
|
haveLearned = bjySession.get(learnedInfo).json()
|
||||||
|
if f"学习课程:《{title}》" in list(map(lambda x: x['text'], haveLearned['data'])):
|
||||||
|
print(f'{title} 成功完成学习')
|
||||||
|
return 1
|
||||||
|
else:
|
||||||
|
print(f'完成{title}, 但未在已学习列表中找到, 请手动检查')
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
parser = argparse.ArgumentParser(description='Login script with captcha for multiple users.')
|
||||||
|
parser.add_argument('username', type=str, help='Username for login')
|
||||||
|
parser.add_argument('password', type=str, help='Password for login')
|
||||||
|
args = parser.parse_args()
|
||||||
|
study(args.username, args.password)
|
Loading…
x
Reference in New Issue
Block a user