2025-07-03 17:50:42 +08:00
|
|
|
|
import os
|
|
|
|
|
import math
|
|
|
|
|
from PIL import Image
|
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
2025-07-29 18:01:15 +08:00
|
|
|
|
from tools.dataproccess import get_resource_path
|
2025-07-03 17:50:42 +08:00
|
|
|
|
|
|
|
|
|
def resize_and_reduce_quality(image_path, output_path, target_width = None):
|
|
|
|
|
try:
|
|
|
|
|
# 检查图片文件大小
|
|
|
|
|
if os.path.getsize(image_path) < 10 * 1024 * 1024: # 10MB
|
|
|
|
|
print("图片文件大小小于10MB,不进行调整")
|
|
|
|
|
return image_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 打开图片
|
|
|
|
|
with Image.open(image_path) as img:
|
|
|
|
|
# 计算新的高度以保持宽高比
|
|
|
|
|
if target_width is None:
|
|
|
|
|
target_width = img.width
|
|
|
|
|
aspect_ratio = img.height / img.width
|
|
|
|
|
new_height = int(target_width * aspect_ratio)
|
|
|
|
|
|
|
|
|
|
# 调整图片大小
|
|
|
|
|
img_resized = img.resize((target_width, new_height), Image.LANCZOS)
|
|
|
|
|
|
|
|
|
|
# 降低图片质量
|
|
|
|
|
quality = 70 # 质量从1(最差)到95(最好),可以根据需要调整
|
|
|
|
|
img_resized.save(output_path, quality=quality)
|
|
|
|
|
|
|
|
|
|
return output_path
|
|
|
|
|
except Exception as e:
|
|
|
|
|
return f"调整图片大小和质量时出现问题: {str(e)}"
|
|
|
|
|
|
|
|
|
|
def get_picture_nums(source_path: str) -> int:
|
|
|
|
|
picture_count = 0
|
|
|
|
|
for root, dirs, files in os.walk(source_path):
|
|
|
|
|
for file in files:
|
|
|
|
|
if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'):
|
|
|
|
|
picture_count += 1
|
|
|
|
|
return picture_count
|
|
|
|
|
|
|
|
|
|
def collect_defect_data(
|
|
|
|
|
Y: str,
|
|
|
|
|
picture_dir: str,
|
|
|
|
|
search_file_list: list = [],
|
|
|
|
|
) -> tuple[int, dict]:
|
|
|
|
|
"""
|
|
|
|
|
收集指定年份的缺陷图片数据,并根据布尔值决定是否扫描特定类型的缺陷图。
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
Y: 叶片号,如 "Y1"、"Y2"、"Y3"
|
|
|
|
|
picture_dir: 图片根目录
|
|
|
|
|
search_file_list (list, optional): 要搜索的文件列表.规定为3个元素
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
(缺陷图片总数, 缺陷图片文件名字典)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
total_num = 0
|
|
|
|
|
result_dict = {}
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
for defect_type in search_file_list:
|
|
|
|
|
dir_path = os.path.join(picture_dir, Y, defect_type)
|
|
|
|
|
num, img_dict = get_picture_nums_and_image_with_name(dir_path)
|
|
|
|
|
total_num += num
|
|
|
|
|
result_dict.update(img_dict)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"获取图片数据时出现问题: {str(e)},搜寻的目录:{dir_path}")
|
|
|
|
|
|
|
|
|
|
return total_num, result_dict
|
|
|
|
|
|
|
|
|
|
def get_picture_nums_and_image_with_name(source_path: str) -> tuple[int, dict]:
|
|
|
|
|
"""
|
|
|
|
|
获取指定目录下图片的数量,并返回每个图片的路径和名称(字典)
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
source_path (str): 要搜索的目录路径
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
tuple: 包含两个元素的元组
|
|
|
|
|
picture_count (int): 图片数量
|
|
|
|
|
image_with_name (dict): 图片路径和名称的字典,格式为 {图片名称: 图片完整路径}
|
|
|
|
|
"""
|
|
|
|
|
picture_count = 0
|
|
|
|
|
image_with_name = {}
|
|
|
|
|
name_list = []
|
|
|
|
|
|
|
|
|
|
for root, dirs, files in os.walk(source_path):
|
|
|
|
|
for file in files:
|
|
|
|
|
if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'):
|
|
|
|
|
picture_count += 1
|
|
|
|
|
image_with_name[os.path.splitext(file)[0]] = os.path.join(root, file)
|
|
|
|
|
|
|
|
|
|
return picture_count, image_with_name
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_image(directory, image_name):
|
|
|
|
|
"""
|
|
|
|
|
在指定目录中查找指定名称的图片文件
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
directory (str): 要搜索的目录路径
|
|
|
|
|
image_name (str): 要查找的图片文件名(可带扩展名或不带)
|
|
|
|
|
|
|
|
|
|
返回:
|
|
|
|
|
str: 找到的图片完整路径,如果未找到则返回None
|
|
|
|
|
"""
|
|
|
|
|
# 支持的图片扩展名列表
|
|
|
|
|
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']
|
|
|
|
|
|
|
|
|
|
# 遍历目录中的所有文件
|
|
|
|
|
for root, dirs, files in os.walk(directory):
|
|
|
|
|
for file in files:
|
|
|
|
|
# 获取文件名和扩展名
|
|
|
|
|
filename, ext = os.path.splitext(file)
|
|
|
|
|
|
|
|
|
|
# 检查是否匹配图片名称(带或不带扩展名)
|
|
|
|
|
if (file.lower() == image_name.lower() or
|
|
|
|
|
filename.lower() == image_name.lower() and ext.lower() in image_extensions):
|
|
|
|
|
return os.path.join(root, file)
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
async def make_Thumbnail(source_path: str, output_path: str, size: tuple = (436, 233)) -> str:
|
|
|
|
|
"""获取目录下所有图片,将所有图片合并制作略缩图并保存
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
source_path: 源目录
|
|
|
|
|
output_path: 输出目录
|
|
|
|
|
size: 合并后的略缩图总大小 (宽度, 高度)
|
|
|
|
|
"""
|
|
|
|
|
print("略缩图处理中")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
if not os.path.exists(output_path):
|
|
|
|
|
print(f"无输出目录,创建中,输出目录为:{output_path}")
|
|
|
|
|
os.makedirs(output_path)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"输出目录有问题:{e}")
|
|
|
|
|
return ""
|
|
|
|
|
#如果存在merged_thumbnail.jpg文件,则直接返回该文件路径
|
|
|
|
|
if os.path.exists(os.path.join(output_path,'merged_thumbnail.jpg')):
|
|
|
|
|
print(f"已有略缩图,不用处理, 目前如需重新生成,请去往{output_path}目录 删除 merged_thumbnail.jpg 图片")
|
|
|
|
|
"""
|
|
|
|
|
此处可预留接口,询问用户是否重新生成一份略缩图
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
return os.path.join(output_path,'merged_thumbnail.jpg')
|
|
|
|
|
print("目录中无略缩图,合并略缩图中")
|
|
|
|
|
# 获取源目录下所有的图片文件
|
|
|
|
|
try:
|
|
|
|
|
image_files = []
|
|
|
|
|
for root, dirs, files in os.walk(source_path):
|
|
|
|
|
for file in files:
|
|
|
|
|
if file.lower().endswith(('.jpg', '.jpeg', '.png')):
|
|
|
|
|
image_files.append(os.path.join(root, file))
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"递归获取图片失败,原因:{e}")
|
|
|
|
|
|
|
|
|
|
if not image_files:
|
|
|
|
|
print("源目录中没有找到图片文件")
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
# 计算每个缩略图的大小
|
|
|
|
|
num_images = len(image_files)
|
|
|
|
|
target_width, target_height = size
|
|
|
|
|
|
|
|
|
|
# 计算最佳的缩略图排列方式
|
|
|
|
|
# 先尝试计算每行可以放多少个缩略图
|
|
|
|
|
aspect_ratio = target_width / target_height
|
|
|
|
|
cols = math.ceil(math.sqrt(num_images * aspect_ratio))
|
|
|
|
|
rows = math.ceil(num_images / cols)
|
|
|
|
|
|
|
|
|
|
# 计算单个缩略图的大小
|
|
|
|
|
thumb_width = target_width // cols
|
|
|
|
|
thumb_height = target_height // rows
|
|
|
|
|
|
|
|
|
|
# 创建线程池处理图片
|
|
|
|
|
with ThreadPoolExecutor() as executor:
|
|
|
|
|
thumbnails = list(executor.map(
|
|
|
|
|
lambda file: create_thumbnail(file, (thumb_width, thumb_height)),
|
|
|
|
|
image_files
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
# 过滤掉 None 值
|
|
|
|
|
thumbnails = [thumb for thumb in thumbnails if thumb is not None]
|
|
|
|
|
|
|
|
|
|
if not thumbnails:
|
|
|
|
|
print("没有成功创建任何略缩图")
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
# 计算实际需要的行数和列数
|
|
|
|
|
actual_cols = min(len(thumbnails), cols)
|
|
|
|
|
actual_rows = math.ceil(len(thumbnails) / actual_cols)
|
|
|
|
|
|
|
|
|
|
# 创建合并后的图像
|
|
|
|
|
merged_image = Image.new('RGB', (actual_cols * thumb_width, actual_rows * thumb_height))
|
|
|
|
|
|
|
|
|
|
# 粘贴缩略图
|
|
|
|
|
for index, thumb in enumerate(thumbnails):
|
|
|
|
|
row = index // actual_cols
|
|
|
|
|
col = index % actual_cols
|
|
|
|
|
merged_image.paste(thumb, (col * thumb_width, row * thumb_height))
|
|
|
|
|
|
|
|
|
|
# 如果最终尺寸不完全匹配,调整大小
|
|
|
|
|
if merged_image.size != size:
|
|
|
|
|
merged_image = merged_image.resize(size, Image.LANCZOS)
|
|
|
|
|
|
|
|
|
|
# 保存合并后的略缩图
|
|
|
|
|
merged_thumbnail_path = os.path.join(output_path, 'merged_thumbnail.jpg')
|
|
|
|
|
merged_image.save(merged_thumbnail_path)
|
|
|
|
|
|
|
|
|
|
print(f"合并后的略缩图已保存到:{merged_thumbnail_path}")
|
|
|
|
|
return merged_thumbnail_path
|
|
|
|
|
|
|
|
|
|
def create_thumbnail(file_path: str, size: tuple) -> Image:
|
|
|
|
|
"""创建单个图片的略缩图
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
file_path: 图片文件路径
|
|
|
|
|
size: 缩略图大小
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
with Image.open(file_path) as img:
|
|
|
|
|
# 保持原始宽高比
|
|
|
|
|
img.thumbnail(size, Image.LANCZOS)
|
|
|
|
|
|
|
|
|
|
# 创建新图像确保尺寸一致
|
|
|
|
|
new_img = Image.new('RGB', size)
|
|
|
|
|
new_img.paste(img, ((size[0] - img.width) // 2, (size[1] - img.height) // 2))
|
|
|
|
|
return new_img
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"图片处理有问题:{e}")
|
2025-07-08 10:41:14 +08:00
|
|
|
|
return None
|
|
|
|
|
|
2025-07-24 18:00:03 +08:00
|
|
|
|
def process_picture_data(picture_data : dict[list[dict]],
|
|
|
|
|
image_source_to_find : list[str]
|
|
|
|
|
) -> tuple[dict[dict[list[dict]]],int]:
|
2025-07-08 10:41:14 +08:00
|
|
|
|
"""处理从数据库获取的图片数据
|
|
|
|
|
|
|
|
|
|
Args:
|
2025-07-24 18:00:03 +08:00
|
|
|
|
picture_data (dict[list[dict]]: 图片数据(按部件分类,返回的列表也要按部件分类)
|
2025-07-08 10:41:14 +08:00
|
|
|
|
image_source_to_find (list[str]): 要查找的图片来源枚举(外内防雷)
|
|
|
|
|
Returns:
|
|
|
|
|
tuple(
|
2025-07-24 18:00:03 +08:00
|
|
|
|
filtered_picture_data, 按部件、缺陷类型、典型类型、其他类型层层分类的图片数据
|
2025-07-09 17:19:59 +08:00
|
|
|
|
total_num 总图片数量
|
2025-07-08 10:41:14 +08:00
|
|
|
|
)
|
|
|
|
|
"""
|
2025-07-22 16:39:17 +08:00
|
|
|
|
# 过滤目标来源的图片数据
|
2025-07-24 18:00:03 +08:00
|
|
|
|
filtered_picture_data = {}
|
|
|
|
|
total_num = 0
|
|
|
|
|
for partName, values in picture_data.items():
|
|
|
|
|
total_num = total_num + len(values)
|
|
|
|
|
for pic in values:
|
|
|
|
|
if pic['imageSource'] in image_source_to_find:
|
|
|
|
|
if partName not in filtered_picture_data:
|
|
|
|
|
filtered_picture_data[partName] = {}
|
|
|
|
|
if pic['imageType'] not in filtered_picture_data[partName]:
|
|
|
|
|
filtered_picture_data[partName][pic['imageType']] = []
|
|
|
|
|
filtered_picture_data[partName][pic['imageType']].append(pic)
|
|
|
|
|
|
|
|
|
|
return filtered_picture_data, total_num
|
2025-07-22 16:39:17 +08:00
|
|
|
|
|
2025-07-24 18:00:03 +08:00
|
|
|
|
def print_data(filtered_picture_data : dict[dict[list[dict]]]) -> None:
|
|
|
|
|
"""打印图片数据"""
|
|
|
|
|
for partName, types in filtered_picture_data.items():
|
|
|
|
|
print(f"Part Name: {partName}")
|
|
|
|
|
for imageType, pictures in types.items():
|
|
|
|
|
print(f" Image Type: {imageType}, Number of Pictures: {len(pictures)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_records_with_pic(records: list[dict], filtered_picture_data: dict[str, dict[str, list[dict]]], defect_enum: str) -> tuple[dict[str, dict[str, list[dict]]], dict[str, list[dict]]]:
|
2025-07-22 16:39:17 +08:00
|
|
|
|
"""获取指定图片ID的记录
|
|
|
|
|
|
|
|
|
|
Args:
|
2025-07-24 18:00:03 +08:00
|
|
|
|
records (list[dict]): 记录列表,每条记录包含部件名、缺陷类型等信息
|
|
|
|
|
filtered_picture_data (dict[str, dict[str, list[dict]]]): 图片数据,最外层的字典的键是部件名,内层的字典的键是缺陷类型,值是图片信息列表
|
|
|
|
|
defect_enum (str): 指定的缺陷类型
|
|
|
|
|
|
2025-07-22 16:39:17 +08:00
|
|
|
|
Returns:
|
|
|
|
|
tuple(
|
2025-07-24 18:00:03 +08:00
|
|
|
|
records_with_defect_pic: 按部件名分类的缺陷记录,映射到图片的imagePath
|
|
|
|
|
defect_pic_with_no_records: 对应缺陷类型的图片没有记录的部件名映射
|
|
|
|
|
)
|
2025-07-22 16:39:17 +08:00
|
|
|
|
"""
|
2025-07-24 18:00:03 +08:00
|
|
|
|
records_with_defect_pic = {}
|
|
|
|
|
defect_pic_with_no_records = {}
|
|
|
|
|
|
|
|
|
|
# 收集所有filtered_picture_data中的imageId
|
|
|
|
|
all_image_ids = set()
|
|
|
|
|
for partName, types in filtered_picture_data.items():
|
|
|
|
|
defect_pictures = types.get(defect_enum, [])
|
|
|
|
|
for pic in defect_pictures:
|
|
|
|
|
all_image_ids.add(pic.get('imageId'))
|
|
|
|
|
|
|
|
|
|
# 过滤掉records中imageId不在all_image_ids中的记录
|
|
|
|
|
filtered_records = [record for record in records if record.get('imageId') in all_image_ids]
|
|
|
|
|
|
|
|
|
|
# 遍历每个部件
|
|
|
|
|
for partName, types in filtered_picture_data.items():
|
|
|
|
|
# 获取指定缺陷类型的图片列表
|
|
|
|
|
defect_pictures = types.get(defect_enum, [])
|
|
|
|
|
if defect_pictures:
|
|
|
|
|
# 初始化部件名的字典
|
|
|
|
|
if partName not in records_with_defect_pic:
|
|
|
|
|
records_with_defect_pic[partName] = {}
|
|
|
|
|
|
|
|
|
|
# 初始化缺陷类型的字典
|
|
|
|
|
records_with_defect_pic[partName][defect_enum] = []
|
|
|
|
|
|
|
|
|
|
# 遍历缺陷图片列表,找到对应的记录并添加到字典中
|
|
|
|
|
for pic in defect_pictures:
|
|
|
|
|
pic_id = pic.get('imageId')
|
|
|
|
|
matched_records = [record for record in filtered_records if record.get('imageId') == pic_id]
|
|
|
|
|
|
|
|
|
|
# 如果有匹配的记录,则添加到字典中
|
|
|
|
|
if matched_records:
|
|
|
|
|
for record in matched_records:
|
|
|
|
|
records_with_defect_pic[partName][defect_enum].append({
|
|
|
|
|
'record': record,
|
|
|
|
|
'imagePath': pic.get('imagePath')
|
|
|
|
|
})
|
|
|
|
|
else:
|
|
|
|
|
# 如果没有匹配的记录,则将图片信息添加到没有记录的字典中
|
|
|
|
|
if partName not in defect_pic_with_no_records:
|
|
|
|
|
defect_pic_with_no_records[partName] = []
|
|
|
|
|
defect_pic_with_no_records[partName].append({
|
|
|
|
|
'defect_enum': defect_enum,
|
|
|
|
|
'imagePath': pic.get('imagePath')
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return records_with_defect_pic, defect_pic_with_no_records
|
|
|
|
|
|
2025-07-29 18:01:15 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_template_pic(pic_name : str):
|
|
|
|
|
"""获取模板图片路径"""
|
|
|
|
|
return get_resource_path(pic_name)
|
|
|
|
|
|