to learn goupby learn pandas groupby groupby learn pandas groupby 优化正则提取番号和集数 待理解下载图片逻辑 还有剪裁+背景图逻辑 修改所有config[ 将整理生成nfo的代码 可缓存番号信息和缩略图和海报 可以识别番号后集数和尾部集数,赞不能分辨-C中文字幕片 改正一个错误 嵌套字典存储数据 整理函数 修正匹配时间正则 pipenv 添加依赖 修改优先取三位数字的规则:heyzo四位数除外 添加了依赖 和 有番号的优化 修改了啥 我也记不得了
417 lines
17 KiB
Python
Executable File
417 lines
17 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import glob
|
||
import os
|
||
import time
|
||
import fuckit
|
||
from tenacity import retry, stop_after_delay, wait_fixed
|
||
import json
|
||
import shutil
|
||
import itertools
|
||
import argparse
|
||
from pathlib import Path
|
||
|
||
from core import *
|
||
from ConfigApp import ConfigApp
|
||
from PathNameProcessor import PathNameProcessor
|
||
|
||
# TODO 封装聚合解耦:CORE
|
||
# TODO (学习)统一依赖管理工具
|
||
# TODO 不同媒体服务器尽量兼容统一一种元数据 如nfo 海报等(emby,jellyfin,plex)
|
||
# TODO 字幕整理功能 文件夹中读取所有字幕 并提番号放入对应缓存文件夹中TEMP
|
||
|
||
config = ConfigApp()
|
||
|
||
|
||
def safe_list_get(list_in, idx, default=None):
|
||
"""
|
||
数组安全取值
|
||
:param list_in:
|
||
:param idx:
|
||
:param default:
|
||
:return:
|
||
"""
|
||
try:
|
||
return list_in[idx]
|
||
except IndexError:
|
||
return default
|
||
|
||
|
||
def UpdateCheck(version):
|
||
if UpdateCheckSwitch() == '1':
|
||
html2 = get_html('https://raw.githubusercontent.com/yoshiko2/AV_Data_Capture/master/update_check.json')
|
||
html = json.loads(str(html2))
|
||
|
||
if not version == html['version']:
|
||
print('[*] * New update ' + html['version'] + ' *')
|
||
print('[*] ↓ Download ↓')
|
||
print('[*] ' + html['download'])
|
||
print('[*]======================================================')
|
||
else:
|
||
print('[+]Update Check disabled!')
|
||
|
||
|
||
def argparse_get_file():
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument("file", default='', nargs='?', help="Write the file path on here")
|
||
args = parser.parse_args()
|
||
if args.file == '':
|
||
return ''
|
||
else:
|
||
return args.file
|
||
|
||
|
||
def movie_lists(escape_folders):
|
||
escape_folders = re.split('[,,]', escape_folders)
|
||
total = []
|
||
|
||
for root, dirs, files in os.walk(config.search_folder):
|
||
if root in escape_folders:
|
||
continue
|
||
for file in files:
|
||
if re.search(PathNameProcessor.pattern_of_file_name_suffixes, file, re.IGNORECASE):
|
||
path = os.path.join(root, file)
|
||
total.append(path)
|
||
return total
|
||
|
||
|
||
# def CEF(path):
|
||
# try:
|
||
# files = os.listdir(path) # 获取路径下的子文件(夹)列表
|
||
# for file in files:
|
||
# os.removedirs(path + '/' + file) # 删除这个空文件夹
|
||
# print('[+]Deleting empty folder', path + '/' + file)
|
||
# except:
|
||
# a = ''
|
||
#
|
||
|
||
|
||
def get_numbers(paths):
|
||
"""提取对应路径的番号+集数"""
|
||
|
||
def get_number(filepath, absolute_path=False):
|
||
"""
|
||
获取番号,集数
|
||
:param filepath:
|
||
:param absolute_path:
|
||
:return:
|
||
"""
|
||
name = filepath.upper() # 转大写
|
||
if absolute_path:
|
||
name = name.replace('\\', '/')
|
||
# 移除干扰字段
|
||
name = PathNameProcessor.remove_distractions(name)
|
||
# 抽取 文件路径中可能存在的尾部集数,和抽取尾部集数的后的文件路径
|
||
suffix_episode, name = PathNameProcessor.extract_suffix_episode(name)
|
||
# 抽取 文件路径中可能存在的 番号后跟随的集数 和 处理后番号
|
||
episode_behind_code, code_number = PathNameProcessor.extract_code(name)
|
||
# 无番号 则设置空字符
|
||
code_number = code_number if code_number else ''
|
||
# 优先取尾部集数,无则取番号后的集数(几率低),都无则为空字符
|
||
episode = suffix_episode if suffix_episode else episode_behind_code if episode_behind_code else ''
|
||
|
||
return code_number, episode
|
||
|
||
maps = {}
|
||
for path in paths:
|
||
number, episode = get_number(path)
|
||
maps[path] = (number, episode)
|
||
|
||
return maps
|
||
|
||
|
||
def create_folder(paths):
|
||
for path_to_make in paths:
|
||
if path_to_make:
|
||
try:
|
||
os.makedirs(path_to_make)
|
||
except FileExistsError as e:
|
||
# name = f'{folder=}'.split('=')[0].split('.')[-1]
|
||
print(path_to_make + " 已经存在")
|
||
pass
|
||
except Exception as exception:
|
||
print('! 创建文件夹 ' + path_to_make + ' 失败,文件夹路径错误或权限不够')
|
||
raise exception
|
||
else:
|
||
raise Exception('!创建的文件夹路径为空,请确认')
|
||
|
||
|
||
if __name__ == '__main__':
|
||
version = '2.8.2'
|
||
|
||
print('[*]================== AV Data Capture ===================')
|
||
print('[*] Version ' + version)
|
||
print('[*]======================================================')
|
||
|
||
# UpdateCheck(version)
|
||
|
||
CreatFailedFolder(config.failed_folder)
|
||
os.chdir(os.getcwd())
|
||
|
||
# 创建文件夹
|
||
create_folder([config.failed_folder, config.search_folder, config.temp_folder])
|
||
|
||
# temp 文件夹中infos放 番号json信息,pics中放图片信息
|
||
path_infos = config.temp_folder + '/infos'
|
||
path_pics = config.temp_folder + '/pics'
|
||
|
||
create_folder([path_infos, path_pics])
|
||
|
||
# 遍历搜索目录下所有视频的路径
|
||
movie_list = movie_lists(config.escape_folder)
|
||
|
||
# 以下是从文本中提取测试的数据
|
||
# f = open('TestPathNFO.txt', 'r')
|
||
# f = open('TestPathSpecial.txt', 'r')
|
||
# movie_list = [line[:-1] for line in f.readlines()]
|
||
# f.close()
|
||
|
||
# 获取 番号,集数,路径 的字典->list
|
||
code_ep_paths = [[codeEposode[0], codeEposode[1], path] for path, codeEposode in get_numbers(movie_list).items()]
|
||
[print(i) for i in code_ep_paths]
|
||
# 按番号分组片子列表(重点),用于寻找相同番号的片子
|
||
'''
|
||
这里利用pandas分组 "https://pandas.pydata.org/pandas-docs/stable/user_guide/groupby.html"
|
||
|
||
'''
|
||
# # 设置打印时显示所有列
|
||
# pd.set_option('display.max_columns', None)
|
||
# # 显示所有行
|
||
# pd.set_option('display.max_rows', None)
|
||
# # 设置value的显示长度为100,默认为50
|
||
# pd.set_option('max_colwidth', 30)
|
||
# # 创建框架
|
||
# df = pd.DataFrame(code_ep_paths, columns=('code', 'ep', 'path'))
|
||
# # 以番号分组
|
||
# groupedCode_code_ep_paths = df.groupby(['code'])
|
||
# # print(df.groupby(['code', 'ep']).describe().unstack())
|
||
# grouped_code_ep = df.groupby(['code', 'ep'])['path']
|
||
#
|
||
sorted_code_list = sorted(code_ep_paths, key=lambda code_ep_path: code_ep_path[0])
|
||
group_code_list = itertools.groupby(sorted_code_list, key=lambda code_ep_path: code_ep_path[0])
|
||
|
||
|
||
def group_code_list_to_dict(group_code_list):
|
||
data_dict = {}
|
||
for code, code_ep_path_group in group_code_list:
|
||
code_ep_path_list = list(code_ep_path_group)
|
||
eps_of_code = {}
|
||
group_ep_list = itertools.groupby(code_ep_path_list, key=lambda code_ep_path: code_ep_path[1])
|
||
for ep, group_ep_group in group_ep_list:
|
||
group_ep_list = list(group_ep_group)
|
||
eps_of_code[ep] = [code_ep_path[2] for code_ep_path in group_ep_list]
|
||
data_dict[code] = eps_of_code
|
||
|
||
return data_dict
|
||
|
||
|
||
def print_same_code_ep_path(data_dict_in):
|
||
for code_in in data_dict_in:
|
||
ep_path_list = data_dict_in[code_in]
|
||
if len(ep_path_list) > 1:
|
||
print('--' * 60)
|
||
print("|" + (code_in if code_in else 'unknown') + ":")
|
||
|
||
# group_ep_list = itertools.groupby(code_ep_path_list.items(), key=lambda code_ep_path: code_ep_path[0])
|
||
for ep in ep_path_list:
|
||
path_list = ep_path_list[ep]
|
||
print('--' * 12)
|
||
ep = ep if ep else ' '
|
||
if len(path_list) == 1:
|
||
print('| 集数:' + ep + ' 文件: ' + path_list[0])
|
||
else:
|
||
print('| 集数:' + ep + ' 文件: ')
|
||
for path in path_list:
|
||
print('| ' + path)
|
||
|
||
else:
|
||
pass
|
||
|
||
|
||
# 分好组的数据 {code:{ep:[path]}}
|
||
data_dict_groupby_code_ep = group_code_list_to_dict(group_code_list)
|
||
|
||
print('--' * 100)
|
||
print("找到影片数量:" + str(len(movie_list)))
|
||
print("合计番号数量:" + str(len(data_dict_groupby_code_ep)) + " (多个相同番号的影片只统计一个,不能识别的番号 都统一为'unknown')")
|
||
print('Warning:!!!! 以下为相同番号的电影明细')
|
||
print('◤' + '--' * 80)
|
||
print_same_code_ep_path(data_dict_groupby_code_ep)
|
||
print('◣' + '--' * 80)
|
||
|
||
isContinue = input('任意键继续? N 退出 \n')
|
||
if isContinue.strip(' ') == "N":
|
||
exit(1)
|
||
|
||
|
||
# ========== 野鸡番号拖动 ==========
|
||
# number_argparse = argparse_get_file()
|
||
# if not number_argparse == '':
|
||
# print("[!]Making Data for [" + number_argparse + "], the number is [" + getNumber(number_argparse,
|
||
# absolute_path=True) + "]")
|
||
# nfo = core_main(number_argparse, getNumber(number_argparse, absolute_path=True))
|
||
# print("[*]======================================================")
|
||
# CEF(config.success_folder)
|
||
# CEF(config.failed_folder)
|
||
# print("[+]All finished!!!")
|
||
# input("[+][+]Press enter key exit, you can check the error messge before you exit.")
|
||
# os._exit(0)
|
||
# ========== 野鸡番号拖动 ==========
|
||
|
||
def download_code_infos(code_list, is_read_cache=True):
|
||
"""
|
||
遍历按番号分组的集合,刮取番号信息并缓存
|
||
|
||
:param is_read_cache: 是否读取缓存数据
|
||
:param code_list:
|
||
:return: {code:nfo}
|
||
"""
|
||
count_all_grouped = len(code_list)
|
||
count = 0
|
||
code_info_dict = {}
|
||
|
||
for code in code_list:
|
||
count = count + 1
|
||
percentage = str(count / int(count_all_grouped) * 100)[:4] + '%'
|
||
print('[!] - ' + percentage + ' [' + str(count) + '/' + str(count_all_grouped) + '] -')
|
||
try:
|
||
print("[!]搜刮数据 [" + code + "]")
|
||
if code:
|
||
# 创建番号的文件夹
|
||
file_path = path_infos + '/' + code + '.json'
|
||
nfo = {}
|
||
# 读取缓存信息,如果没有则联网搜刮
|
||
|
||
path = Path(file_path)
|
||
if is_read_cache and (path.exists() and path.is_file() and path.stat().st_size > 0):
|
||
print('找到缓存信息')
|
||
with open(file_path) as fp:
|
||
nfo = json.load(fp)
|
||
else:
|
||
|
||
# 核心功能 - 联网抓取信息字典
|
||
print('联网搜刮')
|
||
nfo = core_main(code)
|
||
print('正在写入', end='')
|
||
|
||
# 把缓存信息写入缓存文件夹中,有时会设备占用而失败,重试即可
|
||
@retry(stop=stop_after_delay(3), wait=wait_fixed(2))
|
||
def read_file():
|
||
with open(file_path, 'w') as fp:
|
||
json.dump(nfo, fp)
|
||
|
||
read_file()
|
||
print('完成!')
|
||
# 将番号信息放入字典
|
||
code_info_dict[code] = nfo
|
||
print("[*]======================================================")
|
||
|
||
except Exception as e: # 番号的信息获取失败
|
||
code_info_dict[code] = ''
|
||
print("找不到信息:" + code + ',Reason:' + str(e))
|
||
|
||
# if config.soft_link:
|
||
# print('[-]Link', file_path_name, 'to failed folder')
|
||
# os.symlink(file_path_name, config.failed_folder + '/')
|
||
# else:
|
||
# try:
|
||
# print('[-]Move ' + file_path_name + ' to failed folder:' + config.failed_folder)
|
||
# shutil.move(file_path_name, config.failed_folder + '/')
|
||
# except FileExistsError:
|
||
# print('[!]File exists in failed!')
|
||
# except:
|
||
# print('[+]skip')
|
||
continue
|
||
return code_info_dict
|
||
|
||
|
||
print('----------------------------------')
|
||
code_infos = download_code_infos(data_dict_groupby_code_ep)
|
||
print("----未找到番号数据的番号----")
|
||
print([print(code) for code in code_infos if code_infos[code] == ''])
|
||
print("-------------------------")
|
||
|
||
|
||
def download_images_of_nfos(code_info_dict):
|
||
"""
|
||
遍历番号信息,下载番号电影的海报,图片
|
||
:param code_info_dict:
|
||
:return: 无图片的信息的番号
|
||
"""
|
||
|
||
code_list_empty_image = []
|
||
for code in code_info_dict:
|
||
nfo = code_info_dict[code]
|
||
if len(nfo.keys()) == 0:
|
||
code_list_empty_image.append(code)
|
||
continue
|
||
|
||
code_pics_folder_to_save = path_pics + '/' + code
|
||
# 1 创建 番号文件夹
|
||
os.makedirs(code_pics_folder_to_save, exist_ok=True)
|
||
# 下载缩略图
|
||
if nfo['imagecut'] == 3: # 3 是缩略图
|
||
path = Path(code_pics_folder_to_save + '/' + 'thumb.png')
|
||
if path.exists() and path.is_file() and path.stat().st_size > 0:
|
||
print(code + ':缩略图已有缓存')
|
||
else:
|
||
print(code + ':缩略图下载中...')
|
||
download_file(nfo['cover_small'], code_pics_folder_to_save, 'thumb.png')
|
||
print(code + ':缩略图下载完成')
|
||
# 下载海报
|
||
path = Path(code_pics_folder_to_save + '/' + 'poster.png')
|
||
if path.exists() and path.is_file() and path.stat().st_size > 0:
|
||
print(code + ':海报已有缓存')
|
||
else:
|
||
print(code + ':海报下载中...')
|
||
download_file(nfo['cover'], code_pics_folder_to_save, 'poster.png')
|
||
print(code + ':海报下载完成')
|
||
return code_list_empty_image
|
||
|
||
|
||
|
||
code_list_empty = download_images_of_nfos(code_infos)
|
||
print("----未找到集数的番号----")
|
||
print([print(code) for code in code_list_empty])
|
||
print("------搜刮未找到集数的番号------")
|
||
code_infos_of_no_ep = download_code_infos(code_list_empty, is_read_cache=False)
|
||
print("----还是未找到番号数据的番号----")
|
||
print([print(code) for code in code_infos_of_no_ep if code_infos_of_no_ep[code] == ''])
|
||
print("----------------------")
|
||
# 开始操作
|
||
# # 2 创建缩略图海报
|
||
# if nfo['imagecut'] == 3: # 3 是缩略图
|
||
# download_cover_file(nfo['cover_small'], code, code_pics_folder_to_save)
|
||
# # 3 创建图
|
||
# download_image(nfo['cover'], code, code_pics_folder_to_save)
|
||
# # 4 剪裁
|
||
# crop_image(nfo['imagecut'], code, code_pics_folder_to_save)
|
||
# # 5 背景图
|
||
# copy_images_to_background_image(code, code_pics_folder_to_save)
|
||
# 6 创建 mame.nfo(不需要,需要时从infos中josn文件转为nfo文件)
|
||
# make_nfo_file(nfo, code, temp_path_to_save)
|
||
# 相同番号处理:按集数添加-CD[X];视频格式 and 大小 分;
|
||
# TODO 方式1 刮削:添加nfo,封面,内容截图等
|
||
# 6 创建 mame.nfo(不需要,需要时从infos中josn文件转为nfo文件)
|
||
make_nfo_file(nfo, code, temp_path_to_save)
|
||
# TODO 方式2 整理:按规则移动影片,字幕 到 演员,发行商,有无🐎 等
|
||
|
||
# if config.program_mode == '1':
|
||
# if multi_part == 1:
|
||
# number += part # 这时number会被附加上CD1后缀
|
||
# smallCoverCheck(path, number, imagecut, json_data['cover_small'], c_word, option, filepath, config.failed_folder) # 检查小封面
|
||
# imageDownload(option, json_data['cover'], number, c_word, path, multi_part, filepath, config.failed_folder) # creatFoder会返回番号路径
|
||
# cutImage(option, imagecut, path, number, c_word) # 裁剪图
|
||
# copyRenameJpgToBackdrop(option, path, number, c_word)
|
||
# PrintFiles(option, path, c_word, json_data['naming_rule'], part, cn_sub, json_data, filepath, config.failed_folder, tag) # 打印文件 .nfo
|
||
# pasteFileToFolder(filepath, path, number, c_word) # 移动文件
|
||
# # =======================================================================整理模式
|
||
# elif config.program_mode == '2':
|
||
# pasteFileToFolder_mode2(filepath, path, multi_part, number, part, c_word) # 移动文件
|
||
|
||
# CEF(config.success_folder)
|
||
# CEF(config.failed_folder)
|
||
print("[+]All finished!!!")
|
||
input("[+][+]Press enter key exit, you can check the error message before you exit.")
|