ZhaoBiao_Python/ronghetest.py

import xmltodict
from openpyxl.formatting.rule import FormulaRule
from openpyxl.styles import PatternFill
from openpyxl.utils import get_column_letter
import tkinter as tk
from tkinter import ttk, messagebox
from tkinter.scrolledtext import ScrolledText
import datetime
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
import json
import smtplib
import openpyxl
from curl_cffi import requests
from bs4 import BeautifulSoup
import urllib.parse
from openpyxl.worksheet.datavalidation import DataValidation
import re
from random import uniform
import os
import subprocess
import platform
import threading
import time
from tkcalendar import Calendar, DateEntry
import openai
from tqdm import tqdm
from typing import Dict, List
from openai import OpenAI
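

# ---------------------------------------------------------------------------
# WebCrawlerApp: a Tkinter GUI that crawls several Chinese tender/bidding
# portals, filters announcements by keyword and publication window (days),
# exports the matches (Excel/HTML/Markdown/text/WeChat formats), optionally
# emails the file via QQ SMTP, and can repeat the whole run on a daily
# schedule driven by a background thread.
# ---------------------------------------------------------------------------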
class WebCrawlerApp:
def __init__(self, root):
# 添加中断标志
self.crawling_stopped = False
self.is_scheduled_task = False
self.root = root
self.root.title("招标信息爬取工具")
self.root.geometry("1000x750")
self.root.configure(bg="#f5f5f5")
self.receiver_emails = []
self.config_file = "crawler_config.json"
self.scheduled_event = threading.Event()
# 定时任务相关变量
self.scheduled_task = None
self.scheduled_thread = None
self.scheduled_running = False
self.next_scheduled_time = None
# 绑定窗口关闭事件
self.root.protocol("WM_DELETE_WINDOW", self.on_window_close)
# 设置现代化主题
self.style = ttk.Style()
self.style.theme_use('clam')
# 自定义现代化样式
self.style.configure('TFrame', background="#f5f5f5")
self.style.configure('TLabel', background="#f5f5f5", font=('Microsoft YaHei', 10))
self.style.configure('TButton', font=('Microsoft YaHei', 10), padding=6)
self.style.configure('TEntry', font=('Microsoft YaHei', 10), padding=5, relief="flat")
self.style.configure('TCombobox', font=('Microsoft YaHei', 10), padding=5)
self.style.configure('TLabelframe', font=('Microsoft YaHei', 10, 'bold'), background="#f5f5f5")
self.style.configure('TLabelframe.Label', font=('Microsoft YaHei', 10, 'bold'))
# 按钮状态样式
self.style.map('TButton',
foreground=[('active', 'white'), ('!active', 'white')],
background=[('active', '#4a6ea9'), ('!active', '#5d7bb5')],
relief=[('pressed', 'sunken'), ('!pressed', 'raised')]
)
# 关键词列表
self.keywords = []
# 创建主框架
self.main_frame = ttk.Frame(self.root, padding="10")
self.main_frame.pack(fill=tk.BOTH, expand=True)
# 创建输入框
self.create_widgets()
# 尝试加载上次的配置
self.load_config()
# 如果加载失败,设置默认值
if not self.keywords:
self.set_default_values()
def create_widgets(self):
# 顶部配置框架 - 使用现代化卡片式布局
top_frame = ttk.Frame(self.main_frame, style='Card.TFrame')
top_frame.grid(row=0, column=0, columnspan=2, sticky="ew", pady=(0, 10))
# 邮箱配置部分 - 添加卡片阴影效果
email_frame = ttk.LabelFrame(top_frame, text=" 邮箱配置 ", padding=(12, 8))
email_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=6, ipady=5)
# 发送方QQ邮箱用户名
ttk.Label(email_frame, text="发送方QQ邮箱用户名:").grid(row=0, column=0, padx=6, pady=3, sticky="w")
self.sender_username = ttk.Entry(email_frame, width=24)
self.sender_username.grid(row=0, column=1, padx=6, pady=3, sticky="w")
# 发送方邮箱
ttk.Label(email_frame, text="发送方QQ邮箱:").grid(row=1, column=0, padx=6, pady=3, sticky="w")
self.sender_email = ttk.Entry(email_frame, width=24)
self.sender_email.grid(row=1, column=1, padx=6, pady=3, sticky="w")
# 授权码
ttk.Label(email_frame, text="邮箱授权码:").grid(row=2, column=0, padx=6, pady=3, sticky="w")
self.sender_password = ttk.Entry(email_frame, width=24, show="*")
self.sender_password.grid(row=2, column=1, padx=6, pady=3, sticky="w")
# 接收方邮箱部分
receiver_frame = ttk.Frame(email_frame)
receiver_frame.grid(row=3, column=0, columnspan=2, sticky="ew", pady=3)
ttk.Label(receiver_frame, text="接收方邮箱:").pack(side=tk.LEFT, padx=6)
self.receiver_email = ttk.Entry(receiver_frame, width=20)
self.receiver_email.pack(side=tk.LEFT, padx=4)
self.add_receiver_button = ttk.Button(
receiver_frame,
text="添加",
command=self.add_receiver_email,
width=6,
style='Accent.TButton'
)
self.add_receiver_button.pack(side=tk.LEFT, padx=2)
# 接收方邮箱列表
self.receiver_listbox = tk.Listbox(
email_frame,
width=26,
height=3,
bg="white",
fg="#333333",
selectbackground="#4a6ea9",
selectforeground="white",
font=('Microsoft YaHei', 9),
relief="flat",
highlightthickness=1,
highlightcolor="#cccccc",
highlightbackground="#cccccc"
)
self.receiver_listbox.grid(row=4, column=0, columnspan=2, padx=6, pady=3, sticky="ew")
# 删除接收方按钮
self.remove_receiver_button = ttk.Button(
email_frame,
text="删除选中",
command=self.remove_receiver_email,
style='Accent.TButton'
)
self.remove_receiver_button.grid(row=5, column=0, columnspan=2, padx=6, pady=3, sticky="ew")
# 爬取配置部分 - 现代化设计
config_frame = ttk.LabelFrame(top_frame, text=" 爬取配置 ", padding=(12, 8))
config_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=6, ipady=5)
# 天数配置
days_frame = ttk.Frame(config_frame)
days_frame.pack(fill=tk.X, pady=(0, 8))
ttk.Label(days_frame, text="筛选多少天内的报告:").pack(side=tk.LEFT, padx=6)
self.days = ttk.Entry(days_frame, width=14)
self.days.pack(side=tk.LEFT, padx=6)
# 网站选择区域
ttk.Label(config_frame, text="选择爬取网站:").pack(anchor="w", padx=6, pady=(6, 0))
# 网站选择复选框 - 使用现代化网格布局
checkbox_frame = ttk.Frame(config_frame)
checkbox_frame.pack(fill=tk.X, padx=6, pady=6)
self.website_vars = {} # 存储每个网站的变量
websites = ["国能e招", "三峡招标","三峡采购", "国能e购", "中国节能", "北京京能", "华润守正", "华电电子", "科环集团"]
# 自动计算列数每列最多4个网站
max_per_column = 4
num_columns = (len(websites) + max_per_column - 1) // max_per_column
# 创建需要的列数
for col in range(num_columns):
col_frame = ttk.Frame(checkbox_frame)
col_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=4)
# 获取当前列应该显示的网站
start_idx = col * max_per_column
end_idx = start_idx + max_per_column
current_websites = websites[start_idx:end_idx]
# 在当前列添加网站复选框
for website in current_websites:
var = tk.BooleanVar()
chk = ttk.Checkbutton(
col_frame,
text=website,
variable=var,
style='TCheckbutton'
)
chk.pack(anchor="w", pady=3)
self.website_vars[website] = var
# 爬取按钮框架 - 现代化按钮组
crawl_button_frame = ttk.Frame(config_frame)
crawl_button_frame.pack(fill=tk.X, pady=(8, 0))
# 使用grid布局使按钮均匀分布
crawl_button_frame.columnconfigure(0, weight=1)
crawl_button_frame.columnconfigure(1, weight=1)
crawl_button_frame.columnconfigure(2, weight=1)
# 一键爬取所有按钮
self.crawl_all_button = ttk.Button(
crawl_button_frame,
text="一键爬取",
command=self.crawl_all_sites,
style='Accent.TButton'
)
self.crawl_all_button.grid(row=0, column=0, padx=4, sticky="ew")
# 开始爬取按钮
self.start_button = ttk.Button(
crawl_button_frame,
text="开始爬取",
command=self.start_crawling,
style='Accent.TButton'
)
self.start_button.grid(row=0, column=1, padx=4, sticky="ew")
# 中断爬取按钮
self.stop_button = ttk.Button(
crawl_button_frame,
text="中断爬取",
command=self.stop_crawling,
style='Stop.TButton'
)
self.style.configure('Stop.TButton', background='#d9534f')
self.stop_button.grid(row=0, column=2, padx=4, sticky="ew")
self.stop_button.config(state=tk.DISABLED)
# 定时任务配置部分 - 现代化设计
schedule_frame = ttk.LabelFrame(top_frame, text=" 定时任务 ", padding=(12, 8))
schedule_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=6, ipady=5)
# 定时任务开关
self.schedule_var = tk.BooleanVar()
self.schedule_check = ttk.Checkbutton(
schedule_frame,
text="启用定时任务",
variable=self.schedule_var,
command=self.toggle_schedule,
style='TCheckbutton'
)
self.schedule_check.pack(anchor="w", padx=6, pady=4)
# 定时时间设置
time_frame = ttk.Frame(schedule_frame)
time_frame.pack(fill=tk.X, padx=6, pady=4)
ttk.Label(time_frame, text="定时时间:").pack(side=tk.LEFT)
# 小时选择
self.hour_var = tk.StringVar(value="09")
self.hour_spin = ttk.Spinbox(
time_frame,
from_=0, to=23,
width=3,
textvariable=self.hour_var,
validate='key',
validatecommand=(self.root.register(self.validate_hour), '%P'),
font=('Microsoft YaHei', 10)
)
self.hour_spin.pack(side=tk.LEFT, padx=4)
ttk.Label(time_frame, text=":").pack(side=tk.LEFT)
# 分钟选择
self.minute_var = tk.StringVar(value="00")
self.minute_spin = ttk.Spinbox(
time_frame,
from_=0, to=59,
width=3,
textvariable=self.minute_var,
validate='key',
validatecommand=(self.root.register(self.validate_minute), '%P'),
font=('Microsoft YaHei', 10)
)
self.minute_spin.pack(side=tk.LEFT, padx=4)
# 定时频率设置
interval_frame = ttk.Frame(schedule_frame)
interval_frame.pack(fill=tk.X, padx=6, pady=4)
ttk.Label(interval_frame, text="频率(天):").pack(side=tk.LEFT)
self.schedule_interval = ttk.Spinbox(
interval_frame,
from_=1, to=30,
width=5,
validate='key',
validatecommand=(self.root.register(self.validate_interval), '%P'),
font=('Microsoft YaHei', 10)
)
self.schedule_interval.pack(side=tk.LEFT, padx=6)
self.schedule_interval.set(1)
# 定时任务状态显示
self.schedule_status_var = tk.StringVar(value="状态: 已停止")
self.schedule_status_label = ttk.Label(
schedule_frame,
textvariable=self.schedule_status_var,
foreground="#d9534f",
font=('Microsoft YaHei', 9)
)
self.schedule_status_label.pack(anchor="w", padx=6, pady=2)
# 下次执行时间显示
self.next_run_label = ttk.Label(
schedule_frame,
text="下次执行: 未设置",
font=('Microsoft YaHei', 9)
)
self.next_run_label.pack(anchor="w", padx=6, pady=2)
# 关键词管理部分 - 现代化卡片设计
keyword_frame = ttk.LabelFrame(self.main_frame, text=" 关键词管理 ", padding=(12, 8))
keyword_frame.grid(row=1, column=0, padx=6, pady=6, sticky="nsew")
# 关键词输入框和按钮框架
input_frame = ttk.Frame(keyword_frame)
input_frame.pack(fill=tk.X, pady=(0, 6))
self.keyword_entry = ttk.Entry(
input_frame,
width=28,
font=('Microsoft YaHei', 10))
self.keyword_entry.pack(side=tk.LEFT, padx=(0, 6), fill=tk.X, expand=True)
# 添加关键词按钮
self.add_button = ttk.Button(
input_frame,
text="添加",
command=self.add_keyword,
width=6,
style='Accent.TButton'
)
self.add_button.pack(side=tk.LEFT)
# 关键词列表 - 使用Frame包装确保正确显示
listbox_frame = ttk.Frame(keyword_frame)
listbox_frame.pack(fill=tk.BOTH, expand=True)
self.keyword_listbox = tk.Listbox(
listbox_frame,
width=28,
height=8,
bg="white",
fg="#333333",
selectbackground="#4a6ea9",
selectforeground="white",
font=('Microsoft YaHei', 10),
relief="flat",
highlightthickness=1,
highlightcolor="#cccccc",
highlightbackground="#cccccc"
)
self.keyword_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# 添加滚动条
scrollbar = ttk.Scrollbar(listbox_frame, orient=tk.VERTICAL, command=self.keyword_listbox.yview)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.keyword_listbox.config(yscrollcommand=scrollbar.set)
# 操作按钮框架
button_frame = ttk.Frame(keyword_frame)
button_frame.pack(fill=tk.X, pady=(6, 0))
self.clear_button = ttk.Button(
button_frame,
text="清空",
command=self.clear_keywords,
style='Accent.TButton'
)
self.clear_button.pack(side=tk.LEFT, padx=(0, 6), expand=True)
self.remove_button = ttk.Button(
button_frame,
text="删除选中",
command=self.remove_keyword,
style='Accent.TButton'
)
self.remove_button.pack(side=tk.LEFT, expand=True)
# 日志输出部分 - 现代化设计
log_frame = ttk.LabelFrame(self.main_frame, text=" 运行日志 ", padding=(12, 8))
log_frame.grid(row=1, column=1, rowspan=2, padx=6, pady=6, sticky="nsew")
self.log_text = ScrolledText(
log_frame,
width=60,
height=22,
wrap=tk.WORD,
bg="#f8f8f8",
fg="#333333",
font=('Consolas', 10),
relief="flat",
padx=8,
pady=8
)
self.log_text.pack(fill=tk.BOTH, expand=True)
self.log_text.config(state="disabled")
# 操作按钮部分 - 现代化按钮组
button_frame = ttk.Frame(self.main_frame)
button_frame.grid(row=2, column=0, pady=10, sticky="ew")
self.save_config_button = ttk.Button(
button_frame,
text="保存配置",
command=self.save_config,
style='Accent.TButton'
)
self.save_config_button.pack(side=tk.LEFT, padx=6, expand=True, fill=tk.X)
self.clear_log_button = ttk.Button(
button_frame,
text="清空日志",
command=self.clear_log,
style='Accent.TButton'
)
self.clear_log_button.pack(side=tk.LEFT, padx=6, expand=True, fill=tk.X)
self.exit_button = ttk.Button(
button_frame,
text="退出",
command=self.on_exit,
style='Accent.TButton'
)
self.exit_button.pack(side=tk.LEFT, padx=6, expand=True, fill=tk.X)
# 设置网格布局权重
self.main_frame.columnconfigure(0, weight=1)
self.main_frame.columnconfigure(1, weight=2)
self.main_frame.rowconfigure(1, weight=1)
def stop_crawling(self):
"""中断爬取"""
self.crawling_stopped = True
self.log("正在中断爬取,请稍候...")
self.stop_button.config(state=tk.DISABLED)
def on_window_close(self):
"""处理窗口关闭事件"""
response = messagebox.askyesnocancel("退出", "是否保存当前配置后再退出?")
if response is True: # 用户点击"是"
self.save_config()
self._shutdown() # 执行退出操作
elif response is False: # 用户点击"否"
self._shutdown() # 执行退出操作
def add_receiver_email(self):
"""添加接收方邮箱"""
email = self.receiver_email.get().strip()
if email:
if email not in self.receiver_emails:
self.receiver_emails.append(email)
self.receiver_listbox.insert(tk.END, email)
self.receiver_email.delete(0, tk.END)
self.log(f"添加接收方邮箱: {email}")
else:
messagebox.showwarning("警告", "该邮箱已存在!")
else:
messagebox.showwarning("警告", "请输入邮箱地址!")
def select_all_websites(self):
"""选择所有网站复选框"""
for website, var in self.website_vars.items():
var.set(True)
self.log("已选择所有网站")
def crawl_all_sites(self):
"""一键爬取所有网站"""
if not self.validate_inputs():
return
# 重置中断标志
self.crawling_stopped = False
# 启用中断按钮
self.stop_button.config(state=tk.NORMAL)
# 禁用开始按钮防止重复点击
self.crawl_all_button.config(state=tk.DISABLED)
# 选择所有网站
self.select_all_websites()
# 执行爬取
self.crawl_selected_sites()
self.stop_button.config(state=tk.DISABLED)
# 爬取完成后恢复按钮状态
self.crawl_all_button.config(state=tk.NORMAL)
def remove_receiver_email(self):
"""删除选中的接收方邮箱"""
selected = self.receiver_listbox.curselection()
if selected:
index = selected[0]
email = self.receiver_listbox.get(index)
self.receiver_listbox.delete(index)
self.receiver_emails.remove(email)
self.log(f"删除接收方邮箱: {email}")
else:
messagebox.showwarning("警告", "请先选择要删除的邮箱!")
def _shutdown(self):
"""执行退出前的清理工作"""
self.scheduled_running = False
self.scheduled_event.set()
self.root.destroy()
def set_default_values(self):
# 设置默认值
self.sender_username.insert(0, "xyq")
self.sender_email.insert(0, "2501327538@qq.com")
self.sender_password.insert(0, "pxsuyxocvjlwebaa")
self.receiver_email.insert(0, "black.hyxz@gmail.com")
default_email = "black.hyxz@gmail.com"
self.receiver_emails.append(default_email)
self.receiver_listbox.insert(tk.END, default_email)
self.days.insert(0, "5")
        # Pre-select the first site; the combobox this used to reference was
        # replaced by the per-site checkboxes in self.website_vars.
        if self.website_vars:
            next(iter(self.website_vars.values())).set(True)
# 添加默认关键词
default_keywords = [
"巡检", "叶片检查", "检测", "防雷", "导通", "无人机", "叶片维修", "测绘",
"建模", "实景三维", "数字化", "智能化", "机器人", "应用研究",
"平台开发", "清洗", "除锈", "喷涂", "防腐"
]
for keyword in default_keywords:
self.keywords.append(keyword)
self.keyword_listbox.insert(tk.END, keyword)
def save_config(self):
"""保存当前配置到文件"""
selected_websites = [website for website, var in self.website_vars.items() if var.get()]
config = {
"sender_username": self.sender_username.get(),
"sender_email": self.sender_email.get(),
"sender_password": self.sender_password.get(),
# "receiver_email": self.receiver_email.get(),
"receiver_emails": self.receiver_emails, # 改为保存邮箱列表
"days": self.days.get(),
#"website": self.website_var.get(),
"selected_websites": selected_websites,
"keywords": self.keywords,
"schedule_enabled": self.schedule_var.get(),
"schedule_time": self.get_schedule_time(), # 使用格式化后的时间
"schedule_interval": self.schedule_interval.get()
}
try:
with open(self.config_file, "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
self.log("配置已保存")
messagebox.showinfo("成功", "配置已成功保存!")
except Exception as e:
self.log(f"保存配置失败: {str(e)}")
messagebox.showerror("错误", f"保存配置失败:\n{str(e)}")
def validate_hour(self, value):
"""验证小时输入"""
if value == "":
return True
try:
num = int(value)
return 0 <= num <= 23
except ValueError:
return False
def validate_minute(self, value):
"""验证分钟输入"""
if value == "":
return True
try:
num = int(value)
return 0 <= num <= 59
except ValueError:
return False
def validate_interval(self, value):
"""验证间隔天数"""
if value == "":
return True
try:
num = int(value)
return num >= 1
except ValueError:
return False
def get_schedule_time(self):
"""获取格式化后的定时时间"""
hour = self.hour_var.get().zfill(2)
minute = self.minute_var.get().zfill(2)
return f"{hour}:{minute}"
def setup_scheduled_task(self):
"""设置定时任务"""
if not self.validate_schedule_inputs():
self.schedule_var.set(False)
return
# 解析定时时间
try:
schedule_time = self.get_schedule_time()
hours, minutes = map(int, schedule_time.split(':'))
interval_days = int(self.schedule_interval.get())
# 计算下次执行时间
now = datetime.datetime.now()
next_time = now.replace(hour=hours, minute=minutes, second=0, microsecond=0)
# 如果今天的时间已经过了,就安排到明天
if next_time < now:
next_time += datetime.timedelta(days=1)
self.next_scheduled_time = next_time
self.update_next_run_label()
# 启动定时任务线程
self.scheduled_running = True
if not self.scheduled_thread or not self.scheduled_thread.is_alive():
self.scheduled_thread = threading.Thread(target=self.schedule_loop, daemon=True)
self.scheduled_thread.start()
self.schedule_status_var.set("状态: 运行中")
self.schedule_status_label.config(foreground="green")
self.log(f"定时任务已启用,将在 {next_time.strftime('%Y-%m-%d %H:%M:%S')} 执行")
except Exception as e:
self.log(f"设置定时任务失败: {str(e)}")
messagebox.showerror("错误", f"设置定时任务失败:\n{str(e)}")
self.schedule_var.set(False)
def cancel_scheduled_task(self):
"""取消定时任务"""
self.scheduled_running = False
self.scheduled_event.set() # 唤醒线程以便退出
self.next_scheduled_time = None
        self.next_run_label.config(text="下次执行: 未设置")
self.schedule_status_var.set("状态: 已停止")
self.schedule_status_label.config(foreground="red")
self.log("定时任务已取消")
def load_config(self):
"""从文件加载配置"""
if not os.path.exists(self.config_file):
return False
try:
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
# 恢复邮箱配置
self.sender_username.delete(0, tk.END)
self.sender_username.insert(0, config.get("sender_username", ""))
self.sender_email.delete(0, tk.END)
self.sender_email.insert(0, config.get("sender_email", ""))
self.sender_password.delete(0, tk.END)
self.sender_password.insert(0, config.get("sender_password", ""))
self.receiver_emails = config.get("receiver_emails", [])
self.receiver_listbox.delete(0, tk.END)
for email in self.receiver_emails:
self.receiver_listbox.insert(tk.END, email)
# 恢复爬取配置
self.days.delete(0, tk.END)
self.days.insert(0, config.get("days", "5"))
selected_websites = config.get("selected_websites", [])
for website, var in self.website_vars.items():
var.set(website in selected_websites)
# 恢复关键词
self.keywords = config.get("keywords", [])
self.keyword_listbox.delete(0, tk.END)
for keyword in self.keywords:
self.keyword_listbox.insert(tk.END, keyword)
# 恢复定时任务配置
self.schedule_var.set(config.get("schedule_enabled", False))
schedule_time = config.get("schedule_time", "09:00")
if ':' in schedule_time:
hours, minutes = schedule_time.split(':')
self.hour_var.set(hours.zfill(2))
self.minute_var.set(minutes.zfill(2))
self.schedule_interval.set(config.get("schedule_interval", 1))
# 如果定时任务启用,则启动定时任务
if self.schedule_var.get():
self.setup_scheduled_task()
self.log("配置已从文件加载")
return True
except Exception as e:
self.log(f"加载配置失败: {str(e)}")
return False
def on_exit(self):
self.on_window_close()
def add_keyword(self):
keyword = self.keyword_entry.get().strip()
if keyword:
if keyword not in self.keywords:
self.keywords.append(keyword)
self.keyword_listbox.insert(tk.END, keyword)
self.keyword_entry.delete(0, tk.END)
self.log(f"添加关键词: {keyword}")
else:
messagebox.showwarning("警告", "该关键词已存在!")
else:
messagebox.showwarning("警告", "请输入关键词!")
def remove_keyword(self):
selected = self.keyword_listbox.curselection()
if selected:
index = selected[0]
keyword = self.keyword_listbox.get(index)
self.keyword_listbox.delete(index)
self.keywords.remove(keyword)
self.log(f"删除关键词: {keyword}")
else:
messagebox.showwarning("警告", "请先选择要删除的关键词!")
def clear_keywords(self):
self.keyword_listbox.delete(0, tk.END)
self.keywords.clear()
self.log("已清空关键词列表")
def log(self, message):
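        # The log widget is kept read-only; re-enable it just long enough to
        # append a timestamped line (color-tagged for send success/failure),
        # then lock it again and scroll to the end.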
self.log_text.config(state="normal")
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if "成功发送至" in message:
self.log_text.tag_config('success', foreground='green')
self.log_text.insert(tk.END, f"[{timestamp}] ", 'success')
self.log_text.insert(tk.END, f"{message}\n", 'success')
elif "发送失败" in message:
self.log_text.tag_config('error', foreground='red')
self.log_text.insert(tk.END, f"[{timestamp}] ", 'error')
self.log_text.insert(tk.END, f"{message}\n", 'error')
else:
self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
self.log_text.config(state="disabled")
self.log_text.see(tk.END)
self.root.update()
def clear_log(self):
self.log_text.config(state="normal")
self.log_text.delete(1.0, tk.END)
self.log_text.config(state="disabled")
def validate_inputs(self):
if not self.sender_username.get():
messagebox.showerror("错误", "请输入发送方QQ邮箱用户名!")
return False
if not self.sender_email.get():
messagebox.showerror("错误", "请输入发送方QQ邮箱!")
return False
if not self.sender_password.get():
messagebox.showerror("错误", "请输入邮箱授权码!")
return False
try:
days = int(self.days.get())
if days <= 0:
messagebox.showerror("错误", "天数必须为正整数!")
return False
except ValueError:
messagebox.showerror("错误", "请输入有效的天数!")
return False
# Ensure at least one website is selected
if not any(var.get() for var in self.website_vars.values()):
messagebox.showerror("错误", "请至少选择一个爬取网站!")
return False
if not self.keywords:
messagebox.showerror("错误", "请至少添加一个关键词!")
return False
if not self.receiver_emails:
messagebox.showerror("错误", "请至少添加一个接收方邮箱!")
return False
return True
def start_crawling(self):
if not self.validate_inputs():
return
# 重置中断标志
self.crawling_stopped = False
# 启用中断按钮
self.stop_button.config(state=tk.NORMAL)
# 禁用开始按钮防止重复点击
self.start_button.config(state=tk.DISABLED)
# 开始爬取
self.crawl_selected_sites()
# 爬取完成后恢复按钮状态
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
def crawl_selected_sites(self):
"""爬取用户选中的网站"""
selected_websites = [website for website, var in self.website_vars.items() if var.get()]
if not selected_websites:
messagebox.showwarning("警告", "请至少选择一个网站进行爬取!")
return
if not self.validate_inputs():
return
all_matched_articles = {}
total_all_matched = 0
for website in selected_websites:
if self.crawling_stopped:
self.log(f"爬取已被用户中断")
break
self.log(f"\n{'='*40}")
self.log(f"开始爬取 {website} 最近{self.days.get()}天的文章...")
try:
matched_articles = self.crawl_single_site(website)
if matched_articles:
total_matched = len(matched_articles)
total_all_matched += total_matched
all_matched_articles[website] = matched_articles
self.log(f"{website} 爬取完成,找到 {total_matched} 条匹配结果")
except Exception as e:
self.log(f"爬取 {website} 时发生错误: {str(e)}")
if not self.is_scheduled_task:
messagebox.showerror("错误", f"爬取 {website} 时发生错误:\n{str(e)}")
continue
# 汇总所有结果并保存到一个Excel文件
if total_all_matched > 0 and not self.crawling_stopped:
self.log(f"\n所有选中网站爬取完成,共找到{total_all_matched}篇文章")
# 询问用户是否要保存结果
if not self.is_scheduled_task: # 定时任务自动保存,不询问
save = messagebox.askyesno("保存结果", "是否要保存爬取结果?")
if not save:
return
# 让用户选择保存格式
if not self.is_scheduled_task:
file_type = self.ask_save_file_type()
if file_type is None: # 用户取消
return
else: # 定时任务默认保存Excel
file_type = "excel"
# 根据选择的格式保存文件
if file_type == "excel":
filename = self.save_all_articles_to_excel(all_matched_articles, int(self.days.get()))
elif file_type == "html":
filename = self.save_all_articles_as_html(all_matched_articles, int(self.days.get()))
elif file_type == "markdown":
filename = self.save_all_articles_to_markdown(all_matched_articles, int(self.days.get()))
elif file_type == "text":
filename = self.save_all_articles_to_text(all_matched_articles, int(self.days.get()))
elif file_type == "wechat":
filename = self.save_all_articles_for_wechat(all_matched_articles, int(self.days.get()))
# 询问用户是否要打开文件
if not self.is_scheduled_task and filename:
if messagebox.askyesno("完成", f"结果已保存到 {filename}\n是否要打开文件?"):
self.open_file(filename)
# 询问用户是否要发送邮件
if not self.is_scheduled_task and filename and self.receiver_emails:
send_email = messagebox.askyesno("发送邮件", "是否要将结果发送到配置的邮箱?")
if send_email:
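                # smtp.qq.com:465 is QQ Mail's SSL SMTP endpoint; sender_password
                # holds the SMTP authorization code (授权码), not the account password.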
email_config = {
"smtp_server": "smtp.qq.com",
"smtp_port": 465,
"sender_email": self.sender_email.get(),
"sender_password": self.sender_password.get()
}
self.send_all_email_with_excel(filename, int(self.days.get()), total_all_matched, email_config)
elif self.crawling_stopped:
self.log("爬取已被用户中断")
else:
self.log("\n所有选中网站爬取完成,没有找到符合条件的文章")
if not self.is_scheduled_task:
messagebox.showinfo("完成", "所有选中网站爬取完成,没有找到符合条件的文章。")
def ask_save_file_type(self):
"""弹出对话框让用户选择保存文件类型"""
file_types = [
("Excel 文件", "excel"),
("HTML 文件", "html"),
("Markdown 文件", "markdown"),
("文本文件", "text"),
("微信格式", "wechat")
]
dialog = tk.Toplevel(self.root)
dialog.title("选择保存格式")
dialog_width = 350
dialog_height = 250
dialog.resizable(False, False)
# 计算居中位置
screen_width = dialog.winfo_screenwidth()
screen_height = dialog.winfo_screenheight()
x = (screen_width // 2) - (dialog_width // 2)
y = (screen_height // 2) - (dialog_height // 2)
dialog.geometry(f"{dialog_width}x{dialog_height}+{x}+{y}")
# 主框架
main_frame = ttk.Frame(dialog, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 标签
label = ttk.Label(main_frame, text="请选择保存的文件格式:")
label.pack(pady=(0, 10))
# 单选按钮框架
radio_frame = ttk.Frame(main_frame)
radio_frame.pack(fill=tk.X, padx=10)
selected_type = tk.StringVar(value="excel")
for text, value in file_types:
rb = ttk.Radiobutton(
radio_frame,
text=text,
variable=selected_type,
value=value
)
rb.pack(anchor="w", padx=10, pady=2)
# 按钮框架 - 使用grid布局使按钮居中
button_frame = ttk.Frame(main_frame)
button_frame.pack(pady=(15, 0), fill=tk.X)
# 添加空白列使按钮居中
button_frame.columnconfigure(0, weight=1)
button_frame.columnconfigure(3, weight=1)
def on_ok():
dialog.result = selected_type.get()
dialog.destroy()
def on_cancel():
dialog.result = None
dialog.destroy()
ok_button = ttk.Button(button_frame, text="确定", command=on_ok)
ok_button.grid(row=0, column=1, padx=5, ipadx=10)
cancel_button = ttk.Button(button_frame, text="取消", command=on_cancel)
cancel_button.grid(row=0, column=2, padx=5, ipadx=10)
# 使对话框模态化
dialog.transient(self.root)
dialog.grab_set()
self.root.wait_window(dialog)
return getattr(dialog, "result", None)
def crawl_single_site(self, website):
"""爬取单个网站"""
"""爬取单个网站"""
# 获取输入参数
email_config = {
"smtp_server": "smtp.qq.com",
"smtp_port": 465,
"sender_email": self.sender_email.get(),
"sender_password": self.sender_password.get(),
"receiver_email": self.receiver_email.get()
}
days = int(self.days.get())
source = ""
if website == "国能e招":
source = "neet"
elif website == "三峡招标":
source = "ctg"
elif website == "三峡采购":
source = "ctgc"
elif website == "国能e购":
source = "chnenergy"
elif website == "中国节能":
source="chinaedb"
elif website =="北京京能":
source = "beijing"
elif website =="华润守正":
source = "hrsz"
elif website =="华电电子":
source = "zghn"
elif website =="科环集团":
source = "kh"
else:
self.log(f"未知网站: {website}")
return
self.log(f"开始爬取{website}最近{days}天的文章...")
        # Run the site-specific crawler. Initialize the result list first so the
        # final return below is safe even if the crawler raises before assigning it.
        matched_articles = []
        try:
if source == "neet":
matched_articles = self.crawl_neet_shop(days)
elif source == "ctg":
matched_articles = self.crawl_ctg(days)
elif source == "ctgc":
matched_articles = self.crawl_sanxiacaigou(days)
elif source == "chnenergy":
matched_articles = self.crawl_chnenergy(days)
elif source == "chinaedb":
matched_articles = self.crawl_chinaedb(days)
elif source == "beijing":
matched_articles = self.crawl_beijing(days)
elif source == "hrsz":
matched_articles = self.crawl_hrsz(days)
elif source == "zghn":
matched_articles = self.crawl_zghn(days)
elif source == "kh":
matched_articles = self.crawl_kh(days)
else:
self.log(f"未知网站: {website}")
return
total_matched = len(matched_articles)
self.log(f"{website}爬取结束,共找到{total_matched}篇最近{days}天内发布的文章")
except Exception as e:
self.log(f"发生错误: {str(e)}")
if not self.is_scheduled_task:
messagebox.showerror("错误", f"{website}爬取过程中发生错误:\n{str(e)}")
return matched_articles
def open_file(self, filepath):
"""打开文件"""
try:
if platform.system() == 'Darwin': # macOS
subprocess.call(('open', filepath))
elif platform.system() == 'Windows': # Windows
os.startfile(filepath)
else: # linux variants
subprocess.call(('xdg-open', filepath))
self.log(f"已打开文件: {filepath}")
except Exception as e:
self.log(f"打开文件失败: {str(e)}")
messagebox.showerror("错误", f"无法打开文件:\n{str(e)}")
def crawl_kh(self, days):
self.log(f"\n{'='*50}")
self.log(f"开始爬取科环集团最近 {days} 天的招标信息...")
self.log(f"{'='*50}\n")
all_matched = []
channels_categories = [
("cggg1", ['fw', 'hw', 'gc']),
("cggg2", ['fw', 'hw', 'gc']),
("cggg3", [None])
]
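        # (channel, categories) pairs for the 科环集团 portal; fw/hw/gc are
        # presumably the 服务/货物/工程 sub-channels (pinyin initials), and
        # cggg3 is a single unsegmented listing (category=None).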
for channel, categories in channels_categories:
for category in categories:
page = 1
while True:
if self.crawling_stopped:
return all_matched
self.log(f"[{channel}{category if category else ''}] 正在爬取第 {page:2d} 页...")
info = self.get_kh_source(page, days, channel, category)
                    if not info:
                        self.log(f" ⚠️ {channel}{category if category else ''} 第 {page:2d} 页无数据,跳过")
                        page += 1
                        continue
elif info == -2:
self.log(f" ❌ 获取 {channel}{category if category else ''} 页面失败,请检查网络连接")
break
elif info == -1:
self.log(f" ⏹️ {channel}{category if category else ''} 已到达查询时间范围")
break
if info[-1] == -1:
all_matched.extend(info[:-1])
self.log(f" ⏹️ {channel}{category if category else ''} 已到达查询时间范围")
break
else:
all_matched.extend(info)
page += 1
self.log(f"\n{'='*50}")
self.log("所有匹配结果:")
self.log(f"{'='*50}")
for i, info in enumerate(all_matched, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 类别: {info['来源类别']}")
self.log(f" 🔹 标题: {info['标题']}")
self.log(f" 🔸 类型: {info['公告类型']}")
self.log(f" 🔹 发布日期: {info['发布日期'].strftime('%Y-%m-%d') if info['发布日期'] else '无日期信息'}")
self.log(f" 🔸 投标时间: {info['投标开始时间']}")
self.log(f" 🔹 详情链接: {info['详情链接']}")
self.log(f"{'-'*50}")
return all_matched
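    # Convention for the per-page fetchers below (get_*_results / get_kh_source):
    # they return a list of matched records, appending -1 as a trailing sentinel
    # once an item older than the query window is seen ("stop paging"), and
    # return a bare -2 on fetch/parse failure (-3 for an unrecognized payload).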
def get_kh_source(self, page, days, channel, category=None):
"""通用爬取函数根据channel和category参数爬取不同类别的页面"""
BASE_URL = "https://khjtcgpt.chnenergy.com.cn"
HEADERS = {
"Referer": BASE_URL,
"Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"'
}
if channel == "cggg3":
url = f"{BASE_URL}/cms/channel/{channel}/index.htm?pageNo={page}"
else:
url = f"{BASE_URL}/cms/channel/{channel}{category}/index.htm?pageNo={page}"
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(days=(days + 1))
try:
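            # curl_cffi's impersonate= makes the request present Chrome 110's
            # TLS fingerprint, which helps it look like a real browser to the
            # portal's bot checks.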
response = requests.get(url, impersonate="chrome110", headers=HEADERS)
if not response.ok:
self.log(f" ❌ 请求失败,状态码: {response.status_code}")
return -2
soup = BeautifulSoup(response.text, 'html.parser')
bidding_items = soup.select('ul#list1 li a')
results = []
for item in bidding_items:
title_span = item.find('span', class_='bidLink')
title = title_span.get_text(strip=True) if title_span else item.get('title', '').strip()
if channel == "cggg2":
em_tag = title_span.find('em') if title_span else None
bid_type = em_tag.get_text(strip=True).strip('[]') if em_tag else "未知类型"
else:
bid_type = item.find('em').get_text(strip=True).strip('[]') if item.find('em') else "未知类型"
publish_date = item.find('span', class_='bidDate').get_text(strip=True) if item.find('span', class_='bidDate') else None
try:
publish_date = datetime.datetime.strptime(publish_date, '%Y-%m-%d') if publish_date else None
except ValueError:
publish_date = None
self.log(" ⚠️ 日期转换失败")
if channel == "cggg2":
bid_start = "无投标时间信息"
else:
time_info = item.find('span', class_='bidTime')
if time_info:
input_tag = time_info.find('input')
bid_start = input_tag.get('buystart_1', '').split()[0] if input_tag else ''
else:
bid_start = ''
detail_url = item.get('href', '')
if detail_url and not detail_url.startswith('http'):
detail_url = BASE_URL + detail_url
if publish_date and start_time > publish_date:
results.append(-1)
return results
if any(keyword in title for keyword in self.keywords):
results.append({
'标题': title.replace(bid_type, '').strip() if bid_type != "未知类型" else title,
'公告类型': bid_type,
'发布日期': publish_date,
'投标开始时间': bid_start,
'详情链接': detail_url,
'来源类别': f"{channel}{category}" if category else channel
})
return results
except Exception as e:
self.log(f" ❌ 爬取过程中发生错误: {str(e)}")
return -2
def crawl_zghn(self, days):
self.log(f"\n{'='*50}")
self.log(f"开始爬取华电电子最近 {days} 天的招标信息...")
self.log(f"{'='*50}\n")
matched_articles = []
page = 1
while True:
if self.crawling_stopped:
return matched_articles
self.log(f"[中国华电] 正在爬取第 {page:2d} 页...")
results = self.get_zghn_results(page, days)
if results == -2:
self.log(" ❌ 网页找不到,请检查网页是否能进行访问")
break
elif results == -1:
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
            elif results and results[-1] == -1:
                # Trailing -1 sentinel: keep the records before it, then stop paging.
                if len(results) > 1:
                    self.log(f" ✅ 第 {page:2d} 页共 {len(results)-1:2d} 条招标信息")
                    matched_articles.extend(results[:-1])
                self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
                break
if not results:
page += 1
continue
self.log(f" ✅ 第 {page:2d} 页共 {len(results):2d} 条招标信息")
for i, item in enumerate(results, 1):
self.log(f"\n 【公告 {i:02d}")
self.log(f" 🔸 公告状态: {item['公告状态']}")
self.log(f" 🔹 公告标题: {item['公告标题']}")
self.log(f" 🔸 业务类型: {item['业务类型']}")
self.log(f" 🔹 发布日期: {item['发布日期']}")
self.log(f" 🔸 链接: {item['链接']}")
self.log(f" {'-'*40}")
matched_articles.append(item)
page += 1
self.log(f"\n{'='*50}")
self.log("所有匹配结果:")
self.log(f"{'='*50}")
for i, info in enumerate(matched_articles, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 公告状态: {info['公告状态']}")
self.log(f" 🔹 公告标题: {info['公告标题']}")
self.log(f" 🔸 业务类型: {info['业务类型']}")
self.log(f" 🔹 发布日期: {info['发布日期']}")
self.log(f" 🔸 链接: {info['链接']}")
self.log(f"{'-'*50}")
return matched_articles
def get_zghn_results(self, page, days):
url1 = "https://www.chdtp.com/webs/queryWebZbgg.action?zbggType=1&ggtype=&bustype=&ggbt=&id_gonggaoshrq=&jump=2&page.pageSize=20&page.currentpage=" + str(page) + "&page.totalCount=35544"
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(days=days)
BaseURL = "https://www.chdtp.com/staticPage/"
try:
response = requests.get(
url1,
impersonate="chrome110",
headers={
"Referer": url1,
"Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"'
}
)
soup = BeautifulSoup(response.text, 'html.parser')
rows = soup.find_all('tr', style="height: 33px;")
if not rows:
return -2
bidding_info = []
for row in rows:
# 提取每个单元格的信息
status = row.find('td', class_='td_1').get_text(strip=True)
title_tag = row.find('td', class_='td_2').find('a')
title = title_tag.get_text(strip=True)
link = title_tag['href']
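                # The href is a javascript:-style handler with the real page path
                # inside quotes; take the first quoted segment and resolve it
                # against the static-page base URL.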
link = link.split("'")[1]
link = BaseURL + link
business_type = row.find('td', class_='td_3').get_text(strip=True)
publish_date = row.find('td', class_='td_4').get_text(strip=True)
publish_date = publish_date.strip("[]")
publish_date = datetime.datetime.strptime(publish_date, '%Y-%m-%d')
if publish_date < start_time:
bidding_info.append(-1)
return bidding_info
if any(keyword in title for keyword in self.keywords) and not "成交" in title:
bidding_info.append({
'公告状态': status,
'公告标题': title,
'业务类型': business_type,
'发布日期': publish_date,
'链接': link
})
return bidding_info
except Exception as e:
self.log(f" ❌ 爬取过程中发生错误: {str(e)}")
return -2
def crawl_hrsz(self, days):
self.log(f"\n{'='*50}")
self.log(f"开始爬取华润守正最近 {days} 天的招标信息...")
self.log(f"{'='*50}\n")
matched_articles = []
page = 1
while True:
if self.crawling_stopped:
return matched_articles
self.log(f"[华润守正] 正在爬取第 {page:2d} 页...")
results = self.get_hrsz_results(page, days)
if results == -3:
self.log(" ❌ 无法解析数据格式")
break
elif results == -2:
self.log(" ❌ 网页找不到,请检查网页是否能进行访问")
break
elif results == -1:
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
            elif results and results[-1] == -1:
                # Trailing -1 sentinel: keep the records before it, then stop paging.
                if len(results) > 1:
                    self.log(f" ✅ 第 {page:2d} 页共 {len(results)-1:2d} 条招标信息")
                    matched_articles.extend(results[:-1])
                self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
                break
if not results:
page += 1
continue
self.log(f" ✅ 第 {page:2d} 页共 {len(results):2d} 条招标信息")
for i, bid in enumerate(results, 1):
self.log(f"\n 【公告 {i:02d}")
self.log(f" 🔸 标题: {bid['title']}")
self.log(f" 🔹 发布时间: {bid['publishDate']}")
self.log(f" 🔸 截止时间: {bid['deadline']}")
self.log(f" 🔹 详情链接: {bid['url']}")
self.log(f" {'-'*40}")
matched_articles.append(bid)
page += 1
self.log(f"\n{'='*50}")
self.log("所有匹配结果:")
self.log(f"{'='*50}")
for i, info in enumerate(matched_articles, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 标题: {info['title']}")
self.log(f" 🔹 发布时间: {info['publishDate']}")
self.log(f" 🔸 截止时间: {info['deadline']}")
self.log(f" 🔹 详情链接: {info['url']}")
self.log(f"{'-'*50}")
return matched_articles
def get_hrsz_results(self, page, days):
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(days=days)
Base_url = "https://www.szecp.com.cn/"
url = Base_url + "rcms-external-rest/content/getSZExtData?channelIds=26909&pageNo=" + str(page) + "&pageSize=10"
time.sleep(0.5)
try:
response = requests.get(
url,
impersonate="chrome110",
headers={
"Referer": url,
"Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"'
}
)
response.encoding = 'utf-8'
data_str = response.text.strip()
if not data_str:
self.log(" ❌ 返回数据为空!")
return -2
# 尝试解析 JSON
if data_str.startswith(('{', '[')):
try:
data = json.loads(data_str)
except json.JSONDecodeError as e:
self.log(f" ❌ JSON 解析失败: {e}")
return -3
# 尝试解析 XML
elif data_str.startswith('<'):
try:
data = xmltodict.parse(data_str)
if 'Result' in data:
data = data['Result']
except Exception as e:
self.log(f" ❌ XML 解析失败: {e}")
return -3
else:
self.log(f" ❌ 未知数据格式: {data_str[:100]}")
return -3
tender_list = data['data']['data']
            if isinstance(tender_list, dict) and 'data' in tender_list:
tender_list = tender_list['data']
extracted_info = []
for tender in tender_list:
if tender['publishDate'] < start_time.strftime('%Y-%m-%d'):
extracted_info.append(-1)
return extracted_info
if any(keyword in tender['title'] for keyword in self.keywords) and not "成交" in tender['title']:
url = tender.get('url', '')
cleaned_url = url.lstrip("./")
full_url = Base_url + cleaned_url
info = {
'number': tender['number'],
'purchaseRegion': tender['purchaseRegion']['label'],
'businessUnit': tender['businessUnit']['label'],
'deadline': tender['deadline'],
'purchaseOrg': tender['purchaseOrg']['label'],
'purchaseType': tender['purchaseType'],
'title': tender['title'],
'url': full_url,
'publishDate': tender['publishDate']
}
extracted_info.append(info)
return extracted_info
except Exception as e:
self.log(f" ❌ 爬取过程中发生错误: {str(e)}")
return -2
def crawl_beijing(self, days):
self.log(f"\n{'='*50}")
self.log(f"开始爬取北京京能最近 {days} 天的招标信息...")
self.log(f"{'='*50}\n")
matched_articles = []
page = 1
while True:
if self.crawling_stopped:
return matched_articles
self.log(f"[第 {page:2d} 页] 正在爬取...")
results = self.get_beijing_results(page, days)
if results == -2:
self.log(" ❌ 网页找不到,请检查网页是否能进行访问")
break
elif results:
size = len(results)
if size and results[size-1] == -1:
if size > 1:
self.log(f" ✅ 第 {page:2d} 页共 {size-1:2d} 条招标信息")
matched_articles.extend(results[:-1])
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
if not results:
page += 1
continue
self.log(f" ✅ 第 {page:2d} 页共 {len(results):2d} 条招标信息")
for i, bid in enumerate(results, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 标题: {bid['title']}")
self.log(f" 🔹 发布时间: {bid['date'].strftime('%Y-%m-%d') if bid['date'] else '无日期信息'}")
self.log(f" 🔸 详情链接: {bid['link']}")
self.log(f"{'-'*50}")
matched_articles.append(bid)
page += 1
self.log(f"\n{'='*50}")
self.log("所有匹配结果:")
self.log(f"{'='*50}")
for i, info in enumerate(matched_articles, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 标题: {info['title']}")
self.log(f" 🔹 发布时间: {info['date'].strftime('%Y-%m-%d') if info['date'] else '无日期信息'}")
self.log(f" 🔸 详情链接: {info['link']}")
self.log(f"{'-'*50}")
return matched_articles
def get_beijing_results(self, page, days):
BASE_URL = "https://www.powerbeijing-ec.com/"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
        search_url = BASE_URL + "jncms/search/bulletin.html?categoryId=2&tabName=招标公告&goSearch=&page=" + str(page)
response = requests.get(search_url, headers=headers)
response.encoding = 'utf-8'
html_content = response.text
soup = BeautifulSoup(html_content, 'html.parser')
tenders = []
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(days=days)
tender_list = soup.find('ul', class_='newslist')
if not tender_list:
return -2
for li in tender_list.find_all('li'):
a_tag = li.find('a')
title = a_tag['title']
link = a_tag['href']
date = a_tag.find('div', class_='newsDate').div.text
try:
date = datetime.datetime.strptime(date, '%Y-%m-%d')
except ValueError:
date = None
self.log(" ⚠️ 日期转换失败")
            if date and date < start_time:
                tenders.append(-1)
                return tenders
            if date and date >= start_time and any(keyword in title for keyword in self.keywords) and "成交" not in title:
tenders.append({
'title': title,
'link': link,
'date': date
})
return tenders
def crawl_chinaedb(self, days):
self.log(f"\n{'='*50}")
self.log(f"开始爬取中国节能网最近 {days} 天的招标信息...")
self.log(f"{'='*50}\n")
matched_articles = []
page = 1
while True:
if self.crawling_stopped:
return matched_articles
self.log(f"[第 {page:2d} 页] 正在爬取...")
results = self.get_chinaedb_results(page, days)
if results == -2:
self.log(" ❌ 网页找不到,请检查网页是否能进行访问")
break
elif results:
size = len(results)
if size and results[size-1] == -1:
if size > 1:
self.log(f" ✅ 第 {page:2d} 页共 {size-1:2d} 条招标信息")
matched_articles.extend(results[:-1])
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
if not results:
page += 1
continue
for i, bid in enumerate(results, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 标题: {bid['title']}")
self.log(f" 🔹 发布时间: {bid['publish_date'].strftime('%Y-%m-%d') if bid['publish_date'] else '无日期信息'}")
self.log(f" 🔸 采购类别: {bid['category']}")
self.log(f" 🔹 招标单位: {bid['bid_unit']}")
self.log(f" 🔸 报名截止: {bid['deadline'].strftime('%Y-%m-%d') if bid['deadline'] else '无截止时间'}")
self.log(f" 🔹 详情链接: {bid['link']}")
self.log(f"{'-'*50}")
matched_articles.append(bid)
page += 1
self.log(f"\n{'='*50}")
self.log("所有匹配结果:")
self.log(f"{'='*50}")
for i, info in enumerate(matched_articles, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 标题: {info['title']}")
self.log(f" 🔹 发布时间: {info['publish_date'].strftime('%Y-%m-%d') if info['publish_date'] else '无日期信息'}")
self.log(f" 🔸 采购类别: {info['category']}")
self.log(f" 🔹 招标单位: {info['bid_unit']}")
self.log(f" 🔸 报名截止: {info['deadline'].strftime('%Y-%m-%d') if info['deadline'] else '无截止时间'}")
self.log(f" 🔹 详情链接: {info['link']}")
self.log(f"{'-'*50}")
return matched_articles
def get_chinaedb_results(self, page, days):
BASE_URL = "https://www.ebidding.cecep.cn"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
if page == 1:
search_url = f"{BASE_URL}/jyxx/001006/001006001/bidinfo.html"
else:
search_url = f"{BASE_URL}/jyxx/001006/001006001/{page}.html"
response = requests.get(search_url, headers=headers)
response.encoding = 'utf-8'
html_content = response.text
soup = BeautifulSoup(html_content, 'html.parser')
bid_list = []
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(days=days)
go_items = soup.find('ul', class_='go-items')
if not go_items:
return -2
for li in go_items.find_all('li'):
a_tag = li.find('a', class_='go-box')
if not a_tag:
continue
title = a_tag.find('span', class_='go-txt').get_text(strip=True)
pub_date = a_tag.find('span', class_='go-time').get_text(strip=True)
try:
pub_date = datetime.datetime.strptime(pub_date, '%Y-%m-%d')
except ValueError:
pub_date = None
self.log(" ⚠️ 日期转换失败")
            if pub_date and pub_date < start_time:
bid_list.append(-1)
return bid_list
link = BASE_URL + a_tag['href']
go_para = a_tag.find('div', class_='go-para')
category = go_para.find('div', class_='go-sub').get_text(strip=True).replace('采购类别:', '')
bid_unit = go_para.find('div', class_='go-sub2').get_text(strip=True).replace('招标单位:', '')
deadline = go_para.find('div', class_='go-sub3').get_text(strip=True).replace('报名截止:', '')
try:
deadline = datetime.datetime.strptime(deadline, '%Y-%m-%d')
except ValueError:
deadline = None
self.log(" ⚠️ 截止日期转换失败")
            if pub_date and pub_date >= start_time and any(keyword in title for keyword in self.keywords) and "成交" not in title:
bid_info = {
'title': title,
'publish_date': pub_date,
'category': category,
'bid_unit': bid_unit,
'deadline': deadline,
'link': link
}
bid_list.append(bid_info)
return bid_list
def crawl_chnenergy(self, days):
self.log(f"\n{'='*50}")
self.log(f"开始爬取国能e购最近 {days} 天的招标信息...")
self.log(f"{'='*50}\n")
matched_articles = []
page = 1
while True:
if self.crawling_stopped:
return matched_articles
self.log(f"[第 {page:2d} 页] 正在爬取...")
results = self.get_chnenergy_results(page, days)
if results == -2:
self.log(" ❌ 网页找不到,请检查网页是否能进行访问")
break
elif results == -1:
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
elif results:
size = len(results)
if size and results[size-1] == -1:
if size > 1:
self.log(f" ✅ 第 {page:2d} 页共 {size-1:2d} 条招标信息")
matched_articles.extend(results[:-1])
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
if not results:
page += 1
continue
for i, result in enumerate(results, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 编号: {result['code']}")
self.log(f" 🔹 标题: {result['title']}")
self.log(f" 🔸 发布时间: {result['time'].strftime('%Y-%m-%d') if result['time'] else '无日期信息'}")
self.log(f" 🔹 详情链接: {result['link']}")
self.log(f"{'-'*50}")
matched_articles.append(result)
page += 1
self.log(f"\n{'='*50}")
self.log("所有匹配结果:")
self.log(f"{'='*50}")
for i, info in enumerate(matched_articles, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 编号: {info['code']}")
self.log(f" 🔹 标题: {info['title']}")
self.log(f" 🔸 发布时间: {info['time'].strftime('%Y-%m-%d') if info['time'] else '无日期信息'}")
self.log(f" 🔹 详情链接: {info['link']}")
self.log(f"{'-'*50}")
return matched_articles
def get_chnenergy_results(self, page, days):
BASE_URL = "https://www.chnenergybidding.com.cn/"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
starttime = datetime.datetime.now() - datetime.timedelta(days=days)
search_url = f"{BASE_URL}/bidweb/001/001002/{page}.html"
try:
response = requests.get(search_url, headers=headers)
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
items_list = soup.find('ul', class_='right-items')
if not items_list:
return -2
results = []
for item in items_list.find_all('li', class_='right-item clearfix'):
title_link = item.find('a', href=True)
title = title_link.get('title', '').strip()
if not title:
title = title_link.get_text(strip=True)
link = BASE_URL + title_link['href']
code_tag = item.find('span', class_='author')
code = code_tag.get_text(strip=True) if code_tag else ''
                time_span = item.find('span', class_='r')
                time_str = time_span.get_text(strip=True) if time_span else ''
                try:
                    pub_time = datetime.datetime.strptime(time_str, '%Y-%m-%d')
                except ValueError:
                    # Unparsable date: skip this entry.
                    continue
                if pub_time < starttime:
                    results.append(-1)
                    return results
                if pub_time >= starttime and any(keyword in title for keyword in self.keywords) and "成交" not in title:
                    results.append({
                        "code": code,
                        "title": title,
                        "link": link,
                        "time": pub_time
                    })
return results
except Exception as e:
self.log(f" ❌ 获取国能e购结果时出错: {str(e)}")
return -2
def crawl_neet_shop(self, days):
self.log(f"\n{'='*50}")
self.log(f"开始爬取国能e招最近 {days} 天的招标信息...")
self.log(f"{'='*50}\n")
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(days=days)
page = 1
matched_articles = []
while True:
if self.crawling_stopped:
return matched_articles
self.log(f"[第 {page:2d} 页] 正在爬取...")
results = self.get_neet_shop_results(page, days)
if results == -2:
self.log(" ❌ 网页找不到,请检查网页是否能进行访问")
break
elif results == -1:
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
elif results:
size = len(results)
if size and results[size-1] == -1:
if size > 1:
self.log(f" ✅ 第 {page:2d} 页共 {size-1:2d} 条招标信息")
matched_articles.extend(results[:-1])
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
if not results:
page += 1
continue
for i, result in enumerate(results, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 标题: {result[1]}")
self.log(f" 🔹 发布时间: {result[2].strftime('%Y-%m-%d %H:%M:%S')}")
self.log(f" 🔸 截止时间: {result[3].strftime('%Y-%m-%d %H:%M:%S')}")
self.log(f" 🔹 详情链接: {result[0]}")
self.log(f"{'-'*50}")
matched_articles.append(result)
page += 1
self.log(f"\n{'='*50}")
self.log("所有匹配结果:")
self.log(f"{'='*50}")
for i, info in enumerate(matched_articles, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 标题: {info[1]}")
self.log(f" 🔹 发布时间: {info[2].strftime('%Y-%m-%d %H:%M:%S')}")
self.log(f" 🔸 截止时间: {info[3].strftime('%Y-%m-%d %H:%M:%S')}")
self.log(f" 🔹 详情链接: {info[0]}")
self.log(f"{'-'*50}")
return matched_articles
def get_neet_shop_results(self, page_no, days):
try:
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(days=days)
url = (
"https://www.neep.shop/rest/service/routing/nouser/inquiry/quote/searchCmsArticleList"
"?callback=jQuery191018342137772079192_1747887937321"
"&order=asc&deadline=&inquireName=&publishArea=&inquireCode=&noticeType=1&pageNo="
+ str(page_no)
)
response = requests.get(
url,
headers={
"Referer": url,
"Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"',
},
)
response.encoding = 'utf-8'
if response.status_code != 200:
self.log(f" ❌ 请求失败,状态码: {response.status_code}")
return -2
data_str = response.text
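            # The endpoint returns JSONP (wrapped in the jQuery callback named in
            # the URL), so slice out the outermost {...} before json.loads().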
json_start = data_str.find('{')
json_end = data_str.rfind('}') + 1
json_str = data_str[json_start:json_end]
if not json_str:
self.log(" ❌ 未找到有效的 JSON 数据")
return -2
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
self.log(f" ❌ JSON 解析失败: {e}")
return -2
results = []
rows = data.get('data', {}).get('rows', [])
for row in rows:
publish_date = datetime.datetime.strptime(row['publishTimeString'], '%Y-%m-%d %H:%M:%S')
if start_time > publish_date:
results.append(-1)
return results
if (
any(keyword in row['inquireName'] for keyword in self.keywords)
and start_time <= publish_date
and "成交" not in row['inquireName']
):
link = row['articleUrl']
title = row['inquireName']
deadline_date = datetime.datetime.strptime(row['quotDeadlineString'], '%Y-%m-%d %H:%M:%S')
results.append((link, title, publish_date, deadline_date))
return results
except Exception as e:
self.log(f" ❌ 发生错误: {e}")
return -2
def get_sanxiacaigou_results(self, page, days):
url1 = "https://eps.ctg.com.cn/cms/channel/2ywgg0qb/index.htm?pageNo=" + str(page)
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(days=days)
BaseURL = "https://eps.ctg.com.cn"
try:
response = requests.get(
url1,
impersonate="chrome110",
headers={
"Referer": url1,
"Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"'
}
)
soup = BeautifulSoup(response.text, 'html.parser')
# 定位招标信息列表区域
info_list = soup.find('div', class_='infolist-main bidlist bidlist2')
if not info_list:
return -2
# 查找所有招标信息项
items = info_list.find_all('li', attrs={'name': 'li_name'})
if not items:
return -2
bidding_info = []
for item in items:
a_tag = item.find('a')
if not a_tag:
continue
# 提取标题
title = a_tag.get('title', '').strip()
if not title:
# 如果title属性为空尝试从span中提取文本
span = a_tag.find('span')
if span:
# 获取span下所有文本去除图标和采购方式em标签的内容
for element in span.find_all(['i', 'em']):
element.decompose()
title = span.get_text().strip()
# 提取链接
href = a_tag.get('href', '')
full_url = BaseURL + href if href and not href.startswith('http') else href
# 提取日期 - 精确查找a标签下最后一个em标签
date_em = None
all_em_tags = a_tag.find_all('em')
if all_em_tags:
# 采购方式em有特定样式日期em没有样式或样式不同
for em in all_em_tags:
if not em.get('style') or 'width:6.5em' not in em.get('style', ''):
date_em = em
break
# 如果没有找到符合条件的em取最后一个
if not date_em and all_em_tags:
date_em = all_em_tags[-1]
publish_date_str = date_em.get_text().strip() if date_em else ''
try:
publish_date = datetime.datetime.strptime(publish_date_str, "%Y-%m-%d") if publish_date_str else current_time
                except ValueError:
publish_date = current_time
# 日期检查
if publish_date < start_time:
bidding_info.append(-1)
return bidding_info
# 关键词过滤
if any(keyword in title for keyword in self.keywords) and not any(exclude in title for exclude in ["成交", "结果公告", "中标"]):
bidding_info.append({
'title': title,
'url': full_url,
'date': publish_date_str if publish_date_str else publish_date.strftime("%Y-%m-%d"),
'source': '三峡采购',
'page': page
})
return bidding_info
except Exception as e:
self.log(f" ❌ 爬取过程中发生错误: {str(e)}")
return -2
def crawl_sanxiacaigou(self, days):
self.log(f"\n{'='*50}")
self.log(f"开始爬取三峡采购最近 {days} 天的招标信息...")
self.log(f"{'='*50}\n")
matched_articles = []
page = 1
while True:
if self.crawling_stopped:
return matched_articles
self.log(f"[三峡采购] 正在爬取第 {page:2d} 页...")
results = self.get_sanxiacaigou_results(page, days)
if results == -3:
self.log(" ❌ 无法解析数据格式")
break
elif results == -2:
self.log(" ❌ 网页找不到或没有数据,请检查网页是否能进行访问")
break
elif results == -1:
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
elif results:
# 检查是否到达时间限制
if results[-1] == -1:
results = results[:-1] # 移除-1标记
if results:
matched_articles.extend(results)
self.log(f" ✅ 第 {page:2d} 页共 {len(results):2d} 条招标信息")
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
break
self.log(f" ✅ 第 {page:2d} 页共 {len(results):2d} 条招标信息")
matched_articles.extend(results)
page += 1
# 结果输出
self.log(f"\n{'='*50}")
self.log(f"共找到 {len(matched_articles)} 条匹配的招标信息:")
self.log(f"{'='*50}")
for i, info in enumerate(matched_articles, 1):
self.log(f"{i:3d}. [第{info['page']}页] {info['date']} - {info['title']}")
self.log(f" {info['url']}")
return matched_articles
def crawl_ctg(self, days):
self.log(f"\n{'='*50}")
self.log(f"开始爬取CTG最近 {days} 天的招标信息...")
self.log(f"{'='*50}\n")
session = self.init_ctg_session()
if not session:
self.log(" ❌ 会话初始化失败")
return []
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(days=days)
matched_articles = []
page = 1
while True:
if self.crawling_stopped:
return matched_articles
self.log(f"[第 {page:2d} 页] 正在爬取...")
results = self.get_ctg_search_results(session, page)
if not results:
page += 1
continue
self.log(f" ✅ 第 {page:2d} 页找到 {len(results)} 条结果")
for i, (link, title, date) in enumerate(results, 1):
if date is None:
continue
if start_time <= date <= current_time and "成交" not in title:
time_str = date.strftime('%Y-%m-%d %H:%M:%S')
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 标题: {title}")
self.log(f" 🔹 详情链接: {link}")
self.log(f" 🔸 发布时间: {time_str}")
self.log(f"{'-'*50}")
matched_articles.append((link, title, date))
elif date < start_time:
self.log(" ⏹️ 发布时间小于开始时间,爬取结束")
return matched_articles
page += 1
self.log(f"\n{'='*50}")
self.log("所有匹配结果:")
self.log(f"{'='*50}")
for i, info in enumerate(matched_articles, 1):
self.log(f"\n【公告 {i:03d}")
self.log(f" 🔸 标题: {info[1]}")
self.log(f" 🔹 详情链接: {info[0]}")
self.log(f" 🔸 发布时间: {info[2].strftime('%Y-%m-%d %H:%M:%S')}")
self.log(f"{'-'*50}")
return matched_articles
def init_ctg_session(self):
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
})
return session
def get_ctg_search_results(self, session, page=1):
base_url = "https://eps.ctg.com.cn"
url = f"{base_url}/cms/channel/2ywgg0qb/index.htm?pageNo={page}"
response = requests.get(
url,
impersonate="chrome110",
headers={
"Referer": url,
"Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"'
}
)
results = self.parse_ctg_html(response.text)
return results
def parse_ctg_html(self, html):
soup = BeautifulSoup(html, 'html.parser')
results = []
base_url = "https://eps.ctg.com.cn"
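        # Scan every anchor on the page and regex a YYYY-MM-DD date out of its
        # text; anchors without a parsable date are kept with date=None and the
        # caller skips them.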
for a in soup.find_all('a', href=True):
link_text = a.text.strip()
em_tag = re.search(r'\d{4}-\d{2}-\d{2}', link_text)
if em_tag:
try:
date = datetime.datetime.strptime(em_tag.group(), '%Y-%m-%d')
except ValueError:
date = None
else:
date = None
if any(keyword in link_text for keyword in self.keywords):
link = urllib.parse.urljoin(base_url, a['href'])
title = a.get('title', a.text.strip())
results.append((link, title, date))
return results
def save_all_articles_to_excel(self, all_articles, days):
"""将所有网站的文章保存到一个Excel文件的不同工作表中"""
# 创建Excel工作簿
wb = openpyxl.Workbook()
# 删除默认创建的工作表
if 'Sheet' in wb.sheetnames:
wb.remove(wb['Sheet'])
# 为每个网站创建一个工作表并添加数据
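        # The website→source mapping below mirrors the one in crawl_single_site();
        # each source returns differently shaped records, hence one branch per source.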
for website, articles in all_articles.items():
if website == "国能e招":
source = "neet"
elif website == "三峡招标":
source = "ctg"
elif website == "三峡采购":
source = "ctgc"
elif website == "国能e购":
source = "chnenergy"
elif website == "中国节能":
source = "chinaedb"
elif website == "北京京能":
source = "beijing"
elif website == "华润守正":
source = "hrsz"
elif website == "华电电子":
source = "zghn"
elif website =="科环集团":
source="kh"
else:
continue
# 创建工作表
ws = wb.create_sheet(title=website[:31]) # Excel工作表名称最多31个字符
# 添加标记列作为第一列
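            # Titles are written as =HYPERLINK() formulas so the cells are
            # clickable in Excel; the leading 标记 column stays blank for manual
            # triage in the spreadsheet.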
if source == "neet":
headers = ["标记", "序号", "标题链接", "发布时间", "截止时间"]
ws.append(headers)
for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
ws.append([
"", # 标记列初始为空
idx,
f'=HYPERLINK("{link}", "{title}")',
publish_date,
deadline_date
])
# 调整列宽
ws.column_dimensions['A'].width = 8 # 标记列
ws.column_dimensions['B'].width = 8 # 序号列
ws.column_dimensions['C'].width = 100 # 链接列
ws.column_dimensions['D'].width = 20 # 公开时间列
ws.column_dimensions['E'].width = 20 # 截止时间列
elif source == "ctgc": # 新增三峡采购的处理逻辑
headers = ["标记", "序号", "标题链接", "发布时间"]
ws.append(headers)
for idx, article in enumerate(articles, 1):
ws.append([
"", # 标记列初始为空
idx,
f'=HYPERLINK("{article["url"]}", "{article["title"]}")',
article["date"]
])
ws.column_dimensions['A'].width = 8 # 标记列
ws.column_dimensions['B'].width = 8 # 序号列
ws.column_dimensions['C'].width = 100 # 链接列
ws.column_dimensions['D'].width = 20 # 时间列
elif source == "chnenergy":
headers = ["标记", "序号", "公告编号", "标题链接", "发布时间"]
headers = ["标记", "序号", "标题链接", "发布时间"]
ws.append(headers)
for idx, (link, title, publish_time) in enumerate(articles, 1):
ws.append([
"", # 标记列初始为空
idx,
f'=HYPERLINK("{link}", "{title}")',
publish_time.strftime('%Y-%m-%d %H:%M:%S')
])
ws.column_dimensions['A'].width = 8 # 标记列
ws.column_dimensions['B'].width = 8 # 序号列
ws.column_dimensions['C'].width = 100 # 链接列
ws.column_dimensions['D'].width = 20 # 时间列
elif source == "chnenergy":
headers = ["标记", "序号", "公告编号", "标题链接", "发布时间"]
ws.append(headers)
for idx, article in enumerate(articles, 1):
ws.append([
"", # 标记列初始为空
idx,
article.get("code", ""),
f'=HYPERLINK("{article["link"]}", "{article["title"]}")',
article["time"].strftime('%Y-%m-%d')
])
ws.column_dimensions['A'].width = 8 # 标记列
ws.column_dimensions['B'].width = 8 # 序号列
ws.column_dimensions['C'].width = 20 # 公告编号列
ws.column_dimensions['D'].width = 100 # 链接列
ws.column_dimensions['E'].width = 20 # 时间列
elif source == "chinaedb":
headers = ["标记", "序号", "标题链接", "发布时间", "截止时间","采购类别","招标单位"]
ws.append(headers)
for idx, article in enumerate(articles, 1):
ws.append([
"", # 标记列初始为空
idx,
f'=HYPERLINK("{article["link"]}", "{article["title"]}")',
article["publish_date"].strftime('%Y-%m-%d'),
article["deadline"].strftime('%Y-%m-%d'),
article["category"],
article["bid_unit"]
])
ws.column_dimensions['A'].width = 8 # 标记列
ws.column_dimensions['B'].width = 8 # 序号列
ws.column_dimensions['C'].width = 100 # 链接列
ws.column_dimensions['D'].width = 20 # 公开时间列
ws.column_dimensions['E'].width = 20 # 截止时间列
ws.column_dimensions['F'].width = 10 # 采购类别列
ws.column_dimensions['G'].width = 10 # 招标单位列
elif source == "beijing":
headers = ["标记", "序号", "标题链接", "发布时间"]
ws.append(headers)
for idx, article in enumerate(articles, 1):
ws.append([
"", # 标记列初始为空
idx,
f'=HYPERLINK("{article["link"]}", "{article["title"]}")',
article["date"].strftime('%Y-%m-%d')
])
ws.column_dimensions['A'].width = 8 # 标记列
ws.column_dimensions['B'].width = 8 # 序号列
ws.column_dimensions['C'].width = 100 # 链接列
ws.column_dimensions['D'].width = 20 # 时间列
elif source == "hrsz":
headers = ["标记", "序号", "标题链接", "发布时间", "截止时间"]
ws.append(headers)
for idx, article in enumerate(articles, 1):
ws.append([
"", # 标记列初始为空
idx,
f'=HYPERLINK("{article["url"]}", "{article["title"]}")',
datetime.datetime.strptime(article["publishDate"], "%Y-%m-%d %H:%M:%S"),
datetime.datetime.strptime(article["deadline"], "%Y-%m-%d %H:%M:%S")
])
ws.column_dimensions['A'].width = 8 # 标记列
ws.column_dimensions['B'].width = 8 # 序号列
ws.column_dimensions['C'].width = 100 # 链接列
ws.column_dimensions['D'].width = 20 # 发布时间列
ws.column_dimensions['E'].width = 20 # 截止时间列
elif source == "zghn":
headers = ["标记", "序号", "标题链接", "发布时间", "公告状态","业务类型"]
ws.append(headers)
for idx, article in enumerate(articles, 1):
ws.append([
"", # 标记列初始为空
idx,
f'=HYPERLINK("{article['链接']}", "{article['公告标题']}")',
article['发布日期'].strftime('%Y-%m-%d'),
article['公告状态'],
article['业务类型']
])
ws.column_dimensions['A'].width = 8 # 标记列
ws.column_dimensions['B'].width = 8 # 序号列
ws.column_dimensions['C'].width = 100 # 链接列
ws.column_dimensions['D'].width = 20 # 发布时间列
ws.column_dimensions['E'].width = 10 # 公告状态列
ws.column_dimensions['F'].width = 10 # 公告类型列
elif source == "kh":
headers = ["标记", "序号", "标题链接", "公告类型", "发布日期", "投标时间", "来源类别"]
ws.append(headers)
for idx, article in enumerate(articles, 1):
ws.append([
"", # 标记列初始为空
idx,
f'=HYPERLINK("{article["详情链接"]}", "{article["标题"]}")',
article["公告类型"],
article["发布日期"].strftime('%Y-%m-%d') if article["发布日期"] else "无日期信息",
article["投标开始时间"] if article["投标开始时间"] else "无投标时间信息",
article["来源类别"]
])
# 设置列宽
ws.column_dimensions['A'].width = 8 # 标记列
ws.column_dimensions['B'].width = 8 # 序号列
ws.column_dimensions['C'].width = 100 # 标题链接列
ws.column_dimensions['D'].width = 15 # 公告类型列
ws.column_dimensions['E'].width = 15 # 发布日期列
ws.column_dimensions['F'].width = 20 # 投标时间列
ws.column_dimensions['G'].width = 15 # 来源类别列
# 为标记列添加数据验证(下拉列表)
mark_dv = DataValidation(type="list", formula1='"✔,"', allow_blank=True)
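        # formula1 中以逗号分隔下拉选项,'"✔,"' 即提供 ✔ 和空值两个选项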
mark_dv.error = '请从下拉列表中选择'
mark_dv.errorTitle = '无效输入'
mark_dv.prompt = '选择✔标记此行'
mark_dv.promptTitle = '标记选择'
ws.add_data_validation(mark_dv)
# 将数据验证应用到标记列的所有数据行
last_row = len(articles) + 1 # +1 for header row
mark_dv.add(f'A2:A{last_row}')
# 添加条件格式,使标记的行变为黄色
yellow_fill = PatternFill(start_color='FFFF00', end_color='FFFF00', fill_type='solid')
# 修改公式为当前行判断
rule = FormulaRule(formula=['$A2<>""'], stopIfTrue=True, fill=yellow_fill)
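        # $A2 列绝对、行相对:应用到整个数据区域后,每行都按本行 A 列是否非空决定高亮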
        last_col = len(headers)
# 应用范围从A2开始
ws.conditional_formatting.add(
f'A2:{get_column_letter(last_col)}{last_row}',
rule
)
# 保存文件
filename = f"招标信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
wb.save(filename)
self.log(f"所有网站信息已保存到Excel文件: {filename}")
return filename
def save_all_articles_for_wechat(self, all_articles, days):
"""将所有网站的文章保存为微信可识别的超链接格式"""
output_lines = []
for website, articles in all_articles.items():
if website == "国能e招":
source = "neet"
elif website == "三峡招标":
source = "ctg"
elif website == "国能e购":
source = "chnenergy"
elif website == "中国节能":
source = "chinaedb"
elif website == "北京京能":
source = "beijing"
elif website == "华润守正":
source = "hrsz"
elif website == "华电电子":
source = "zghn"
elif website =="科环集团":
source="kh"
elif website == "三峡采购":
source = "ctgc"
else:
continue
# Add website header
output_lines.append(f"{website}")
output_lines.append("")
# Process articles based on source
if source == "neet":
for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
output_lines.append(f"{idx}. {title}")
output_lines.append(f" {link}")
elif source == "ctg":
for idx, (link, title, publish_time) in enumerate(articles, 1):
output_lines.append(f"{idx}. {title}")
output_lines.append(f" {link}")
elif source == "ctgc":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. {article['title']}")
output_lines.append(f" {article['url']}")
elif source == "chnenergy":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. {article['title']}")
output_lines.append(f" {article['link']}")
elif source == "chinaedb":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. {article['title']}")
output_lines.append(f" {article['link']}")
elif source == "beijing":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. {article['title']}")
output_lines.append(f" {article['link']}")
elif source == "hrsz":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. {article['title']}")
output_lines.append(f" {article['url']}")
elif source == "zghn":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. {article['公告标题']}")
output_lines.append(f" {article['链接']}")
# Add empty line between sections
output_lines.append("")
# Save to file
filename = f"爬取信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(filename, 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
self.log(f"所有网站信息已保存为微信兼容格式文件: {filename}")
return filename
def save_all_articles_to_markdown(self, all_articles, days):
"""将所有网站的文章保存到Markdown格式"""
output_lines = []
for website, articles in all_articles.items():
if website == "国能e招":
source = "neet"
elif website == "三峡招标":
source = "ctg"
elif website == "国能e购":
source = "chnenergy"
elif website == "中国节能":
source = "chinaedb"
elif website == "北京京能":
source = "beijing"
elif website == "华润守正":
source = "hrsz"
elif website == "华电电子":
source = "zghn"
elif website =="科环集团":
source="kh"
elif website == "三峡采购":
source = "ctgc"
else:
continue
# Add website header
output_lines.append(f"### {website} ")
output_lines.append("")
# Process articles based on source
if source == "neet":
for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
output_lines.append(f"{idx}. [{title}]({link}) ")
elif source == "ctgc":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['url']}) ")
elif source == "ctg":
for idx, (link, title, publish_time) in enumerate(articles, 1):
output_lines.append(f"{idx}. [{title}]({link}) ")
elif source == "chnenergy":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['link']}) ")
elif source == "chinaedb":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['link']}) ")
elif source == "beijing":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['link']}) ")
elif source == "hrsz":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['url']}) ")
elif source == "zghn":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['公告标题']}]({article['链接']}) ")
elif source=="kh":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['标题']}]({article['详情链接']}) ")
# Add empty line between sections
output_lines.append("")
# Save to file
filename = f"爬取信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
with open(filename, 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
self.log(f"所有网站信息已保存到Markdown文件: {filename}")
return filename
def save_all_articles_to_text(self, all_articles, days):
"""将所有网站的文章保存到文本格式"""
output_lines = []
for website, articles in all_articles.items():
if website == "国能e招":
source = "neet"
elif website == "三峡招标":
source = "ctg"
elif website == "国能e购":
source = "chnenergy"
elif website == "中国节能":
source = "chinaedb"
elif website == "北京京能":
source = "beijing"
elif website == "华润守正":
source = "hrsz"
elif website == "华电电子":
source = "zghn"
elif website =="科环集团":
source="kh"
elif website == "三峡采购":
source = "ctgc"
else:
continue
# Add website header
output_lines.append(f"### {website}")
# Process articles based on source
if source == "neet":
for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
output_lines.append(f"{idx}. [{title}]({link})")
elif source == "ctg":
for idx, (link, title, publish_time) in enumerate(articles, 1):
output_lines.append(f"{idx}. [{title}]({link})")
elif source == "chnenergy":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['link']})")
elif source == "chinaedb":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['link']})")
elif source == "beijing":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['link']})")
elif source == "hrsz":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['url']})")
elif source =="ctgc":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['title']}]({article['url']})")
elif source == "zghn":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['公告标题']}]({article['链接']})")
elif source=="kh":
for idx, article in enumerate(articles, 1):
output_lines.append(f"{idx}. [{article['标题']}]({article['详情链接']})")
# Add empty line between sections
output_lines.append("")
# Save to file
filename = f"爬取信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(filename, 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
self.log(f"所有网站信息已保存到文本文件: {filename}")
return filename
def save_all_articles_as_html(self, all_articles, days):
"""将所有网站的文章保存为HTML格式优化微信显示效果"""
# HTML头部
html_content = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>爬取信息汇总_{date}</title>
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
line-height: 1.6;
color: #333;
padding: 15px;
max-width: 100%;
word-break: break-word;
}}
h2 {{
color: #1a73e8;
font-size: 18px;
padding-bottom: 8px;
border-bottom: 1px solid #eee;
margin-top: 20px;
margin-bottom: 15px;
}}
.item {{
margin-bottom: 12px;
padding-left: 10px;
border-left: 3px solid transparent;
}}
.item:hover {{
border-left-color: #1a73e8;
}}
.item-index {{
color: #666;
margin-right: 5px;
}}
a {{
color: #1a73e8;
text-decoration: none;
}}
a:hover {{
text-decoration: underline;
}}
.meta {{
font-size: 13px;
color: #666;
margin-top: 3px;
}}
.time {{
display: inline-block;
margin-right: 10px;
}}
.deadline {{
display: inline-block;
color: #d32f2f;
}}
</style>
</head>
<body>
<h1>招标信息汇总</h1>
<p>更新时间:{date} {time}</p>
""".format(
date=datetime.datetime.now().strftime('%Y-%m-%d'),
time=datetime.datetime.now().strftime('%H:%M:%S')
)
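        # 模板中 CSS 的花括号写作 {{ }},避免与 str.format 的占位符冲突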
# 按网站分类添加内容
for website, articles in all_articles.items():
# 确定来源类型
if website == "国能e招":
source = "neet"
elif website == "三峡招标":
source = "ctg"
elif website == "国能e购":
source = "chnenergy"
elif website == "中国节能":
source = "chinaedb"
elif website == "北京京能":
source = "beijing"
elif website == "华润守正":
source = "hrsz"
elif website == "华电电子":
source = "zghn"
elif website =="科环集团":
source="kh"
elif website == "三峡采购":
source = "ctgc"
elif website == "三峡采购":
source = "ctgc"
else:
continue
# 添加网站标题
html_content += f'<h2>{website}</h2>\n<div class="items">\n'
# 根据来源类型处理文章
if source == "neet":
for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
html_content += f"""
<div class="item">
<span class="item-index">{idx}.</span>
<a href="{link}" target="_blank">{title}</a>
<div class="meta">
<span class="time">发布时间: {publish_date}</span>
<span class="deadline">截止时间: {deadline_date}</span>
</div>
</div>
"""
elif source == "ctgc": # 新增三峡采购的处理逻辑
for idx, article in enumerate(articles, 1):
html_content += f"""
<div class="item">
<span class="item-index">{idx}.</span>
<a href="{article['url']}" target="_blank">{article['title']}</a>
<div class="meta">
<span class="time">发布时间: {article['date']}</span>
</div>
</div>
"""
elif source == "ctg":
for idx, (link, title, publish_time) in enumerate(articles, 1):
html_content += f"""
<div class="item">
<span class="item-index">{idx}.</span>
<a href="{link}" target="_blank">{title}</a>
<div class="meta">
<span class="time">发布时间: {publish_time.strftime('%Y-%m-%d %H:%M:%S')}</span>
</div>
</div>
"""
elif source == "chnenergy":
for idx, article in enumerate(articles, 1):
html_content += f"""
<div class="item">
<span class="item-index">{idx}.</span>
<a href="{article['link']}" target="_blank">{article['title']}</a>
<div class="meta">
<span class="time">发布时间: {article['time'].strftime('%Y-%m-%d')}</span>
<span>公告编号: {article.get('code', '')}</span>
</div>
</div>
"""
elif source == "chinaedb":
for idx, article in enumerate(articles, 1):
html_content += f"""
<div class="item">
<span class="item-index">{idx}.</span>
<a href="{article['link']}" target="_blank">{article['title']}</a>
<div class="meta">
<span class="time">发布时间: {article['publish_date'].strftime('%Y-%m-%d')}</span>
<span class="deadline">截止时间: {article['deadline'].strftime('%Y-%m-%d')}</span>
<div>采购类别: {article['category']} | 招标单位: {article['bid_unit']}</div>
</div>
</div>
"""
elif source == "beijing":
for idx, article in enumerate(articles, 1):
html_content += f"""
<div class="item">
<span class="item-index">{idx}.</span>
<a href="{article['link']}" target="_blank">{article['title']}</a>
<div class="meta">
<span class="time">发布时间: {article['date'].strftime('%Y-%m-%d')}</span>
</div>
</div>
"""
elif source == "hrsz":
for idx, article in enumerate(articles, 1):
html_content += f"""
<div class="item">
<span class="item-index">{idx}.</span>
<a href="{article['url']}" target="_blank">{article['title']}</a>
<div class="meta">
<span class="time">发布时间: {article['publishDate']}</span>
<span class="deadline">截止时间: {article['deadline']}</span>
</div>
</div>
"""
elif source == "zghn":
for idx, article in enumerate(articles, 1):
html_content += f"""
<div class="item">
<span class="item-index">{idx}.</span>
<a href="{article['链接']}" target="_blank">{article['公告标题']}</a>
<div class="meta">
<span class="time">发布时间: {article['发布日期'].strftime('%Y-%m-%d')}</span>
<span>状态: {article['公告状态']} | 类型: {article['业务类型']}</span>
</div>
</div>
"""
elif source == "kh":
for idx, article in enumerate(articles, 1):
html_content += f"""
<div class="item">
<span class="item-index">{idx}.</span>
<a href="{article['详情链接']}" target="_blank">{article['标题']}</a>
<div class="meta">
<span class="time">发布时间: {article['发布日期'].strftime('%Y-%m-%d') if article['发布日期'] else '无日期信息'}</span>
<span class="deadline">投标时间: {article['投标开始时间'] if article['投标开始时间'] else '无投标时间信息'}</span>
<div>公告类型: {article['公告类型']} | 来源类别: {article['来源类别']}</div>
</div>
</div>
"""
html_content += "</div>\n" # 关闭items div
# HTML尾部
html_content += """
<footer style="margin-top: 30px; padding-top: 15px; border-top: 1px solid #eee; color: #666; font-size: 13px;">
<p>本文件由系统自动生成,链接可直接点击访问</p>
<p>更新时间: {date} {time}</p>
</footer>
</body>
</html>
""".format(
date=datetime.datetime.now().strftime('%Y-%m-%d'),
time=datetime.datetime.now().strftime('%H:%M:%S')
)
# 保存文件
filename = f"招标信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
with open(filename, 'w', encoding='utf-8') as f:
f.write(html_content)
self.log(f"所有网站信息已保存为HTML文件: {filename}")
return filename
def send_all_email_with_excel(self, excel_filepath, days, article_count, email_config):
"""发送包含所有网站信息的Excel文件"""
msg = MIMEMultipart()
def encode_from_header(nickname, email):
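            """构造 From 头:纯 ASCII 昵称直接拼接,含中文等字符时按 RFC 2047 编码,异常时退回纯地址。"""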
try:
if all(ord(c) < 128 for c in nickname):
return f"{nickname} <{email}>"
                else:
                    encoded_nickname = Header(nickname, 'utf-8').encode()  # Header 已在模块顶部导入
                    return f"{encoded_nickname} <{email}>"
            except Exception:
                return email
msg['From'] = encode_from_header(self.sender_username.get(), email_config["sender_email"])
msg['To'] = ", ".join(self.receiver_emails)
subject = f"招标信息汇总报告(最近{days}天)"
msg['Subject'] = Header(subject, 'utf-8')
# 邮件正文
selected_websites = [website for website, var in self.website_vars.items() if var.get()]
body = f"""<html>
<body>
<h2>招标信息汇总报告</h2>
<p>时间范围: 最近{days}天</p>
<p>爬取的网站: {", ".join(selected_websites)}</p>
<p>找到的文章总数: {article_count}篇</p>
<p>请查看附件Excel文件获取详细信息。</p>
</body>
</html>"""
alternative = MIMEMultipart('alternative')
texthtml = MIMEText(body, _subtype='html', _charset='UTF-8')
alternative.attach(texthtml)
msg.attach(alternative)
        # 添加Excel附件(用 with 确保文件句柄及时关闭)
        with open(excel_filepath, 'rb') as f:
            xlsxpart = MIMEApplication(f.read())
        xlsxpart.add_header('Content-Disposition', 'attachment',
                            filename=Header(os.path.basename(excel_filepath), "utf-8").encode())
        msg.attach(xlsxpart)
# 记录发送结果
success_emails = []
failed_emails = []
try:
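            # SMTP_SSL 为 SSL 直连(QQ 邮箱通常为 smtp.qq.com:465),凭据来自界面配置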
server = smtplib.SMTP_SSL(email_config['smtp_server'], email_config['smtp_port'])
server.ehlo()
server.login(email_config['sender_email'], email_config['sender_password'])
# 尝试批量发送
try:
server.sendmail(
email_config['sender_email'],
self.receiver_emails,
msg.as_string()
)
success_emails = self.receiver_emails.copy()
self.log(f"邮件成功发送至所有收件人")
except smtplib.SMTPException as e:
# 如果批量发送失败,尝试逐个发送
self.log("批量发送失败,尝试逐个发送...")
for receiver in self.receiver_emails:
try:
temp_msg = msg
temp_msg.replace_header('To', receiver)
server.sendmail(
email_config['sender_email'],
[receiver],
temp_msg.as_string()
)
success_emails.append(receiver)
self.log(f"邮件成功发送至: {receiver}")
except Exception as e:
failed_emails.append((receiver, str(e)))
self.log(f"邮件发送失败至 {receiver}: {str(e)}")
server.close()
# 显示发送结果摘要
result_msg = "邮件发送结果:\n"
result_msg += f"成功发送至 {len(success_emails)} 个邮箱:\n"
for email in success_emails:
result_msg += f"{email}\n"
if failed_emails:
result_msg += f"\n发送失败 {len(failed_emails)} 个邮箱:\n"
for email, error in failed_emails:
result_msg += f"{email} (原因: {error})\n"
self.log(result_msg)
# 弹出窗口显示结果
if failed_emails:
messagebox.showwarning("邮件发送结果",
f"成功发送至 {len(success_emails)} 个邮箱\n"
f"发送失败 {len(failed_emails)} 个邮箱\n"
"请查看日志了解详情")
else:
if not self.is_scheduled_task:
messagebox.showinfo("邮件发送结果",
f"邮件已成功发送至 {len(success_emails)} 个邮箱")
except Exception as e:
error_msg = f"邮件发送失败: {str(e)}"
self.log(error_msg)
messagebox.showerror("错误", error_msg)
raise
def toggle_schedule(self):
"""切换定时任务状态"""
if self.schedule_var.get():
if not self.setup_scheduled_task(): # 如果设置失败
self.schedule_var.set(False) # 回滚Checkbutton状态
else:
self.cancel_scheduled_task()
def setup_scheduled_task(self):
"""设置定时任务"""
if not self.validate_schedule_inputs():
return False
try:
schedule_time = self.get_schedule_time()
hours, minutes = map(int, schedule_time.split(':'))
interval_days = int(self.schedule_interval.get())
# 计算下次执行时间
now = datetime.datetime.now()
next_time = now.replace(hour=hours, minute=minutes, second=0, microsecond=0)
if next_time < now:
next_time += datetime.timedelta(days=1)
self.next_scheduled_time = next_time
self.update_next_run_label()
# 启动定时任务线程
self.scheduled_running = True
if not self.scheduled_thread or not self.scheduled_thread.is_alive():
self.scheduled_thread = threading.Thread(
target=self.schedule_loop,
daemon=True
)
self.scheduled_thread.start()
self.schedule_status_var.set("状态: 运行中")
self.schedule_status_label.config(foreground="green")
self.log(f"定时任务已启用,将在 {next_time.strftime('%Y-%m-%d %H:%M:%S')} 执行")
return True
except Exception as e:
self.log(f"设置定时任务失败: {str(e)}")
messagebox.showerror("错误", f"设置定时任务失败:\n{str(e)}")
return False
def schedule_loop(self):
"""定时任务循环"""
while self.scheduled_running:
now = datetime.datetime.now()
# 检查是否到达执行时间
if self.next_scheduled_time and now >= self.next_scheduled_time:
self.log(f"定时任务触发,开始执行爬取 ({now.strftime('%Y-%m-%d %H:%M:%S')})")
# 在主线程中执行爬取
self.root.after(0, self.execute_scheduled_task)
# 计算下次执行时间
interval_days = int(self.schedule_interval.get())
self.next_scheduled_time += datetime.timedelta(days=interval_days)
self.update_next_run_label()
self.log(f"下次执行时间: {self.next_scheduled_time.strftime('%Y-%m-%d %H:%M:%S')}")
# 每分钟检查一次
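            # 用 Event.wait 而非 time.sleep,取消任务时可通过 scheduled_event.set() 立即唤醒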
self.scheduled_event.wait(60)
def execute_scheduled_task(self):
"""执行定时爬取任务"""
try:
self.is_scheduled_task = True # 设置为定时任务状态
self.crawl_selected_sites()
except Exception as e:
self.log(f"定时任务执行失败: {str(e)}")
finally:
self.is_scheduled_task = False # 恢复为非定时任务状态
def validate_schedule_inputs(self):
"""验证定时任务输入"""
# 验证时间格式
try:
schedule_time = self.schedule_time.get()
hours, minutes = map(int, schedule_time.split(':'))
if not (0 <= hours < 24 and 0 <= minutes < 60):
raise ValueError
except ValueError:
messagebox.showerror("错误", "请输入有效的时间格式 (HH:MM)!")
return False
# 验证间隔天数
try:
interval = int(self.schedule_interval.get())
if interval <= 0:
raise ValueError
except ValueError:
messagebox.showerror("错误", "请输入有效的执行频率 (正整数)!")
return False
return True
def update_next_run_label(self):
"""更新下次执行时间标签"""
if self.next_scheduled_time:
time_str = self.next_scheduled_time.strftime("%Y-%m-%d %H:%M:%S")
self.next_run_label.config(text=f"下次执行时间: {time_str}")
else:
self.next_run_label.config(text="下次执行时间: 未设置")
if __name__ == "__main__":
root = tk.Tk()
    # 设置窗口图标(icon.ico 不存在时静默忽略)
    try:
        root.iconbitmap('icon.ico')
    except Exception:
        pass
app = WebCrawlerApp(root)
root.mainloop()