Report_Generate_Server/tools/get_pictures.py

341 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import math
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
from tools.dataproccess import get_resource_path
def resize_and_reduce_quality(image_path, output_path, target_width = None):
try:
# 检查图片文件大小
if os.path.getsize(image_path) < 10 * 1024 * 1024: # 10MB
print("图片文件大小小于10MB不进行调整")
return image_path
# 打开图片
with Image.open(image_path) as img:
# 计算新的高度以保持宽高比
if target_width is None:
target_width = img.width
aspect_ratio = img.height / img.width
new_height = int(target_width * aspect_ratio)
# 调整图片大小
img_resized = img.resize((target_width, new_height), Image.LANCZOS)
# 降低图片质量
quality = 70 # 质量从1最差到95最好可以根据需要调整
img_resized.save(output_path, quality=quality)
return output_path
except Exception as e:
return f"调整图片大小和质量时出现问题: {str(e)}"
def get_picture_nums(source_path: str) -> int:
picture_count = 0
for root, dirs, files in os.walk(source_path):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'):
picture_count += 1
return picture_count
def collect_defect_data(
Y: str,
picture_dir: str,
search_file_list: list = [],
) -> tuple[int, dict]:
"""
收集指定年份的缺陷图片数据,并根据布尔值决定是否扫描特定类型的缺陷图。
Args:
Y: 叶片号,如 "Y1""Y2""Y3"
picture_dir: 图片根目录
search_file_list (list, optional): 要搜索的文件列表.规定为3个元素
Returns:
(缺陷图片总数, 缺陷图片文件名字典)
"""
total_num = 0
result_dict = {}
try:
for defect_type in search_file_list:
dir_path = os.path.join(picture_dir, Y, defect_type)
num, img_dict = get_picture_nums_and_image_with_name(dir_path)
total_num += num
result_dict.update(img_dict)
except Exception as e:
print(f"获取图片数据时出现问题: {str(e)},搜寻的目录:{dir_path}")
return total_num, result_dict
def get_picture_nums_and_image_with_name(source_path: str) -> tuple[int, dict]:
"""
获取指定目录下图片的数量,并返回每个图片的路径和名称(字典)
Args:
source_path (str): 要搜索的目录路径
Returns:
tuple: 包含两个元素的元组
picture_count (int): 图片数量
image_with_name (dict): 图片路径和名称的字典,格式为 {图片名称: 图片完整路径}
"""
picture_count = 0
image_with_name = {}
name_list = []
for root, dirs, files in os.walk(source_path):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'):
picture_count += 1
image_with_name[os.path.splitext(file)[0]] = os.path.join(root, file)
return picture_count, image_with_name
def find_image(directory, image_name):
"""
在指定目录中查找指定名称的图片文件
参数:
directory (str): 要搜索的目录路径
image_name (str): 要查找的图片文件名(可带扩展名或不带)
返回:
str: 找到的图片完整路径如果未找到则返回None
"""
# 支持的图片扩展名列表
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']
# 遍历目录中的所有文件
for root, dirs, files in os.walk(directory):
for file in files:
# 获取文件名和扩展名
filename, ext = os.path.splitext(file)
# 检查是否匹配图片名称(带或不带扩展名)
if (file.lower() == image_name.lower() or
filename.lower() == image_name.lower() and ext.lower() in image_extensions):
return os.path.join(root, file)
return None
async def make_Thumbnail(source_path: str, output_path: str, size: tuple = (436, 233)) -> str:
"""获取目录下所有图片,将所有图片合并制作略缩图并保存
Args:
source_path: 源目录
output_path: 输出目录
size: 合并后的略缩图总大小 (宽度, 高度)
"""
print("略缩图处理中")
try:
if not os.path.exists(output_path):
print(f"无输出目录,创建中,输出目录为:{output_path}")
os.makedirs(output_path)
except Exception as e:
print(f"输出目录有问题:{e}")
return ""
#如果存在merged_thumbnail.jpg文件则直接返回该文件路径
if os.path.exists(os.path.join(output_path,'merged_thumbnail.jpg')):
print(f"已有略缩图,不用处理, 目前如需重新生成,请去往{output_path}目录 删除 merged_thumbnail.jpg 图片")
"""
此处可预留接口,询问用户是否重新生成一份略缩图
"""
return os.path.join(output_path,'merged_thumbnail.jpg')
print("目录中无略缩图,合并略缩图中")
# 获取源目录下所有的图片文件
try:
image_files = []
for root, dirs, files in os.walk(source_path):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')):
image_files.append(os.path.join(root, file))
except Exception as e:
print(f"递归获取图片失败,原因:{e}")
if not image_files:
print("源目录中没有找到图片文件")
return ""
# 计算每个缩略图的大小
num_images = len(image_files)
target_width, target_height = size
# 计算最佳的缩略图排列方式
# 先尝试计算每行可以放多少个缩略图
aspect_ratio = target_width / target_height
cols = math.ceil(math.sqrt(num_images * aspect_ratio))
rows = math.ceil(num_images / cols)
# 计算单个缩略图的大小
thumb_width = target_width // cols
thumb_height = target_height // rows
# 创建线程池处理图片
with ThreadPoolExecutor() as executor:
thumbnails = list(executor.map(
lambda file: create_thumbnail(file, (thumb_width, thumb_height)),
image_files
))
# 过滤掉 None 值
thumbnails = [thumb for thumb in thumbnails if thumb is not None]
if not thumbnails:
print("没有成功创建任何略缩图")
return ""
# 计算实际需要的行数和列数
actual_cols = min(len(thumbnails), cols)
actual_rows = math.ceil(len(thumbnails) / actual_cols)
# 创建合并后的图像
merged_image = Image.new('RGB', (actual_cols * thumb_width, actual_rows * thumb_height))
# 粘贴缩略图
for index, thumb in enumerate(thumbnails):
row = index // actual_cols
col = index % actual_cols
merged_image.paste(thumb, (col * thumb_width, row * thumb_height))
# 如果最终尺寸不完全匹配,调整大小
if merged_image.size != size:
merged_image = merged_image.resize(size, Image.LANCZOS)
# 保存合并后的略缩图
merged_thumbnail_path = os.path.join(output_path, 'merged_thumbnail.jpg')
merged_image.save(merged_thumbnail_path)
print(f"合并后的略缩图已保存到:{merged_thumbnail_path}")
return merged_thumbnail_path
def create_thumbnail(file_path: str, size: tuple) -> Image:
"""创建单个图片的略缩图
Args:
file_path: 图片文件路径
size: 缩略图大小
"""
try:
with Image.open(file_path) as img:
# 保持原始宽高比
img.thumbnail(size, Image.LANCZOS)
# 创建新图像确保尺寸一致
new_img = Image.new('RGB', size)
new_img.paste(img, ((size[0] - img.width) // 2, (size[1] - img.height) // 2))
return new_img
except Exception as e:
print(f"图片处理有问题:{e}")
return None
def process_picture_data(picture_data : dict[list[dict]],
image_source_to_find : list[str]
) -> tuple[dict[dict[list[dict]]],int]:
"""处理从数据库获取的图片数据
Args:
picture_data (dict[list[dict]]: 图片数据(按部件分类,返回的列表也要按部件分类)
image_source_to_find (list[str]): 要查找的图片来源枚举(外内防雷)
Returns:
tuple(
filtered_picture_data, 按部件、缺陷类型、典型类型、其他类型层层分类的图片数据
total_num 总图片数量
)
"""
# 过滤目标来源的图片数据
filtered_picture_data = {}
total_num = 0
for partName, values in picture_data.items():
total_num = total_num + len(values)
for pic in values:
if pic['imageSource'] in image_source_to_find:
if partName not in filtered_picture_data:
filtered_picture_data[partName] = {}
if pic['imageType'] not in filtered_picture_data[partName]:
filtered_picture_data[partName][pic['imageType']] = []
filtered_picture_data[partName][pic['imageType']].append(pic)
return filtered_picture_data, total_num
def print_data(filtered_picture_data : dict[dict[list[dict]]]) -> None:
"""打印图片数据"""
for partName, types in filtered_picture_data.items():
print(f"Part Name: {partName}")
for imageType, pictures in types.items():
print(f" Image Type: {imageType}, Number of Pictures: {len(pictures)}")
def get_records_with_pic(records: list[dict], filtered_picture_data: dict[str, dict[str, list[dict]]], defect_enum: str) -> tuple[dict[str, dict[str, list[dict]]], dict[str, list[dict]]]:
"""获取指定图片ID的记录
Args:
records (list[dict]): 记录列表,每条记录包含部件名、缺陷类型等信息
filtered_picture_data (dict[str, dict[str, list[dict]]]): 图片数据,最外层的字典的键是部件名,内层的字典的键是缺陷类型,值是图片信息列表
defect_enum (str): 指定的缺陷类型
Returns:
tuple(
records_with_defect_pic: 按部件名分类的缺陷记录映射到图片的imagePath
defect_pic_with_no_records: 对应缺陷类型的图片没有记录的部件名映射
)
"""
records_with_defect_pic = {}
defect_pic_with_no_records = {}
# 收集所有filtered_picture_data中的imageId
all_image_ids = set()
for partName, types in filtered_picture_data.items():
defect_pictures = types.get(defect_enum, [])
for pic in defect_pictures:
all_image_ids.add(pic.get('imageId'))
# 过滤掉records中imageId不在all_image_ids中的记录
filtered_records = [record for record in records if record.get('imageId') in all_image_ids]
# 遍历每个部件
for partName, types in filtered_picture_data.items():
# 获取指定缺陷类型的图片列表
defect_pictures = types.get(defect_enum, [])
if defect_pictures:
# 初始化部件名的字典
if partName not in records_with_defect_pic:
records_with_defect_pic[partName] = {}
# 初始化缺陷类型的字典
records_with_defect_pic[partName][defect_enum] = []
# 遍历缺陷图片列表,找到对应的记录并添加到字典中
for pic in defect_pictures:
pic_id = pic.get('imageId')
matched_records = [record for record in filtered_records if record.get('imageId') == pic_id]
# 如果有匹配的记录,则添加到字典中
if matched_records:
for record in matched_records:
records_with_defect_pic[partName][defect_enum].append({
'record': record,
'imagePath': pic.get('imagePath')
})
else:
# 如果没有匹配的记录,则将图片信息添加到没有记录的字典中
if partName not in defect_pic_with_no_records:
defect_pic_with_no_records[partName] = []
defect_pic_with_no_records[partName].append({
'defect_enum': defect_enum,
'imagePath': pic.get('imagePath')
})
return records_with_defect_pic, defect_pic_with_no_records
def get_template_pic(pic_name : str):
"""获取模板图片路径"""
return get_resource_path(pic_name)