Report_Generate_Server/tools/get_pictures.py

257 lines
9.2 KiB
Python
Raw Normal View History

import os
import math
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
def resize_and_reduce_quality(image_path, output_path, target_width = None):
try:
# 检查图片文件大小
if os.path.getsize(image_path) < 10 * 1024 * 1024: # 10MB
print("图片文件大小小于10MB不进行调整")
return image_path
# 打开图片
with Image.open(image_path) as img:
# 计算新的高度以保持宽高比
if target_width is None:
target_width = img.width
aspect_ratio = img.height / img.width
new_height = int(target_width * aspect_ratio)
# 调整图片大小
img_resized = img.resize((target_width, new_height), Image.LANCZOS)
# 降低图片质量
quality = 70 # 质量从1最差到95最好可以根据需要调整
img_resized.save(output_path, quality=quality)
return output_path
except Exception as e:
return f"调整图片大小和质量时出现问题: {str(e)}"
def get_picture_nums(source_path: str) -> int:
picture_count = 0
for root, dirs, files in os.walk(source_path):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'):
picture_count += 1
return picture_count
def collect_defect_data(
Y: str,
picture_dir: str,
search_file_list: list = [],
) -> tuple[int, dict]:
"""
收集指定年份的缺陷图片数据并根据布尔值决定是否扫描特定类型的缺陷图
Args:
Y: 叶片号 "Y1""Y2""Y3"
picture_dir: 图片根目录
search_file_list (list, optional): 要搜索的文件列表.规定为3个元素
Returns:
(缺陷图片总数, 缺陷图片文件名字典)
"""
total_num = 0
result_dict = {}
try:
for defect_type in search_file_list:
dir_path = os.path.join(picture_dir, Y, defect_type)
num, img_dict = get_picture_nums_and_image_with_name(dir_path)
total_num += num
result_dict.update(img_dict)
except Exception as e:
print(f"获取图片数据时出现问题: {str(e)},搜寻的目录:{dir_path}")
return total_num, result_dict
def get_picture_nums_and_image_with_name(source_path: str) -> tuple[int, dict]:
"""
获取指定目录下图片的数量并返回每个图片的路径和名称字典
Args:
source_path (str): 要搜索的目录路径
Returns:
tuple: 包含两个元素的元组
picture_count (int): 图片数量
image_with_name (dict): 图片路径和名称的字典格式为 {图片名称: 图片完整路径}
"""
picture_count = 0
image_with_name = {}
name_list = []
for root, dirs, files in os.walk(source_path):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'):
picture_count += 1
image_with_name[os.path.splitext(file)[0]] = os.path.join(root, file)
return picture_count, image_with_name
def find_image(directory, image_name):
"""
在指定目录中查找指定名称的图片文件
参数:
directory (str): 要搜索的目录路径
image_name (str): 要查找的图片文件名(可带扩展名或不带)
返回:
str: 找到的图片完整路径如果未找到则返回None
"""
# 支持的图片扩展名列表
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']
# 遍历目录中的所有文件
for root, dirs, files in os.walk(directory):
for file in files:
# 获取文件名和扩展名
filename, ext = os.path.splitext(file)
# 检查是否匹配图片名称(带或不带扩展名)
if (file.lower() == image_name.lower() or
filename.lower() == image_name.lower() and ext.lower() in image_extensions):
return os.path.join(root, file)
return None
async def make_Thumbnail(source_path: str, output_path: str, size: tuple = (436, 233)) -> str:
"""获取目录下所有图片,将所有图片合并制作略缩图并保存
Args:
source_path: 源目录
output_path: 输出目录
size: 合并后的略缩图总大小 (宽度, 高度)
"""
print("略缩图处理中")
try:
if not os.path.exists(output_path):
print(f"无输出目录,创建中,输出目录为:{output_path}")
os.makedirs(output_path)
except Exception as e:
print(f"输出目录有问题:{e}")
return ""
#如果存在merged_thumbnail.jpg文件则直接返回该文件路径
if os.path.exists(os.path.join(output_path,'merged_thumbnail.jpg')):
print(f"已有略缩图,不用处理, 目前如需重新生成,请去往{output_path}目录 删除 merged_thumbnail.jpg 图片")
"""
此处可预留接口询问用户是否重新生成一份略缩图
"""
return os.path.join(output_path,'merged_thumbnail.jpg')
print("目录中无略缩图,合并略缩图中")
# 获取源目录下所有的图片文件
try:
image_files = []
for root, dirs, files in os.walk(source_path):
for file in files:
if file.lower().endswith(('.jpg', '.jpeg', '.png')):
image_files.append(os.path.join(root, file))
except Exception as e:
print(f"递归获取图片失败,原因:{e}")
if not image_files:
print("源目录中没有找到图片文件")
return ""
# 计算每个缩略图的大小
num_images = len(image_files)
target_width, target_height = size
# 计算最佳的缩略图排列方式
# 先尝试计算每行可以放多少个缩略图
aspect_ratio = target_width / target_height
cols = math.ceil(math.sqrt(num_images * aspect_ratio))
rows = math.ceil(num_images / cols)
# 计算单个缩略图的大小
thumb_width = target_width // cols
thumb_height = target_height // rows
# 创建线程池处理图片
with ThreadPoolExecutor() as executor:
thumbnails = list(executor.map(
lambda file: create_thumbnail(file, (thumb_width, thumb_height)),
image_files
))
# 过滤掉 None 值
thumbnails = [thumb for thumb in thumbnails if thumb is not None]
if not thumbnails:
print("没有成功创建任何略缩图")
return ""
# 计算实际需要的行数和列数
actual_cols = min(len(thumbnails), cols)
actual_rows = math.ceil(len(thumbnails) / actual_cols)
# 创建合并后的图像
merged_image = Image.new('RGB', (actual_cols * thumb_width, actual_rows * thumb_height))
# 粘贴缩略图
for index, thumb in enumerate(thumbnails):
row = index // actual_cols
col = index % actual_cols
merged_image.paste(thumb, (col * thumb_width, row * thumb_height))
# 如果最终尺寸不完全匹配,调整大小
if merged_image.size != size:
merged_image = merged_image.resize(size, Image.LANCZOS)
# 保存合并后的略缩图
merged_thumbnail_path = os.path.join(output_path, 'merged_thumbnail.jpg')
merged_image.save(merged_thumbnail_path)
print(f"合并后的略缩图已保存到:{merged_thumbnail_path}")
return merged_thumbnail_path
def create_thumbnail(file_path: str, size: tuple) -> Image:
"""创建单个图片的略缩图
Args:
file_path: 图片文件路径
size: 缩略图大小
"""
try:
with Image.open(file_path) as img:
# 保持原始宽高比
img.thumbnail(size, Image.LANCZOS)
# 创建新图像确保尺寸一致
new_img = Image.new('RGB', size)
new_img.paste(img, ((size[0] - img.width) // 2, (size[1] - img.height) // 2))
return new_img
except Exception as e:
print(f"图片处理有问题:{e}")
2025-07-08 10:41:14 +08:00
return None
def process_picture_data(picture_data : list[dict],
image_source_to_find : list[str],
quexian_type : str,
dianxing_type : str) -> tuple[list[str], list[str]]:
"""处理从数据库获取的图片数据
Args:
picture_data (list[dict]): 图片数据
image_source_to_find (list[str]): 要查找的图片来源枚举(外内防雷)
quexian_type (str): 缺陷类型枚举值
dianxing_type (str): 典型类型枚举值
Returns:
tuple(
defct_pictures, 缺陷图列表
dianxing_pictures 典型图列表
2025-07-08 10:41:14 +08:00
)
"""
#过滤目标来源的图片数据
picture_data = [pic for pic in picture_data if pic['imageSource'] in image_source_to_find]
#分别择出缺陷图和典型图
return [pic for pic in picture_data if pic['imageType'] == quexian_type], [pic for pic in picture_data if pic['imageType'] == dianxing_type]