import os import math from PIL import Image from concurrent.futures import ThreadPoolExecutor from tools.dataproccess import get_resource_path def resize_and_reduce_quality(image_path, output_path, target_width = None): try: # 检查图片文件大小 if os.path.getsize(image_path) < 10 * 1024 * 1024: # 10MB print("图片文件大小小于10MB,不进行调整") return image_path # 打开图片 with Image.open(image_path) as img: # 计算新的高度以保持宽高比 if target_width is None: target_width = img.width aspect_ratio = img.height / img.width new_height = int(target_width * aspect_ratio) # 调整图片大小 img_resized = img.resize((target_width, new_height), Image.LANCZOS) # 降低图片质量 quality = 70 # 质量从1(最差)到95(最好),可以根据需要调整 img_resized.save(output_path, quality=quality) return output_path except Exception as e: return f"调整图片大小和质量时出现问题: {str(e)}" def get_picture_nums(source_path: str) -> int: picture_count = 0 for root, dirs, files in os.walk(source_path): for file in files: if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'): picture_count += 1 return picture_count def collect_defect_data( Y: str, picture_dir: str, search_file_list: list = [], ) -> tuple[int, dict]: """ 收集指定年份的缺陷图片数据,并根据布尔值决定是否扫描特定类型的缺陷图。 Args: Y: 叶片号,如 "Y1"、"Y2"、"Y3" picture_dir: 图片根目录 search_file_list (list, optional): 要搜索的文件列表.规定为3个元素 Returns: (缺陷图片总数, 缺陷图片文件名字典) """ total_num = 0 result_dict = {} try: for defect_type in search_file_list: dir_path = os.path.join(picture_dir, Y, defect_type) num, img_dict = get_picture_nums_and_image_with_name(dir_path) total_num += num result_dict.update(img_dict) except Exception as e: print(f"获取图片数据时出现问题: {str(e)},搜寻的目录:{dir_path}") return total_num, result_dict def get_picture_nums_and_image_with_name(source_path: str) -> tuple[int, dict]: """ 获取指定目录下图片的数量,并返回每个图片的路径和名称(字典) Args: source_path (str): 要搜索的目录路径 Returns: tuple: 包含两个元素的元组 picture_count (int): 图片数量 image_with_name (dict): 图片路径和名称的字典,格式为 {图片名称: 图片完整路径} """ picture_count = 0 image_with_name = {} name_list = [] for root, dirs, files in os.walk(source_path): for file in files: if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'): picture_count += 1 image_with_name[os.path.splitext(file)[0]] = os.path.join(root, file) return picture_count, image_with_name def find_image(directory, image_name): """ 在指定目录中查找指定名称的图片文件 参数: directory (str): 要搜索的目录路径 image_name (str): 要查找的图片文件名(可带扩展名或不带) 返回: str: 找到的图片完整路径,如果未找到则返回None """ # 支持的图片扩展名列表 image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'] # 遍历目录中的所有文件 for root, dirs, files in os.walk(directory): for file in files: # 获取文件名和扩展名 filename, ext = os.path.splitext(file) # 检查是否匹配图片名称(带或不带扩展名) if (file.lower() == image_name.lower() or filename.lower() == image_name.lower() and ext.lower() in image_extensions): return os.path.join(root, file) return None async def make_Thumbnail(source_path: str, output_path: str, size: tuple = (436, 233)) -> str: """获取目录下所有图片,将所有图片合并制作略缩图并保存 Args: source_path: 源目录 output_path: 输出目录 size: 合并后的略缩图总大小 (宽度, 高度) """ print("略缩图处理中") try: if not os.path.exists(output_path): print(f"无输出目录,创建中,输出目录为:{output_path}") os.makedirs(output_path) except Exception as e: print(f"输出目录有问题:{e}") return "" #如果存在merged_thumbnail.jpg文件,则直接返回该文件路径 if os.path.exists(os.path.join(output_path,'merged_thumbnail.jpg')): print(f"已有略缩图,不用处理, 目前如需重新生成,请去往{output_path}目录 删除 merged_thumbnail.jpg 图片") """ 此处可预留接口,询问用户是否重新生成一份略缩图 """ return os.path.join(output_path,'merged_thumbnail.jpg') print("目录中无略缩图,合并略缩图中") # 获取源目录下所有的图片文件 try: image_files = [] for root, dirs, files in os.walk(source_path): for file in files: if file.lower().endswith(('.jpg', '.jpeg', '.png')): image_files.append(os.path.join(root, file)) except Exception as e: print(f"递归获取图片失败,原因:{e}") if not image_files: print("源目录中没有找到图片文件") return "" # 计算每个缩略图的大小 num_images = len(image_files) target_width, target_height = size # 计算最佳的缩略图排列方式 # 先尝试计算每行可以放多少个缩略图 aspect_ratio = target_width / target_height cols = math.ceil(math.sqrt(num_images * aspect_ratio)) rows = math.ceil(num_images / cols) # 计算单个缩略图的大小 thumb_width = target_width // cols thumb_height = target_height // rows # 创建线程池处理图片 with ThreadPoolExecutor() as executor: thumbnails = list(executor.map( lambda file: create_thumbnail(file, (thumb_width, thumb_height)), image_files )) # 过滤掉 None 值 thumbnails = [thumb for thumb in thumbnails if thumb is not None] if not thumbnails: print("没有成功创建任何略缩图") return "" # 计算实际需要的行数和列数 actual_cols = min(len(thumbnails), cols) actual_rows = math.ceil(len(thumbnails) / actual_cols) # 创建合并后的图像 merged_image = Image.new('RGB', (actual_cols * thumb_width, actual_rows * thumb_height)) # 粘贴缩略图 for index, thumb in enumerate(thumbnails): row = index // actual_cols col = index % actual_cols merged_image.paste(thumb, (col * thumb_width, row * thumb_height)) # 如果最终尺寸不完全匹配,调整大小 if merged_image.size != size: merged_image = merged_image.resize(size, Image.LANCZOS) # 保存合并后的略缩图 merged_thumbnail_path = os.path.join(output_path, 'merged_thumbnail.jpg') merged_image.save(merged_thumbnail_path) print(f"合并后的略缩图已保存到:{merged_thumbnail_path}") return merged_thumbnail_path def create_thumbnail(file_path: str, size: tuple) -> Image: """创建单个图片的略缩图 Args: file_path: 图片文件路径 size: 缩略图大小 """ try: with Image.open(file_path) as img: # 保持原始宽高比 img.thumbnail(size, Image.LANCZOS) # 创建新图像确保尺寸一致 new_img = Image.new('RGB', size) new_img.paste(img, ((size[0] - img.width) // 2, (size[1] - img.height) // 2)) return new_img except Exception as e: print(f"图片处理有问题:{e}") return None def process_picture_data(picture_data : dict[list[dict]], image_source_to_find : list[str] ) -> tuple[dict[dict[list[dict]]],int]: """处理从数据库获取的图片数据 Args: picture_data (dict[list[dict]]: 图片数据(按部件分类,返回的列表也要按部件分类) image_source_to_find (list[str]): 要查找的图片来源枚举(外内防雷) Returns: tuple( filtered_picture_data, 按部件、缺陷类型、典型类型、其他类型层层分类的图片数据 total_num 总图片数量 ) """ # 过滤目标来源的图片数据 filtered_picture_data = {} total_num = 0 for partName, values in picture_data.items(): total_num = total_num + len(values) for pic in values: if pic['imageSource'] in image_source_to_find: if partName not in filtered_picture_data: filtered_picture_data[partName] = {} if pic['imageType'] not in filtered_picture_data[partName]: filtered_picture_data[partName][pic['imageType']] = [] filtered_picture_data[partName][pic['imageType']].append(pic) return filtered_picture_data, total_num def print_data(filtered_picture_data : dict[dict[list[dict]]]) -> None: """打印图片数据""" for partName, types in filtered_picture_data.items(): print(f"Part Name: {partName}") for imageType, pictures in types.items(): print(f" Image Type: {imageType}, Number of Pictures: {len(pictures)}") def get_records_with_pic(records: list[dict], filtered_picture_data: dict[str, dict[str, list[dict]]], defect_enum: str) -> tuple[dict[str, dict[str, list[dict]]], dict[str, list[dict]]]: """获取指定图片ID的记录 Args: records (list[dict]): 记录列表,每条记录包含部件名、缺陷类型等信息 filtered_picture_data (dict[str, dict[str, list[dict]]]): 图片数据,最外层的字典的键是部件名,内层的字典的键是缺陷类型,值是图片信息列表 defect_enum (str): 指定的缺陷类型 Returns: tuple( records_with_defect_pic: 按部件名分类的缺陷记录,映射到图片的imagePath defect_pic_with_no_records: 对应缺陷类型的图片没有记录的部件名映射 ) """ records_with_defect_pic = {} defect_pic_with_no_records = {} # 收集所有filtered_picture_data中的imageId all_image_ids = set() for partName, types in filtered_picture_data.items(): defect_pictures = types.get(defect_enum, []) for pic in defect_pictures: all_image_ids.add(pic.get('imageId')) # 过滤掉records中imageId不在all_image_ids中的记录 filtered_records = [record for record in records if record.get('imageId') in all_image_ids] # 遍历每个部件 for partName, types in filtered_picture_data.items(): # 获取指定缺陷类型的图片列表 defect_pictures = types.get(defect_enum, []) if defect_pictures: # 初始化部件名的字典 if partName not in records_with_defect_pic: records_with_defect_pic[partName] = {} # 初始化缺陷类型的字典 records_with_defect_pic[partName][defect_enum] = [] # 遍历缺陷图片列表,找到对应的记录并添加到字典中 for pic in defect_pictures: pic_id = pic.get('imageId') matched_records = [record for record in filtered_records if record.get('imageId') == pic_id] # 如果有匹配的记录,则添加到字典中 if matched_records: for record in matched_records: records_with_defect_pic[partName][defect_enum].append({ 'record': record, 'imagePath': pic.get('imagePath') }) else: # 如果没有匹配的记录,则将图片信息添加到没有记录的字典中 if partName not in defect_pic_with_no_records: defect_pic_with_no_records[partName] = [] defect_pic_with_no_records[partName].append({ 'defect_enum': defect_enum, 'imagePath': pic.get('imagePath') }) return records_with_defect_pic, defect_pic_with_no_records def get_template_pic(pic_name : str): """获取模板图片路径""" return get_resource_path(pic_name)