import os import math from PIL import Image from concurrent.futures import ThreadPoolExecutor def resize_and_reduce_quality(image_path, output_path, target_width = None): try: # 检查图片文件大小 if os.path.getsize(image_path) < 10 * 1024 * 1024: # 10MB print("图片文件大小小于10MB,不进行调整") return image_path # 打开图片 with Image.open(image_path) as img: # 计算新的高度以保持宽高比 if target_width is None: target_width = img.width aspect_ratio = img.height / img.width new_height = int(target_width * aspect_ratio) # 调整图片大小 img_resized = img.resize((target_width, new_height), Image.LANCZOS) # 降低图片质量 quality = 70 # 质量从1(最差)到95(最好),可以根据需要调整 img_resized.save(output_path, quality=quality) return output_path except Exception as e: return f"调整图片大小和质量时出现问题: {str(e)}" def get_picture_nums(source_path: str) -> int: picture_count = 0 for root, dirs, files in os.walk(source_path): for file in files: if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'): picture_count += 1 return picture_count def collect_defect_data( Y: str, picture_dir: str, search_file_list: list = [], ) -> tuple[int, dict]: """ 收集指定年份的缺陷图片数据,并根据布尔值决定是否扫描特定类型的缺陷图。 Args: Y: 叶片号,如 "Y1"、"Y2"、"Y3" picture_dir: 图片根目录 search_file_list (list, optional): 要搜索的文件列表.规定为3个元素 Returns: (缺陷图片总数, 缺陷图片文件名字典) """ total_num = 0 result_dict = {} try: for defect_type in search_file_list: dir_path = os.path.join(picture_dir, Y, defect_type) num, img_dict = get_picture_nums_and_image_with_name(dir_path) total_num += num result_dict.update(img_dict) except Exception as e: print(f"获取图片数据时出现问题: {str(e)},搜寻的目录:{dir_path}") return total_num, result_dict def get_picture_nums_and_image_with_name(source_path: str) -> tuple[int, dict]: """ 获取指定目录下图片的数量,并返回每个图片的路径和名称(字典) Args: source_path (str): 要搜索的目录路径 Returns: tuple: 包含两个元素的元组 picture_count (int): 图片数量 image_with_name (dict): 图片路径和名称的字典,格式为 {图片名称: 图片完整路径} """ picture_count = 0 image_with_name = {} name_list = [] for root, dirs, files in os.walk(source_path): for file in files: if file.lower().endswith(('.jpg', '.jpeg', '.png')) and not file.startswith('merged_thumbnail'): picture_count += 1 image_with_name[os.path.splitext(file)[0]] = os.path.join(root, file) return picture_count, image_with_name def find_image(directory, image_name): """ 在指定目录中查找指定名称的图片文件 参数: directory (str): 要搜索的目录路径 image_name (str): 要查找的图片文件名(可带扩展名或不带) 返回: str: 找到的图片完整路径,如果未找到则返回None """ # 支持的图片扩展名列表 image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'] # 遍历目录中的所有文件 for root, dirs, files in os.walk(directory): for file in files: # 获取文件名和扩展名 filename, ext = os.path.splitext(file) # 检查是否匹配图片名称(带或不带扩展名) if (file.lower() == image_name.lower() or filename.lower() == image_name.lower() and ext.lower() in image_extensions): return os.path.join(root, file) return None async def make_Thumbnail(source_path: str, output_path: str, size: tuple = (436, 233)) -> str: """获取目录下所有图片,将所有图片合并制作略缩图并保存 Args: source_path: 源目录 output_path: 输出目录 size: 合并后的略缩图总大小 (宽度, 高度) """ print("略缩图处理中") try: if not os.path.exists(output_path): print(f"无输出目录,创建中,输出目录为:{output_path}") os.makedirs(output_path) except Exception as e: print(f"输出目录有问题:{e}") return "" #如果存在merged_thumbnail.jpg文件,则直接返回该文件路径 if os.path.exists(os.path.join(output_path,'merged_thumbnail.jpg')): print(f"已有略缩图,不用处理, 目前如需重新生成,请去往{output_path}目录 删除 merged_thumbnail.jpg 图片") """ 此处可预留接口,询问用户是否重新生成一份略缩图 """ return os.path.join(output_path,'merged_thumbnail.jpg') print("目录中无略缩图,合并略缩图中") # 获取源目录下所有的图片文件 try: image_files = [] for root, dirs, files in os.walk(source_path): for file in files: if file.lower().endswith(('.jpg', '.jpeg', '.png')): image_files.append(os.path.join(root, file)) except Exception as e: print(f"递归获取图片失败,原因:{e}") if not image_files: print("源目录中没有找到图片文件") return "" # 计算每个缩略图的大小 num_images = len(image_files) target_width, target_height = size # 计算最佳的缩略图排列方式 # 先尝试计算每行可以放多少个缩略图 aspect_ratio = target_width / target_height cols = math.ceil(math.sqrt(num_images * aspect_ratio)) rows = math.ceil(num_images / cols) # 计算单个缩略图的大小 thumb_width = target_width // cols thumb_height = target_height // rows # 创建线程池处理图片 with ThreadPoolExecutor() as executor: thumbnails = list(executor.map( lambda file: create_thumbnail(file, (thumb_width, thumb_height)), image_files )) # 过滤掉 None 值 thumbnails = [thumb for thumb in thumbnails if thumb is not None] if not thumbnails: print("没有成功创建任何略缩图") return "" # 计算实际需要的行数和列数 actual_cols = min(len(thumbnails), cols) actual_rows = math.ceil(len(thumbnails) / actual_cols) # 创建合并后的图像 merged_image = Image.new('RGB', (actual_cols * thumb_width, actual_rows * thumb_height)) # 粘贴缩略图 for index, thumb in enumerate(thumbnails): row = index // actual_cols col = index % actual_cols merged_image.paste(thumb, (col * thumb_width, row * thumb_height)) # 如果最终尺寸不完全匹配,调整大小 if merged_image.size != size: merged_image = merged_image.resize(size, Image.LANCZOS) # 保存合并后的略缩图 merged_thumbnail_path = os.path.join(output_path, 'merged_thumbnail.jpg') merged_image.save(merged_thumbnail_path) print(f"合并后的略缩图已保存到:{merged_thumbnail_path}") return merged_thumbnail_path def create_thumbnail(file_path: str, size: tuple) -> Image: """创建单个图片的略缩图 Args: file_path: 图片文件路径 size: 缩略图大小 """ try: with Image.open(file_path) as img: # 保持原始宽高比 img.thumbnail(size, Image.LANCZOS) # 创建新图像确保尺寸一致 new_img = Image.new('RGB', size) new_img.paste(img, ((size[0] - img.width) // 2, (size[1] - img.height) // 2)) return new_img except Exception as e: print(f"图片处理有问题:{e}") return None def process_picture_data(picture_data : list[dict], image_source_to_find : list[str], quexian_type : str, dianxing_type : str, other_type : str ) -> tuple[list[str], list[str]]: """处理从数据库获取的图片数据 Args: picture_data (list[dict]): 图片数据 image_source_to_find (list[str]): 要查找的图片来源枚举(外内防雷) quexian_type (str): 缺陷类型枚举值 dianxing_type (str): 典型类型枚举值 other_type (str): 其他类型枚举值 Returns: tuple( defct_pictures, 缺陷图列表 dianxing_pictures, 典型图列表 other_pictures, 其他图列表 total_num 总图片数量 ) """ #过滤目标来源的图片数据 picture_data = [pic for pic in picture_data if pic['imageSource'] in image_source_to_find] #分别择出缺陷图和典型图 return ([pic for pic in picture_data if pic['imageType'] == quexian_type], [pic for pic in picture_data if pic['imageType'] == dianxing_type], [pic for pic in picture_data if pic['imageType'] == other_type], len(picture_data))