重构,精简相关代码,更新ddddocr

This commit is contained in:
Anyexyz 2023-12-18 12:09:45 +00:00
parent c82d11ff4b
commit 610a6cd329
3 changed files with 619 additions and 598 deletions

View File

@ -1,29 +1,21 @@
import json import json
import re import re
import time import time
import traceback
import requests import requests
from utility import encrypt, cap_recognize from utility import encrypt, cap_recognize
def study(username,password):
# 返回1:成功
def study(username, password, ua): # 返回0:失败
# return 1:success;0:fail
url = ''
tryTime = 0 tryTime = 0
url = ''
while tryTime < 4: while tryTime < 4:
try: try:
bjySession = requests.session() bjySession = requests.session() # 创建会话
bjySession.timeout = 5 # set session timeout bjySession.timeout = 5 # 设置会话超时
bjySession.headers.update({"User-Agent": ua, }) bjySession.headers.update({"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', })
touch = bjySession.get(url="https://m.bjyouth.net/site/login") touch = bjySession.get(url="https://m.bjyouth.net/site/login")
capUrl = "https://m.bjyouth.net" + re.findall( capUrl = "https://m.bjyouth.net" + re.findall(r'src="(/site/captcha.+)" alt=', touch.text)[0]
r'src="(/site/captcha.+)" alt=', touch.text)[0]
if "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQD5uIDebA2qU746e/NVPiQSBA0Q" not in touch.text:
print("记录的公钥没有出现")
capText = cap_recognize(bjySession.get(url=capUrl).content) capText = cap_recognize(bjySession.get(url=capUrl).content)
# print(f'验证码识别: {capText}')
login_r = bjySession.post('https://m.bjyouth.net/site/login', login_r = bjySession.post('https://m.bjyouth.net/site/login',
data={ data={
'_csrf_mobile': bjySession.cookies.get_dict()['_csrf_mobile'], '_csrf_mobile': bjySession.cookies.get_dict()['_csrf_mobile'],
@ -31,7 +23,6 @@ def study(username, password, ua):
'Login[username]': encrypt(username), 'Login[username]': encrypt(username),
'Login[verifyCode]': capText 'Login[verifyCode]': capText
}) })
if login_r.text == '8': if login_r.text == '8':
print('Login:识别的验证码错误') print('Login:识别的验证码错误')
continue continue
@ -40,9 +31,9 @@ def study(username, password, ua):
raise Exception('Login:账号密码错误') raise Exception('Login:账号密码错误')
print('登录成功') print('登录成功')
r = json.loads(bjySession.get("https://m.bjyouth.net/dxx/course").text) r = json.loads(bjySession.get("https://m.bjyouth.net/dxx/course").text)
# "rize" LOL
if 'newCourse' not in r: if 'newCourse' not in r:
print(r) print(r)
# newCourse滞后于course中的课程所以这里用course中的最新课程
url = r['data']['data'][0]['url'] url = r['data']['data'][0]['url']
title = r['data']['data'][0]['title'] title = r['data']['data'][0]['title']
courseId = r['data']['data'][0]['id'] courseId = r['data']['data'][0]['id']
@ -50,51 +41,38 @@ def study(username, password, ua):
except: except:
time.sleep(3) time.sleep(3)
tryTime += 1 tryTime += 1
print(traceback.format_exc())
if not url: if not url:
print('登入失败,退出') print('登入失败,退出')
return 0 return 0
orgIdTemp = '' info = bjySession.get('https://m.bjyouth.net/dxx/my').json()['data']
orgPattern = re.compile(r'\(|\s*(\d+)\s*|\)') # 组织id应该是被括号包的 name = info['name'].split('(')[0]
learnedInfo = 'https://m.bjyouth.net/dxx/my-study?page=1&limit=15&year=' + time.strftime("%Y", time.localtime()) org = info['org'].split('(')[0]
haveLearned = bjySession.get(learnedInfo).json() print(f'当前用户: {name} {org}')
orgID = ""
try:
orgIdTemp = orgPattern.search(haveLearned['data'][0]['orgname'])
orgID = orgIdTemp.group(1)
except:
print('获取组织id-2')
orgIdTemp = orgPattern.search(bjySession.get('https://m.bjyouth.net/dxx/my').json()['data']['org'])
if orgIdTemp:
orgID = orgIdTemp.group(1)
if not orgID:
orgID = '172442'
print(f"无法获取orgID")
nOrgID = int(bjySession.get('https://m.bjyouth.net/dxx/is-league').text) nOrgID = int(bjySession.get('https://m.bjyouth.net/dxx/is-league').text)
learnedInfo = 'https://m.bjyouth.net/dxx/my-study?page=1&limit=15&year=' + time.strftime("%Y", time.localtime())
haveLearned = bjySession.get(learnedInfo).json()
if f"学习课程:《{title}" in list(map(lambda x: x['text'], haveLearned['data'])): if f"学习课程:《{title}" in list(map(lambda x: x['text'], haveLearned['data'])):
print(f'{title} 在运行前已完成,退出') print(f'{title} 在运行前已完成,退出')
return 1 return 1
study_url = f"https://m.bjyouth.net/dxx/check" study_url = f"https://m.bjyouth.net/dxx/check"
r = bjySession.post(study_url, json={"id": str(courseId), "org_id": int(nOrgID)}) # payload r = bjySession.post(study_url, json={"id": str(courseId), "org_id": int(nOrgID)}) # payload
if r.text: if r.text:
print(f'Unexpected response: {r.text}') print(f'开始学习{title}')
return 0 return 0
haveLearned = bjySession.get(learnedInfo).json() haveLearned = bjySession.get(learnedInfo).json()
if int(orgID) != nOrgID:
raise Exception('组织id不匹配如果看到这个请开个issue说下')
if f"学习课程:《{title}" in list(map(lambda x: x['text'], haveLearned['data'])): if f"学习课程:《{title}" in list(map(lambda x: x['text'], haveLearned['data'])):
print(f'{title} 成功完成学习') print(f'{title} 成功完成学习')
return 1 return 1
else: else:
print(f'完成{title}, 但未在检查中确认') print(f'完成{title}, 但未在已学习列表中找到, 请手动检查')
return 0 return 0
if __name__ == '__main__':
    # Manual entry point for a single account.
    # Credentials are taken from the environment rather than written into the
    # source, so that no account secrets end up in version control.
    # Usage: BJY_USERNAME=... BJY_PASSWORD=... python study.py
    import os
    import sys

    _username = os.environ.get('BJY_USERNAME')
    _password = os.environ.get('BJY_PASSWORD')
    if not _username or not _password:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit('Set BJY_USERNAME and BJY_PASSWORD before running this module directly')
    study(_username, _password)

View File

@ -5,11 +5,14 @@ warnings.filterwarnings('ignore')
import io import io
import os import os
import base64 import base64
import json
import pathlib
import onnxruntime import onnxruntime
from PIL import Image from PIL import Image
import numpy as np import numpy as np
def base64_to_image(img_base64):
    """Decode a Base64-encoded image and open it with PIL.

    Args:
        img_base64: Base64 text (or bytes) holding an encoded image file.

    Returns:
        The object returned by ``Image.open`` for the decoded bytes.
    """
    return Image.open(io.BytesIO(base64.b64decode(img_base64)))
@ -21,24 +24,37 @@ def get_img_base64(single_image_path):
return img_base64.decode() return img_base64.decode()
class TypeError(TypeError):
    """Error raised when an input image is of an unsupported type.

    The class keeps its historical name, which shadows the built-in
    ``TypeError`` inside this module. The base-class expression above is
    evaluated before the new name is bound, so it refers to the built-in
    exception. Deriving from it means callers outside this module that
    write ``except TypeError`` also catch this error, while existing
    ``except Exception`` handlers keep working unchanged.
    """
class DdddOcr(object): class DdddOcr(object):
def __init__(self, use_gpu: bool = False, device_id: int = 0): def __init__(self, ocr: bool = True, det: bool = False, use_gpu: bool = False,
device_id: int = 0, import_onnx_path: str = "", charsets_path: str = ""):
if not hasattr(Image, 'ANTIALIAS'):
setattr(Image, 'ANTIALIAS', Image.LANCZOS)
self.use_import_onnx = False
self.__word = False
self.__resize = []
self.__channel = 1
if import_onnx_path != "":
det = False
ocr = False
self.__graph_path = import_onnx_path
with open(charsets_path, 'r', encoding="utf-8") as f:
info = json.loads(f.read())
self.__charset = info['charset']
self.__word = info['word']
self.__resize = info['image']
self.__channel = info['channel']
self.use_import_onnx = True
if det:
ocr = False
self.__graph_path = os.path.join(os.path.dirname(__file__), 'common_det.onnx')
self.__charset = []
if ocr:
self.__graph_path = os.path.join(os.path.dirname(__file__), 'common.onnx') self.__graph_path = os.path.join(os.path.dirname(__file__), 'common.onnx')
if use_gpu:
self.__providers = [
('CUDAExecutionProvider', {
'device_id': device_id,
'arena_extend_strategy': 'kNextPowerOfTwo',
'cuda_mem_limit': 2 * 1024 * 1024 * 1024,
'cudnn_conv_algo_search': 'EXHAUSTIVE',
'do_copy_in_default_stream': True,
}),
]
else:
self.__providers = [
'CPUExecutionProvider',
]
self.__ort_session = onnxruntime.InferenceSession(self.__graph_path, providers=self.__providers)
self.__charset = ["", "", "", "", "", "", "", "", "", "", "", "", "", "6", self.__charset = ["", "", "", "", "", "", "", "", "", "", "", "", "", "6",
"", "", "", "", "", "", "", "", "", "", "", "", "", "鴿", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "鴿", "", "",
"", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
@ -553,20 +569,71 @@ class DdddOcr(object):
"", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "",
"", "", "", "", "", "", "婿", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "婿", "", "", "", "", "", "", "", "", "",
"", "", "", ""] "", "", "", ""]
self.det = det
def classification(self, img_bytes: bytes = None, img_base64: str = None): if use_gpu:
if img_bytes: self.__providers = [
image = Image.open(io.BytesIO(img_bytes)) ('CUDAExecutionProvider', {
'device_id': device_id,
'arena_extend_strategy': 'kNextPowerOfTwo',
'cuda_mem_limit': 2 * 1024 * 1024 * 1024,
'cudnn_conv_algo_search': 'EXHAUSTIVE',
'do_copy_in_default_stream': True,
}),
]
else: else:
image = base64_to_image(img_base64) self.__providers = [
image = image.resize((int(image.size[0] * (64 / image.size[1])), 64), Image.LANCZOS).convert('L') 'CPUExecutionProvider',
]
if ocr or det or self.use_import_onnx:
self.__ort_session = onnxruntime.InferenceSession(self.__graph_path, providers=self.__providers)
def classification(self, img):
if not isinstance(img, (bytes, str, pathlib.PurePath, Image.Image)):
raise TypeError("未知图片类型")
if isinstance(img, bytes):
image = Image.open(io.BytesIO(img))
elif isinstance(img, Image.Image):
image = img.copy()
elif isinstance(img, str):
image = base64_to_image(img)
else:
assert isinstance(img, pathlib.PurePath)
image = Image.open(img)
if not self.use_import_onnx:
image = image.resize((int(image.size[0] * (64 / image.size[1])), 64), Image.ANTIALIAS).convert('L')
else:
if self.__resize[0] == -1:
if self.__word:
image = image.resize((self.__resize[1], self.__resize[1]), Image.ANTIALIAS)
else:
image = image.resize((int(image.size[0] * (self.__resize[1] / image.size[1])), self.__resize[1]), Image.ANTIALIAS)
else:
image = image.resize((self.__resize[0], self.__resize[1]), Image.ANTIALIAS)
if self.__channel == 1:
image = image.convert('L')
else:
image = image.convert('RGB')
image = np.array(image).astype(np.float32) image = np.array(image).astype(np.float32)
image = np.expand_dims(image, axis=0) / 255. image = np.expand_dims(image, axis=0) / 255.
if not self.use_import_onnx:
image = (image - 0.5) / 0.5 image = (image - 0.5) / 0.5
ort_inputs = {'input1': np.array([image])} else:
if self.__channel == 1:
image = (image - 0.456) / 0.224
else:
image = (image - np.array([0.485, 0.456, 0.406])) / np.array([0.229, 0.224, 0.225])
image = image[0]
image = image.transpose((2, 0, 1))
ort_inputs = {'input1': np.array([image]).astype(np.float32)}
ort_outs = self.__ort_session.run(None, ort_inputs) ort_outs = self.__ort_session.run(None, ort_inputs)
result = [] result = []
last_item = 0 last_item = 0
if self.__word:
for item in ort_outs[1]:
result.append(self.__charset[item])
else:
for item in ort_outs[0][0]: for item in ort_outs[0][0]:
if item == last_item: if item == last_item:
continue continue

24
main.py
View File

@ -1,24 +0,0 @@
import os
import time
import sys
from study import study
# Browser User-Agent string passed to every study() call.
ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'

# (username, password) pairs to process. An entry whose username equals the
# eight-asterisk placeholder is skipped and not counted as an attempt.
accounts = [('*********', '*********')]
print(f'账号数量:{len(accounts)}')

successful = 0  # accounts for which study() returned a truthy value
count = 0       # accounts actually attempted (placeholders excluded)
for username, password in accounts:
    if username != '********':
        count += 1
        print(f'--User {count}--')
        successful += 1 if study(username, password, ua) else 0

failed = count - successful
print('--Summary--')
print(f'成功:{successful},失败:{failed}')
if failed:
    # Propagate failure to the caller (e.g. a scheduler) via an uncaught exception.
    raise Exception(f'{failed}个失败!')