import sys
from filelock import FileLock
import xmltodict
from openpyxl.formatting.rule import FormulaRule
from openpyxl.styles import PatternFill
from openpyxl.utils import get_column_letter
import datetime
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
import json
import smtplib
import openpyxl
from curl_cffi import requests
from bs4 import BeautifulSoup
import urllib.parse
from openpyxl.worksheet.datavalidation import DataValidation
import re
from random import random, uniform
import os
import subprocess
import platform
import time
from typing import Dict, List, Union
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
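
# Example config.json (illustrative sketch only; the keys below are the ones this script
# reads from self.config, while the values are placeholders to adapt):
#
# {
#     "days": 3,
#     "websites": ["国能e招", "三峡招标", "科环集团"],
#     "keywords": ["风机", "检修"],
#     "output_format": "excel",
#     "output_dir": "output",
#     "open_file": true,
#     "send_email": false,
#     "sender_email": "sender@example.com",
#     "sender_username": "招标信息助手",
#     "sender_password": "smtp-auth-code",
#     "receiver_emails": ["receiver@example.com"],
#     "schedule": {
#         "enabled": false,
#         "time": "09:00",
#         "interval_days": 1,
#         "run_immediately": false
#     }
# }
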
class WebCrawler:
    def __init__(self, config_path: str = "config.json"):
        self.config = self.load_config(config_path)
        self.crawling_stopped = False
        self.website_handlers = {
            "国能e招": self.crawl_neet_shop,
            "三峡招标": self.crawl_ctg,
            "三峡采购": self.crawl_sanxiacaigou,
            "国能e购": self.crawl_chnenergy,
            "中国节能": self.crawl_chinaedb,
            "北京京能": self.crawl_beijing,
            "华润守正": self.crawl_hrsz,
            "华电电子": self.crawl_zghn,
            "科环集团": self.crawl_kh
        }
        self.scheduler = None

    def load_config(self, config_path: str) -> Dict:
        """Load the configuration file and apply defaults."""
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            # Default scheduled-task settings
            if 'schedule' not in config:
                config['schedule'] = {
                    'enabled': False,
                    'time': "09:00",
                    'interval_days': 1
                }
            if 'output_dir' not in config:
                config['output_dir'] = os.getcwd()  # default to the current working directory

            return config
        except Exception as e:
            print(f"无法加载配置文件: {e}")
            sys.exit(1)

    def run(self) -> None:
        """Main entry point."""
        print("开始执行爬取任务...")

        # Check whether a scheduled task is configured
        if self.config.get('schedule', {}).get('enabled', False):
            self.start_scheduled_task()
            if not self.config['schedule'].get('run_immediately', False):
                return  # after starting the scheduler, do not crawl immediately

        # Run a single crawl
        self.run_crawler()

    def run_crawler(self) -> None:
        """Run the actual crawl job."""
        # Use a file lock to prevent concurrent runs
        lock = FileLock("crawler.lock")
        try:
            with lock.acquire(timeout=10):  # wait up to 10 seconds for the lock
                print(f"开始执行爬取任务,时间: {datetime.datetime.now()}")

                # Crawl
                results = self.crawl_all_sites(
                    self.config['days'],
                    self.config['websites'],
                    self.config['keywords']
                )

                if not results:
                    print("没有找到匹配的结果")
                    return

                # Save results
                output_file = self.save_results(results, self.config['output_format'])

                # Send email
                if self.config.get('send_email', False):
                    self.send_email_with_results(results, output_file)

                # Open the output file
                if self.config.get('open_file', False):
                    self.open_file(output_file)

                print(f"任务执行完成! 时间: {datetime.datetime.now()}")
        except Exception as e:
            print(f"获取文件锁失败或执行爬取时出错: {e}")

    def start_scheduled_task(self):
        """Start the scheduled task."""
        if self.scheduler and self.scheduler.running:
            print("定时任务已经在运行中")
            return

        schedule_config = self.config.get('schedule', {})
        scheduled_time = schedule_config.get('time', '09:00')
        interval_days = schedule_config.get('interval_days', 1)
        run_immediately = schedule_config.get('run_immediately', False)

        print(f"启动定时任务,每天 {scheduled_time} 执行,间隔 {interval_days} 天")

        self.scheduler = BackgroundScheduler()

        # Parse the HH:MM time string
        hour, minute = map(int, scheduled_time.split(':'))

        # Register the cron job
        self.scheduler.add_job(
            self.run_crawler,
            trigger=CronTrigger(
                hour=hour,
                minute=minute,
                day='*/{}'.format(interval_days) if interval_days > 1 else '*'
            )
        )

        self.scheduler.start()

        if run_immediately:
            print("立即执行一次爬取任务...")
            self.run_crawler()

        try:
            # Keep the process alive
            while True:
                time.sleep(1)
        except (KeyboardInterrupt, SystemExit):
            if self.scheduler:
                self.scheduler.shutdown()
            print("定时任务已停止")
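
    # Note: CronTrigger's `day` field is day-of-month stepping, so day='*/2' fires on the
    # 1st, 3rd, 5th, ... of each month rather than strictly every two days across month
    # boundaries. If a fixed interval is required, APScheduler's IntervalTrigger
    # (apscheduler.triggers.interval.IntervalTrigger(days=interval_days)) is the closer fit.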

    # Core crawling methods
    def crawl_all_sites(self, days: int, websites: List[str], keywords: List[str]) -> Dict[str, List]:
        """Crawl all selected websites."""
        all_results = {}

        for website in websites:
            if website in self.website_handlers:
                print(f"正在爬取 {website}...")
                try:
                    # Unified handler interface: (days, keywords)
                    results = self.website_handlers[website](days, keywords)
                    if results:
                        all_results[website] = results
                        print(f"{website} 爬取完成,找到 {len(results)} 条结果")
                except Exception as e:
                    print(f"爬取 {website} 时出错: {e}")
            else:
                print(f"未知网站: {website}")
        return all_results

    def crawl_single_site(self, website, days, keywords):
        """Crawl a single website"""
        source = ""
        if website == "国能e招":
            source = "neet"
        elif website == "三峡招标":
            source = "ctg"
        elif website == "三峡采购":
            source = "ctgc"
        elif website == "国能e购":
            source = "chnenergy"
        elif website == "中国节能":
            source = "chinaedb"
        elif website == "北京京能":
            source = "beijing"
        elif website == "华润守正":
            source = "hrsz"
        elif website == "华电电子":
            source = "zghn"
        elif website == "科环集团":
            source = "kh"

        matched_articles = []

        if source == "neet":
            matched_articles = self.crawl_neet_shop(days, keywords)
        elif source == "ctg":
            matched_articles = self.crawl_ctg(days, keywords)
        elif source == "ctgc":
            matched_articles = self.crawl_sanxiacaigou(days, keywords)
        elif source == "chnenergy":
            matched_articles = self.crawl_chnenergy(days, keywords)
        elif source == "chinaedb":
            matched_articles = self.crawl_chinaedb(days, keywords)
        elif source == "beijing":
            matched_articles = self.crawl_beijing(days, keywords)
        elif source == "hrsz":
            matched_articles = self.crawl_hrsz(days, keywords)
        elif source == "zghn":
            matched_articles = self.crawl_zghn(days, keywords)
        elif source == "kh":
            matched_articles = self.crawl_kh(days, keywords)

        return matched_articles

    # Website-specific crawling methods
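    # Return-value convention shared by the per-site helpers below:
    #   * each get_*_results(page, ...) call returns the matches found on that page; when it
    #     walks past the start of the date window it appends the sentinel -1 as the last
    #     element so the calling crawl_* loop knows to stop paging;
    #   * a bare -2 (and -3 for 华润守正's unparseable payloads) signals a request or parsing
    #     failure, which the crawl_* wrappers treat as "stop crawling this site".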

    def crawl_kh(self, days: int, keywords: List[str]) -> List[Dict]:
        all_matched = []
        channels_categories = [
            ("cggg1", ['fw', 'hw', 'gc']),
            ("cggg2", ['fw', 'hw', 'gc']),
            ("cggg3", [None])
        ]

        for channel, categories in channels_categories:
            for category in categories:
                page = 1
                while True:
                    if self.crawling_stopped:
                        return all_matched

                    info = self.get_kh_source(page, days, channel, category, keywords)

                    if not info:
                        page += 1
                        continue
                    elif info == -2:
                        break
                    elif info == -1:
                        break

                    if info[-1] == -1:
                        all_matched.extend(info[:-1])
                        break
                    else:
                        all_matched.extend(info)
                        page += 1

        return all_matched

    def get_kh_source(self, page: int, days: int, channel: str, category: str, keywords: List[str]) -> Union[List[Dict], int]:
        BASE_URL = "https://khjtcgpt.chnenergy.com.cn"
        HEADERS = {
            "Referer": BASE_URL,
            "Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"'
        }

        if channel == "cggg3":
            url = f"{BASE_URL}/cms/channel/{channel}/index.htm?pageNo={page}"
        else:
            url = f"{BASE_URL}/cms/channel/{channel}{category}/index.htm?pageNo={page}"

        current_time = datetime.datetime.now()
        start_time = current_time - datetime.timedelta(days=(days + 1))

        try:
            response = requests.get(url, impersonate="chrome110", headers=HEADERS)
            if not response.ok:
                return -2

            soup = BeautifulSoup(response.text, 'html.parser')
            bidding_items = soup.select('ul#list1 li a')
            results = []

            for item in bidding_items:
                title_span = item.find('span', class_='bidLink')
                title = title_span.get_text(strip=True) if title_span else item.get('title', '').strip()

                if channel == "cggg2":
                    em_tag = title_span.find('em') if title_span else None
                    bid_type = em_tag.get_text(strip=True).strip('[]') if em_tag else "未知类型"
                else:
                    bid_type = item.find('em').get_text(strip=True).strip('[]') if item.find('em') else "未知类型"

                publish_date = item.find('span', class_='bidDate').get_text(strip=True) if item.find('span', class_='bidDate') else None
                try:
                    publish_date = datetime.datetime.strptime(publish_date, '%Y-%m-%d') if publish_date else None
                except ValueError:
                    publish_date = None

                if channel == "cggg2":
                    bid_start = "无投标时间信息"
                else:
                    time_info = item.find('span', class_='bidTime')
                    if time_info:
                        input_tag = time_info.find('input')
                        bid_start = input_tag.get('buystart_1', '').split()[0] if input_tag else ''
                    else:
                        bid_start = ''

                detail_url = item.get('href', '')
                if detail_url and not detail_url.startswith('http'):
                    detail_url = BASE_URL + detail_url

                if publish_date and start_time > publish_date:
                    results.append(-1)
                    return results

                if any(keyword in title for keyword in keywords):
                    results.append({
                        '标题': title.replace(bid_type, '').strip() if bid_type != "未知类型" else title,
                        '公告类型': bid_type,
                        '发布日期': publish_date,
                        '投标开始时间': bid_start,
                        '详情链接': detail_url,
                        '来源类别': f"{channel}{category}" if category else channel
                    })

            return results

        except Exception:
            return -2

    def crawl_zghn(self, days: int, keywords: List[str]) -> List[Dict]:
        matched_articles = []
        page = 1

        while True:
            if self.crawling_stopped:
                return matched_articles

            results = self.get_zghn_results(page, days, keywords)
            if results == -2:
                break
            elif results == -1:
                break
            elif results:
                size = len(results)
                if size and results[size - 1] == -1:
                    if size > 1:
                        matched_articles.extend(results[:-1])
                    break

            if not results:
                page += 1
                continue

            matched_articles.extend(results)
            page += 1

        return matched_articles

    def get_zghn_results(self, page: int, days: int, keywords: List[str]) -> Union[List[Dict], int]:
        url = "http://chdtp.gdtzb.com/v1/" + str(page) + "/"
        current_time = datetime.datetime.now()
        start_time = current_time - datetime.timedelta(days=days)

        try:
            response = requests.get(
                url,
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
                    "Connection": "keep-alive",
                    "Referer": "http://chdtp.gdtzb.com/",
                }
            )

            if not response.ok:
                return -2

            soup = BeautifulSoup(response.text, 'html.parser')
            items = soup.select('.pdbox ul li')

            bidding_info = []

            for item in items:
                date_str = item.find('span', class_='fr').get_text(strip=True)
                title_tag = item.find('a')
                title = title_tag.get_text(strip=True)
                link = title_tag['href']

                publish_date = datetime.datetime.strptime(date_str, '%Y-%m-%d')

                if publish_date < start_time:
                    bidding_info.append(-1)
                    return bidding_info

                if any(keyword in title for keyword in keywords):
                    bidding_info.append({
                        '公告标题': title,
                        '发布日期': publish_date.strftime('%Y-%m-%d'),
                        '链接': link
                    })

            return bidding_info

        except Exception:
            return -2

    def crawl_hrsz(self, days: int, keywords: List[str]) -> Union[List[Dict], int]:
        matched_articles = []
        page = 1

        while True:
            if self.crawling_stopped:
                return matched_articles

            results = self.get_hrsz_results(page, days, keywords)
            if results == -3:
                break
            elif results == -2:
                break
            elif results == -1:
                break

            elif results:
                size = len(results)
                if size and results[size - 1] == -1:
                    if size > 1:
                        matched_articles.extend(results[:-1])
                    break

            if not results:
                page += 1
                continue

            matched_articles.extend(results)
            page += 1

        return matched_articles

    def get_hrsz_results(self, page, days, keywords) -> Union[List[Dict], int]:
        current_time = datetime.datetime.now()
        start_time = current_time - datetime.timedelta(days=days)
        Base_url = "https://www.szecp.com.cn/"

        url = Base_url + "rcms-external-rest/content/getSZExtData?channelIds=26909&pageNo=" + str(page) + "&pageSize=10"

        time.sleep(0.5)
        try:
            response = requests.get(
                url,
                impersonate="chrome110",
                headers={
                    "Referer": url,
                    "Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"'
                }
            )

            response.encoding = 'utf-8'
            data_str = response.text.strip()

            if not data_str:
                return -2

            if data_str.startswith(('{', '[')):
                try:
                    data = json.loads(data_str)
                except json.JSONDecodeError:
                    return -3
            elif data_str.startswith('<'):
                try:
                    data = xmltodict.parse(data_str)
                    if 'Result' in data:
                        data = data['Result']
                except Exception:
                    return -3
            else:
                return -3

            tender_list = data['data']['data']
            # Unwrap one more level if the payload nests the list under another 'data' key
            if isinstance(tender_list, dict) and 'data' in tender_list:
                tender_list = tender_list['data']

            extracted_info = []

            for tender in tender_list:
                if tender['publishDate'] < start_time.strftime('%Y-%m-%d'):
                    extracted_info.append(-1)
                    return extracted_info
                if any(keyword in tender['title'] for keyword in keywords) and "成交" not in tender['title']:
                    url = tender.get('url', '')
                    cleaned_url = url.lstrip("./")
                    full_url = Base_url + cleaned_url
                    info = {
                        'number': tender['number'],
                        'purchaseRegion': tender['purchaseRegion']['label'],
                        'businessUnit': tender['businessUnit']['label'],
                        'deadline': tender['deadline'],
                        'purchaseOrg': tender['purchaseOrg']['label'],
                        'purchaseType': tender['purchaseType'],
                        'title': tender['title'],
                        'url': full_url,
                        'publishDate': tender['publishDate']
                    }
                    extracted_info.append(info)

            return extracted_info

        except Exception:
            return -2

    def crawl_beijing(self, days: int, keywords: List[str]) -> Union[List[Dict], int]:
        matched_articles = []
        page = 1

        while True:
            if self.crawling_stopped:
                return matched_articles

            results = self.get_beijing_results(page, days, keywords)

            if results == -2:
                break

            elif results:
                size = len(results)
                if size and results[size - 1] == -1:
                    if size > 1:
                        matched_articles.extend(results[:-1])
                    break

            if not results:
                page += 1
                continue

            matched_articles.extend(results)
            page += 1

        return matched_articles

    def get_beijing_results(self, page, days, keywords) -> Union[List[Dict], int]:
        BASE_URL = "https://www.powerbeijing-ec.com"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        search_url = BASE_URL + "/jncms/search/bulletin.html?categoryId=2&tabName=招标公告&goSearch=&page=" + str(page)

        response = requests.get(search_url, headers=headers)
        response.encoding = 'utf-8'
        html_content = response.text
        soup = BeautifulSoup(html_content, 'html.parser')
        tenders = []
        current_time = datetime.datetime.now()
        start_time = current_time - datetime.timedelta(days=days)

        tender_list = soup.find('ul', class_='newslist')
        if not tender_list:
            return -2

        for li in tender_list.find_all('li'):
            a_tag = li.find('a')
            title = a_tag['title']
            link = a_tag['href']
            date = a_tag.find('div', class_='newsDate').div.text

            try:
                date = datetime.datetime.strptime(date, '%Y-%m-%d')
            except ValueError:
                date = None

            # Skip entries whose date could not be parsed instead of comparing None
            if date is None:
                continue

            if date < start_time:
                tenders.append(-1)
                return tenders

            if date >= start_time and any(keyword in title for keyword in keywords) and "成交" not in title:
                tenders.append({
                    'title': title,
                    'link': link,
                    'date': date
                })
        return tenders

    def crawl_chinaedb(self, days: int, keywords: List[str]) -> Union[List[Dict], int]:
        matched_articles = []
        page = 1

        while True:
            if self.crawling_stopped:
                return matched_articles

            results = self.get_chinaedb_results(page, days, keywords)

            if results == -2:
                break

            elif results:
                size = len(results)
                if size and results[size - 1] == -1:
                    if size > 1:
                        matched_articles.extend(results[:-1])
                    break

            if not results:
                page += 1
                continue

            matched_articles.extend(results)
            page += 1

        return matched_articles

    def get_chinaedb_results(self, page, days, keywords) -> Union[List[Dict], int]:
        BASE_URL = "https://www.ebidding.cecep.cn"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        if page == 1:
            search_url = f"{BASE_URL}/jyxx/001006/001006001/bidinfo.html"
        else:
            search_url = f"{BASE_URL}/jyxx/001006/001006001/{page}.html"

        response = requests.get(search_url, headers=headers)
        response.encoding = 'utf-8'
        html_content = response.text
        soup = BeautifulSoup(html_content, 'html.parser')
        bid_list = []
        current_time = datetime.datetime.now()
        start_time = current_time - datetime.timedelta(days=days)

        go_items = soup.find('ul', class_='go-items')
        if not go_items:
            return -2

        for li in go_items.find_all('li'):
            a_tag = li.find('a', class_='go-box')
            if not a_tag:
                continue

            title = a_tag.find('span', class_='go-txt').get_text(strip=True)
            pub_date = a_tag.find('span', class_='go-time').get_text(strip=True)

            try:
                pub_date = datetime.datetime.strptime(pub_date, '%Y-%m-%d')
            except ValueError:
                pub_date = None

            # Skip entries whose publish date could not be parsed instead of comparing None
            if pub_date is None:
                continue

            if pub_date < start_time:
                bid_list.append(-1)
                return bid_list

            link = BASE_URL + a_tag['href']
            go_para = a_tag.find('div', class_='go-para')
            category = go_para.find('div', class_='go-sub').get_text(strip=True).replace('采购类别:', '')
            bid_unit = go_para.find('div', class_='go-sub2').get_text(strip=True).replace('招标单位:', '')
            deadline = go_para.find('div', class_='go-sub3').get_text(strip=True).replace('报名截止:', '')

            try:
                deadline = datetime.datetime.strptime(deadline, '%Y-%m-%d')
            except ValueError:
                deadline = None

            if pub_date >= start_time and any(keyword in title for keyword in keywords) and "成交" not in title:
                bid_info = {
                    'title': title,
                    'publish_date': pub_date,
                    'category': category,
                    'bid_unit': bid_unit,
                    'deadline': deadline,
                    'link': link
                }
                bid_list.append(bid_info)

        return bid_list

    def crawl_chnenergy(self, days: int, keywords: List[str]) -> Union[List[Dict], int]:
        matched_articles = []
        page = 1

        while True:
            if self.crawling_stopped:
                return matched_articles

            results = self.get_chnenergy_results(page, days, keywords)

            if results == -2:
                break

            elif results == -1:
                break

            elif results:
                size = len(results)
                if size and results[size - 1] == -1:
                    if size > 1:
                        matched_articles.extend(results[:-1])
                    break

            if not results:
                page += 1
                continue

            matched_articles.extend(results)
            page += 1

        return matched_articles

    def get_chnenergy_results(self, page, days, keywords) -> Union[List[Dict], int]:
        BASE_URL = "https://www.chnenergybidding.com.cn/"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        starttime = datetime.datetime.now() - datetime.timedelta(days=days)
        search_url = f"{BASE_URL}/bidweb/001/001002/{page}.html"

        try:
            response = requests.get(search_url, headers=headers)
            response.encoding = 'utf-8'
            soup = BeautifulSoup(response.text, 'html.parser')
            items_list = soup.find('ul', class_='right-items')

            if not items_list:
                return -2

            results = []

            for item in items_list.find_all('li', class_='right-item clearfix'):
                title_link = item.find('a', href=True)
                title = title_link.get('title', '').strip()
                if not title:
                    title = title_link.get_text(strip=True)
                link = BASE_URL + title_link['href']

                code_tag = item.find('span', class_='author')
                code = code_tag.get_text(strip=True) if code_tag else ''

                time_span = item.find('span', class_='r')
                time_str = time_span.get_text(strip=True) if time_span else ''

                try:
                    pub_date = datetime.datetime.strptime(time_str, '%Y-%m-%d')
                except ValueError:
                    continue

                if pub_date < starttime:
                    results.append(-1)
                    return results

                if pub_date >= starttime and any(keyword in title for keyword in keywords) and "成交" not in title:
                    results.append({
                        "code": code,
                        "title": title,
                        "link": link,
                        "time": pub_date
                    })

            return results

        except Exception:
            return -2

    def crawl_neet_shop(self, days: int, keywords: List[str]) -> Union[List, int]:
        page = 1
        matched_articles = []

        while True:
            if self.crawling_stopped:
                return matched_articles

            results = self.get_neet_shop_results(page, days, keywords)

            if results == -2:
                break

            elif results == -1:
                break

            elif results:
                size = len(results)
                if size and results[size - 1] == -1:
                    if size > 1:
                        matched_articles.extend(results[:-1])
                    break

            if not results:
                page += 1
                continue

            matched_articles.extend(results)
            page += 1

        return matched_articles

    def get_neet_shop_results(self, page_no, days, keywords) -> Union[List, int]:
        try:
            current_time = datetime.datetime.now()
            start_time = current_time - datetime.timedelta(days=days)

            url = (
                "https://www.neep.shop/rest/service/routing/nouser/inquiry/quote/searchCmsArticleList"
                "?callback=jQuery191018342137772079192_1747887937321"
                "&order=asc&deadline=&inquireName=&publishArea=&inquireCode=&noticeType=1&pageNo="
                + str(page_no)
            )

            response = requests.get(
                url,
                headers={
                    "Referer": url,
                    "Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"',
                },
            )
            response.encoding = 'utf-8'

            if response.status_code != 200:
                return -2

            data_str = response.text
            # The endpoint returns JSONP; strip the callback padding and keep the JSON body
            json_start = data_str.find('{')
            json_end = data_str.rfind('}') + 1
            json_str = data_str[json_start:json_end]

            if not json_str:
                return -2

            try:
                data = json.loads(json_str)
            except json.JSONDecodeError:
                return -2

            results = []
            rows = data.get('data', {}).get('rows', [])

            for row in rows:
                publish_date = datetime.datetime.strptime(row['publishTimeString'], '%Y-%m-%d %H:%M:%S')

                if start_time > publish_date:
                    results.append(-1)
                    return results

                if (
                    any(keyword in row['inquireName'] for keyword in keywords)
                    and start_time <= publish_date
                    and "成交" not in row['inquireName']
                ):
                    link = row['articleUrl']
                    title = row['inquireName']
                    deadline_date = datetime.datetime.strptime(row['quotDeadlineString'], '%Y-%m-%d %H:%M:%S')
                    results.append((link, title, publish_date, deadline_date))

            return results

        except Exception:
            return -2
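    # Note on the 国能e招 endpoint above: the fixed callback=jQuery... query parameter is only
    # JSONP padding that the server echoes back; get_neet_shop_results() discards it by slicing
    # from the first '{' to the last '}', so the exact callback value should not matter.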

    def crawl_sanxiacaigou(self, days: int, keywords: List[str]) -> Union[List[Dict], int]:
        matched_articles = []
        page = 1

        while True:
            if self.crawling_stopped:
                return matched_articles

            results = self.get_sanxiacaigou_results(page, days, keywords)

            if results == -3:
                break
            elif results == -2:
                break
            elif results == -1:
                break

            elif results:
                if results[-1] == -1:
                    results = results[:-1]
                    if results:
                        matched_articles.extend(results)
                    break

                matched_articles.extend(results)

            page += 1

        return matched_articles

    def get_sanxiacaigou_results(self, page, days, keywords) -> Union[List[Dict], int]:
        url1 = "https://eps.ctg.com.cn/cms/channel/2ywgg0qb/index.htm?pageNo=" + str(page)
        current_time = datetime.datetime.now()
        start_time = current_time - datetime.timedelta(days=days)
        BaseURL = "https://eps.ctg.com.cn"

        try:
            response = requests.get(
                url1,
                impersonate="chrome110",
                headers={
                    "Referer": url1,
                    "Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"'
                }
            )

            soup = BeautifulSoup(response.text, 'html.parser')
            info_list = soup.find('div', class_='infolist-main bidlist bidlist2')
            if not info_list:
                return -2

            items = info_list.find_all('li', attrs={'name': 'li_name'})
            if not items:
                return -2

            bidding_info = []

            for item in items:
                a_tag = item.find('a')
                if not a_tag:
                    continue

                title = a_tag.get('title', '').strip()
                if not title:
                    span = a_tag.find('span')
                    if span:
                        for element in span.find_all(['i', 'em']):
                            element.decompose()
                        title = span.get_text().strip()

                href = a_tag.get('href', '')
                full_url = BaseURL + href if href and not href.startswith('http') else href

                date_em = None
                all_em_tags = a_tag.find_all('em')
                if all_em_tags:
                    for em in all_em_tags:
                        if not em.get('style') or 'width:6.5em' not in em.get('style', ''):
                            date_em = em
                            break
                if not date_em and all_em_tags:
                    date_em = all_em_tags[-1]

                publish_date_str = date_em.get_text().strip() if date_em else ''

                try:
                    publish_date = datetime.datetime.strptime(publish_date_str, "%Y-%m-%d") if publish_date_str else current_time
                except ValueError:
                    publish_date = current_time

                if publish_date < start_time:
                    bidding_info.append(-1)
                    return bidding_info

                if any(keyword in title for keyword in keywords) and not any(exclude in title for exclude in ["成交", "结果公告", "中标"]):
                    bidding_info.append({
                        'title': title,
                        'url': full_url,
                        'date': publish_date_str if publish_date_str else publish_date.strftime("%Y-%m-%d"),
                        'source': '三峡采购',
                        'page': page
                    })

            return bidding_info

        except Exception:
            return -2

    def crawl_ctg(self, days: int, keywords: List[str]) -> Union[List[Dict], int]:
        session = self.init_ctg_session()
        if not session:
            return []

        current_time = datetime.datetime.now()
        start_time = current_time - datetime.timedelta(days=days)
        matched_articles = []
        page = 1

        while True:
            if self.crawling_stopped:
                return matched_articles

            results = self.get_ctg_search_results(session, page, keywords)

            if not results:
                page += 1
                continue

            for link, title, date in results:
                if date is None:
                    continue

                if start_time <= date <= current_time and "成交" not in title:
                    matched_articles.append((link, title, date))
                elif date < start_time:
                    return matched_articles

            page += 1
        return matched_articles

    def init_ctg_session(self):
        session = requests.Session()
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
        })
        return session

    def get_ctg_search_results(self, session, page=1, keywords=None) -> List[dict]:
        base_url = "https://eps.ctg.com.cn"
        url = f"{base_url}/cms/channel/2ywgg0qb/index.htm?pageNo={page}"

        # The prepared session is currently unused; the request goes through curl_cffi's
        # module-level get() with browser impersonation instead.
        response = requests.get(
            url,
            impersonate="chrome110",
            headers={
                "Referer": url,
                "Sec-Ch-Ua": '"Chromium";v="110", "Not A(Brand";v="24"'
            }
        )
        results = self.parse_ctg_html(response.text, keywords)
        return results

    def parse_ctg_html(self, html, keywords):
        soup = BeautifulSoup(html, 'html.parser')
        results = []
        base_url = "https://eps.ctg.com.cn"

        for a in soup.find_all('a', href=True):
            link_text = a.text.strip()
            em_tag = re.search(r'\d{4}-\d{2}-\d{2}', link_text)
            if em_tag:
                try:
                    date = datetime.datetime.strptime(em_tag.group(), '%Y-%m-%d')
                except ValueError:
                    date = None
            else:
                date = None

            if any(keyword in link_text for keyword in keywords):
                link = urllib.parse.urljoin(base_url, a['href'])
                title = a.get('title', a.text.strip())
                results.append((link, title, date))

        return results
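
    # Record shapes consumed by the save_* methods below (as produced by the crawlers above):
    # 国能e招 (neet) and 三峡招标 (ctg) yield plain tuples, (link, title, publish_date[, deadline]);
    # every other source yields dicts whose keys match the corresponding saver branch, e.g.
    # 'title'/'url'/'date' for 三峡采购, '公告标题'/'发布日期'/'链接' for 华电电子,
    # '标题'/'详情链接'/'公告类型'/... for 科环集团.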
    def save_results(self, results: Dict[str, List], output_format: str) -> str:
        """Save results in the configured output format."""
        save_methods = {
            'excel': self.save_all_articles_to_excel,
            'html': self.save_all_articles_as_html,
            'markdown': self.save_all_articles_to_markdown,
            'text': self.save_all_articles_to_text,
            'wechat': self.save_all_articles_for_wechat
        }

        if output_format not in save_methods:
            print(f"不支持的输出格式: {output_format}, 默认使用Excel")
            output_format = 'excel'

        return save_methods[output_format](results)

    # File saving methods
    def save_all_articles_to_excel(self, all_articles: Dict[str, List]) -> str:
        def parse_datetime(dt_str):
            formats = [
                "%Y-%m-%d %H:%M:%S",
                "%Y-%m-%d %H:%M",
                "%Y-%m-%d"
            ]
            if isinstance(dt_str, datetime.datetime):
                return dt_str
            for fmt in formats:
                try:
                    return datetime.datetime.strptime(dt_str, fmt)
                except (ValueError, TypeError):
                    continue
            return dt_str

        wb = openpyxl.Workbook()

        if 'Sheet' in wb.sheetnames:
            wb.remove(wb['Sheet'])

        for website, articles in all_articles.items():
            if website == "国能e招":
                source = "neet"
            elif website == "三峡招标":
                source = "ctg"
            elif website == "三峡采购":
                source = "ctgc"
            elif website == "国能e购":
                source = "chnenergy"
            elif website == "中国节能":
                source = "chinaedb"
            elif website == "北京京能":
                source = "beijing"
            elif website == "华润守正":
                source = "hrsz"
            elif website == "华电电子":
                source = "zghn"
            elif website == "科环集团":
                source = "kh"
            else:
                continue

            ws = wb.create_sheet(title=website[:31])

            if source == "neet":
                headers = ["标记", "序号", "标题链接", "发布时间", "截止时间"]
                ws.append(headers)
                for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
                    ws.append([
                        "",
                        idx,
                        f'=HYPERLINK("{link}", "{title}")',
                        parse_datetime(publish_date),
                        parse_datetime(deadline_date)
                    ])
                ws.column_dimensions['A'].width = 8
                ws.column_dimensions['B'].width = 8
                ws.column_dimensions['C'].width = 100
                ws.column_dimensions['D'].width = 20
                ws.column_dimensions['E'].width = 20

            elif source == "ctgc":
                headers = ["标记", "序号", "标题链接", "发布时间"]
                ws.append(headers)
                for idx, article in enumerate(articles, 1):
                    ws.append([
                        "",
                        idx,
                        f'=HYPERLINK("{article["url"]}", "{article["title"]}")',
                        parse_datetime(article["date"])
                    ])
                ws.column_dimensions['A'].width = 8
                ws.column_dimensions['B'].width = 8
                ws.column_dimensions['C'].width = 100
                ws.column_dimensions['D'].width = 20

            elif source == "ctg":
                headers = ["标记", "序号", "标题链接", "发布时间"]
                ws.append(headers)
                for idx, (link, title, publish_time) in enumerate(articles, 1):
                    ws.append([
                        "",
                        idx,
                        f'=HYPERLINK("{link}", "{title}")',
                        parse_datetime(publish_time)
                    ])
                ws.column_dimensions['A'].width = 8
                ws.column_dimensions['B'].width = 8
                ws.column_dimensions['C'].width = 100
                ws.column_dimensions['D'].width = 20

            elif source == "chnenergy":
                headers = ["标记", "序号", "公告编号", "标题链接", "发布时间"]
                ws.append(headers)
                for idx, article in enumerate(articles, 1):
                    ws.append([
                        "",
                        idx,
                        article.get("code", ""),
                        f'=HYPERLINK("{article["link"]}", "{article["title"]}")',
                        parse_datetime(article["time"])
                    ])
                ws.column_dimensions['A'].width = 8
                ws.column_dimensions['B'].width = 8
                ws.column_dimensions['C'].width = 20
                ws.column_dimensions['D'].width = 100
                ws.column_dimensions['E'].width = 20

            elif source == "chinaedb":
                headers = ["标记", "序号", "标题链接", "发布时间", "截止时间", "采购类别", "招标单位"]
                ws.append(headers)
                for idx, article in enumerate(articles, 1):
                    ws.append([
                        "",
                        idx,
                        f'=HYPERLINK("{article["link"]}", "{article["title"]}")',
                        parse_datetime(article["publish_date"]),
                        parse_datetime(article["deadline"]),
                        article["category"],
                        article["bid_unit"]
                    ])
                ws.column_dimensions['A'].width = 8
                ws.column_dimensions['B'].width = 8
                ws.column_dimensions['C'].width = 100
                ws.column_dimensions['D'].width = 20
                ws.column_dimensions['E'].width = 20
                ws.column_dimensions['F'].width = 15
                ws.column_dimensions['G'].width = 20

            elif source == "beijing":
                headers = ["标记", "序号", "标题链接", "发布时间"]
                ws.append(headers)
                for idx, article in enumerate(articles, 1):
                    ws.append([
                        "",
                        idx,
                        f'=HYPERLINK("{article["link"]}", "{article["title"]}")',
                        parse_datetime(article["date"])
                    ])
                ws.column_dimensions['A'].width = 8
                ws.column_dimensions['B'].width = 8
                ws.column_dimensions['C'].width = 100
                ws.column_dimensions['D'].width = 20

            elif source == "hrsz":
                headers = ["标记", "序号", "标题链接", "发布时间", "截止时间"]
                ws.append(headers)
                for idx, article in enumerate(articles, 1):
                    ws.append([
                        "",
                        idx,
                        f'=HYPERLINK("{article["url"]}", "{article["title"]}")',
                        parse_datetime(article["publishDate"]),
                        parse_datetime(article["deadline"])
                    ])
                ws.column_dimensions['A'].width = 8
                ws.column_dimensions['B'].width = 8
                ws.column_dimensions['C'].width = 100
                ws.column_dimensions['D'].width = 20
                ws.column_dimensions['E'].width = 20

            elif source == "zghn":
                headers = ["标记", "序号", "标题链接", "发布日期"]
                ws.append(headers)
                for idx, article in enumerate(articles, 1):
                    ws.append([
                        "",
                        idx,
                        f'=HYPERLINK("{article["链接"]}", "{article["公告标题"]}")',
                        parse_datetime(article["发布日期"])
                    ])
                ws.column_dimensions['A'].width = 8
                ws.column_dimensions['B'].width = 8
                ws.column_dimensions['C'].width = 100
                ws.column_dimensions['D'].width = 20

            elif source == "kh":
                headers = ["标记", "序号", "标题链接", "公告类型", "发布日期", "投标时间", "来源类别"]
                ws.append(headers)
                for idx, article in enumerate(articles, 1):
                    ws.append([
                        "",
                        idx,
                        f'=HYPERLINK("{article["详情链接"]}", "{article["标题"]}")',
                        article["公告类型"],
                        parse_datetime(article["发布日期"]) if article["发布日期"] else "无日期信息",
                        article["投标开始时间"] if article["投标开始时间"] else "无投标时间信息",
                        article["来源类别"]
                    ])
                ws.column_dimensions['A'].width = 8
                ws.column_dimensions['B'].width = 8
                ws.column_dimensions['C'].width = 100
                ws.column_dimensions['D'].width = 15
                ws.column_dimensions['E'].width = 15
                ws.column_dimensions['F'].width = 20
                ws.column_dimensions['G'].width = 15

            # "标记" column: dropdown so a row can be ticked, plus a highlight rule for marked rows
            mark_dv = DataValidation(type="list", formula1='"✔,"', allow_blank=True)
            mark_dv.error = '请从下拉列表中选择'
            mark_dv.errorTitle = '无效输入'
            mark_dv.prompt = '选择✔标记此行'
            mark_dv.promptTitle = '标记选择'
            ws.add_data_validation(mark_dv)

            last_row = len(articles) + 1
            mark_dv.add(f'A2:A{last_row}')

            yellow_fill = PatternFill(start_color='FFFF00', end_color='FFFF00', fill_type='solid')

            rule = FormulaRule(formula=['$A2<>""'], stopIfTrue=True, fill=yellow_fill)
            last_col = len(headers)
            ws.conditional_formatting.add(
                f'A2:{get_column_letter(last_col)}{last_row}',
                rule
            )

        filename = f"招标信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
        outputfile = os.path.join(self.config['output_dir'], filename)
        wb.save(outputfile)
        return outputfile

    def save_all_articles_for_wechat(self, all_articles: Dict[str, List]) -> str:
        output_lines = []

        for website, articles in all_articles.items():
            if website == "国能e招":
                source = "neet"
            elif website == "三峡招标":
                source = "ctg"
            elif website == "国能e购":
                source = "chnenergy"
            elif website == "中国节能":
                source = "chinaedb"
            elif website == "北京京能":
                source = "beijing"
            elif website == "华润守正":
                source = "hrsz"
            elif website == "华电电子":
                source = "zghn"
            elif website == "科环集团":
                source = "kh"
            elif website == "三峡采购":
                source = "ctgc"
            else:
                continue

            output_lines.append(f"【{website}】")
            output_lines.append("")

            if source == "neet":
                for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
                    output_lines.append(f"{idx}. {title}")
                    output_lines.append(f"   {link}")

            elif source == "ctg":
                for idx, (link, title, publish_time) in enumerate(articles, 1):
                    output_lines.append(f"{idx}. {title}")
                    output_lines.append(f"   {link}")
            elif source == "ctgc":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. {article['title']}")
                    output_lines.append(f"   {article['url']}")
            elif source == "chnenergy":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. {article['title']}")
                    output_lines.append(f"   {article['link']}")

            elif source == "chinaedb":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. {article['title']}")
                    output_lines.append(f"   {article['link']}")

            elif source == "beijing":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. {article['title']}")
                    output_lines.append(f"   {article['link']}")

            elif source == "hrsz":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. {article['title']}")
                    output_lines.append(f"   {article['url']}")

            elif source == "zghn":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. {article['公告标题']}")
                    output_lines.append(f"   {article['链接']}")

            elif source == "kh":
                # 科环集团 entries use the same list format as the other sources
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. {article['标题']}")
                    output_lines.append(f"   {article['详情链接']}")

            output_lines.append("")

        filename = f"爬取信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        outputfile = os.path.join(self.config['output_dir'], filename)
        with open(outputfile, 'w', encoding='utf-8') as f:
            f.write('\n'.join(output_lines))

        return outputfile

    def save_all_articles_to_markdown(self, all_articles: Dict[str, List]) -> str:
        output_lines = []

        for website, articles in all_articles.items():
            if website == "国能e招":
                source = "neet"
            elif website == "三峡招标":
                source = "ctg"
            elif website == "国能e购":
                source = "chnenergy"
            elif website == "中国节能":
                source = "chinaedb"
            elif website == "北京京能":
                source = "beijing"
            elif website == "华润守正":
                source = "hrsz"
            elif website == "华电电子":
                source = "zghn"
            elif website == "科环集团":
                source = "kh"
            elif website == "三峡采购":
                source = "ctgc"
            else:
                continue

            output_lines.append(f"### {website} ")
            output_lines.append("")

            if source == "neet":
                for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{title}]({link})  ")
            elif source == "ctgc":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['url']})  ")
            elif source == "ctg":
                for idx, (link, title, publish_time) in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{title}]({link})  ")

            elif source == "chnenergy":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['link']})  ")

            elif source == "chinaedb":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['link']})  ")

            elif source == "beijing":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['link']})  ")

            elif source == "hrsz":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['url']})  ")

            elif source == "zghn":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['公告标题']}]({article['链接']})  ")
            elif source == "kh":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['标题']}]({article['详情链接']})  ")
            output_lines.append("")

        filename = f"爬取信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
        outputfile = os.path.join(self.config['output_dir'], filename)
        with open(outputfile, 'w', encoding='utf-8') as f:
            f.write('\n'.join(output_lines))

        return outputfile

    def save_all_articles_to_text(self, all_articles: Dict[str, List]) -> str:
        output_lines = []

        for website, articles in all_articles.items():
            if website == "国能e招":
                source = "neet"
            elif website == "三峡招标":
                source = "ctg"
            elif website == "国能e购":
                source = "chnenergy"
            elif website == "中国节能":
                source = "chinaedb"
            elif website == "北京京能":
                source = "beijing"
            elif website == "华润守正":
                source = "hrsz"
            elif website == "华电电子":
                source = "zghn"
            elif website == "科环集团":
                source = "kh"
            elif website == "三峡采购":
                source = "ctgc"
            else:
                continue

            output_lines.append(f"### {website}")

            if source == "neet":
                for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{title}]({link})")

            elif source == "ctg":
                for idx, (link, title, publish_time) in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{title}]({link})")

            elif source == "chnenergy":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['link']})")

            elif source == "chinaedb":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['link']})")

            elif source == "beijing":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['link']})")

            elif source == "hrsz":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['url']})")
            elif source == "ctgc":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['title']}]({article['url']})")
            elif source == "zghn":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['公告标题']}]({article['链接']})")
            elif source == "kh":
                for idx, article in enumerate(articles, 1):
                    output_lines.append(f"{idx}. [{article['标题']}]({article['详情链接']})")
            output_lines.append("")

        filename = f"爬取信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        outputfile = os.path.join(self.config['output_dir'], filename)
        with open(outputfile, 'w', encoding='utf-8') as f:
            f.write('\n'.join(output_lines))

        return outputfile

    def save_all_articles_as_html(self, all_articles: Dict[str, List]) -> str:
        html_content = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>爬取信息汇总_{date}</title>
    <style>
        body {{
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
            line-height: 1.6;
            color: #333;
            padding: 15px;
            max-width: 100%;
            word-break: break-word;
        }}
        h2 {{
            color: #1a73e8;
            font-size: 18px;
            padding-bottom: 8px;
            border-bottom: 1px solid #eee;
            margin-top: 20px;
            margin-bottom: 15px;
        }}
        .item {{
            margin-bottom: 12px;
            padding-left: 10px;
            border-left: 3px solid transparent;
        }}
        .item:hover {{
            border-left-color: #1a73e8;
        }}
        .item-index {{
            color: #666;
            margin-right: 5px;
        }}
        a {{
            color: #1a73e8;
            text-decoration: none;
        }}
        a:hover {{
            text-decoration: underline;
        }}
        .meta {{
            font-size: 13px;
            color: #666;
            margin-top: 3px;
        }}
        .time {{
            display: inline-block;
            margin-right: 10px;
        }}
        .deadline {{
            display: inline-block;
            color: #d32f2f;
        }}
    </style>
</head>
<body>
    <h1>招标信息汇总</h1>
    <p>更新时间:{date} {time}</p>
""".format(
            date=datetime.datetime.now().strftime('%Y-%m-%d'),
            time=datetime.datetime.now().strftime('%H:%M:%S')
        )

        for website, articles in all_articles.items():
            if website == "国能e招":
                source = "neet"
            elif website == "三峡招标":
                source = "ctg"
            elif website == "国能e购":
                source = "chnenergy"
            elif website == "中国节能":
                source = "chinaedb"
            elif website == "北京京能":
                source = "beijing"
            elif website == "华润守正":
                source = "hrsz"
            elif website == "华电电子":
                source = "zghn"
            elif website == "科环集团":
                source = "kh"
            elif website == "三峡采购":
                source = "ctgc"
            else:
                continue

            html_content += f'<h2>{website}</h2>\n<div class="items">\n'

            if source == "neet":
                for idx, (link, title, publish_date, deadline_date) in enumerate(articles, 1):
                    html_content += f"""
            <div class="item">
                <span class="item-index">{idx}.</span>
                <a href="{link}" target="_blank">{title}</a>
                <div class="meta">
                    <span class="time">发布时间: {publish_date}</span>
                    <span class="deadline">截止时间: {deadline_date}</span>
                </div>
            </div>
            """
            elif source == "ctgc":
                for idx, article in enumerate(articles, 1):
                    html_content += f"""
            <div class="item">
                <span class="item-index">{idx}.</span>
                <a href="{article['url']}" target="_blank">{article['title']}</a>
                <div class="meta">
                    <span class="time">发布时间: {article['date']}</span>
                </div>
            </div>
            """

            elif source == "ctg":
                for idx, (link, title, publish_time) in enumerate(articles, 1):
                    html_content += f"""
            <div class="item">
                <span class="item-index">{idx}.</span>
                <a href="{link}" target="_blank">{title}</a>
                <div class="meta">
                    <span class="time">发布时间: {publish_time.strftime('%Y-%m-%d %H:%M:%S')}</span>
                </div>
            </div>
            """

            elif source == "chnenergy":
                for idx, article in enumerate(articles, 1):
                    html_content += f"""
            <div class="item">
                <span class="item-index">{idx}.</span>
                <a href="{article['link']}" target="_blank">{article['title']}</a>
                <div class="meta">
                    <span class="time">发布时间: {article['time'].strftime('%Y-%m-%d')}</span>
                    <span>公告编号: {article.get('code', '')}</span>
                </div>
            </div>
            """

            elif source == "chinaedb":
                for idx, article in enumerate(articles, 1):
                    html_content += f"""
            <div class="item">
                <span class="item-index">{idx}.</span>
                <a href="{article['link']}" target="_blank">{article['title']}</a>
                <div class="meta">
                    <span class="time">发布时间: {article['publish_date'].strftime('%Y-%m-%d')}</span>
                    <span class="deadline">截止时间: {article['deadline'].strftime('%Y-%m-%d') if article['deadline'] else ''}</span>
                    <div>采购类别: {article['category']} | 招标单位: {article['bid_unit']}</div>
                </div>
            </div>
            """

            elif source == "beijing":
                for idx, article in enumerate(articles, 1):
                    html_content += f"""
            <div class="item">
                <span class="item-index">{idx}.</span>
                <a href="{article['link']}" target="_blank">{article['title']}</a>
                <div class="meta">
                    <span class="time">发布时间: {article['date'].strftime('%Y-%m-%d')}</span>
                </div>
            </div>
            """

            elif source == "hrsz":
                for idx, article in enumerate(articles, 1):
                    html_content += f"""
            <div class="item">
                <span class="item-index">{idx}.</span>
                <a href="{article['url']}" target="_blank">{article['title']}</a>
                <div class="meta">
                    <span class="time">发布时间: {article['publishDate']}</span>
                    <span class="deadline">截止时间: {article['deadline']}</span>
                </div>
            </div>
            """

            elif source == "zghn":
                # zghn records only carry 公告标题 / 发布日期 (already a 'YYYY-MM-DD' string) / 链接
                for idx, article in enumerate(articles, 1):
                    html_content += f"""
            <div class="item">
                <span class="item-index">{idx}.</span>
                <a href="{article['链接']}" target="_blank">{article['公告标题']}</a>
                <div class="meta">
                    <span class="time">发布时间: {article['发布日期']}</span>
                </div>
            </div>
            """
            elif source == "kh":
                for idx, article in enumerate(articles, 1):
                    html_content += f"""
            <div class="item">
                <span class="item-index">{idx}.</span>
                <a href="{article['详情链接']}" target="_blank">{article['标题']}</a>
                <div class="meta">
                    <span class="time">发布时间: {article['发布日期'].strftime('%Y-%m-%d') if article['发布日期'] else '无日期信息'}</span>
                    <span class="deadline">投标时间: {article['投标开始时间'] if article['投标开始时间'] else '无投标时间信息'}</span>
                    <div>公告类型: {article['公告类型']} | 来源类别: {article['来源类别']}</div>
                </div>
            </div>
            """
            html_content += "</div>\n"

        html_content += """
    <footer style="margin-top: 30px; padding-top: 15px; border-top: 1px solid #eee; color: #666; font-size: 13px;">
        <p>本文件由系统自动生成,链接可直接点击访问</p>
        <p>更新时间: {date} {time}</p>
    </footer>
</body>
</html>
""".format(
            date=datetime.datetime.now().strftime('%Y-%m-%d'),
            time=datetime.datetime.now().strftime('%H:%M:%S')
        )

        filename = f"招标信息汇总_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
        outputfile = os.path.join(self.config['output_dir'], filename)
        with open(outputfile, 'w', encoding='utf-8') as f:
            f.write(html_content)

        return outputfile

    # Email sending methods
    def send_all_email_with_excel(self, excel_filepath, days, article_count, email_config):
        msg = MIMEMultipart()

        def encode_from_header(nickname, email):
            try:
                if all(ord(c) < 128 for c in nickname):
                    return f"{nickname} <{email}>"
                else:
                    encoded_nickname = Header(nickname, 'utf-8').encode()
                    return f"{encoded_nickname} <{email}>"
            except Exception:
                return email

        msg['From'] = encode_from_header(email_config["sender_username"], email_config["sender_email"])
        msg['To'] = ", ".join(self.config['receiver_emails'])

        subject = f"招标信息汇总报告(最近{days}天)"
        msg['Subject'] = Header(subject, 'utf-8')

        selected_websites = self.config['websites']
        body = f"""<html>
        <body>
            <h2>招标信息汇总报告</h2>
            <p>时间范围: 最近{days}天</p>
            <p>爬取的网站: {", ".join(selected_websites)}</p>
            <p>找到的文章总数: {article_count}篇</p>
            <p>请查看附件Excel文件获取详细信息。</p>
        </body>
        </html>"""

        alternative = MIMEMultipart('alternative')
        texthtml = MIMEText(body, _subtype='html', _charset='UTF-8')
        alternative.attach(texthtml)
        msg.attach(alternative)

        with open(excel_filepath, 'rb') as f:
            xlsxpart = MIMEApplication(f.read())

        xlsxpart.add_header('Content-Disposition', 'attachment',
                            filename=Header(os.path.basename(excel_filepath), "utf-8").encode())
        msg.attach(xlsxpart)

        success_emails = []
        failed_emails = []

        try:
            server = smtplib.SMTP_SSL(email_config['smtp_server'], email_config['smtp_port'])
            server.ehlo()
            server.login(email_config['sender_email'], email_config['sender_password'])

            try:
                server.sendmail(
                    email_config['sender_email'],
                    self.config['receiver_emails'],
                    msg.as_string()
                )
                success_emails = self.config['receiver_emails'].copy()
            except smtplib.SMTPException:
                # Fall back to sending one message per recipient
                for receiver in self.config['receiver_emails']:
                    try:
                        temp_msg = msg
                        temp_msg.replace_header('To', receiver)
                        server.sendmail(
                            email_config['sender_email'],
                            [receiver],
                            temp_msg.as_string()
                        )
                        success_emails.append(receiver)
                    except Exception as e:
                        failed_emails.append((receiver, str(e)))

            server.close()

            return {
                'success': success_emails,
                'failed': failed_emails
            }

        except Exception as e:
            raise Exception(f"邮件发送失败: {str(e)}")

    def send_email_with_results(self, results: Dict[str, List], file_path: str) -> bool:
        """Send the results file by email."""
        try:
            email_config = {
                "smtp_server": "smtp.qq.com",
                "smtp_port": 465,
                "sender_email": self.config['sender_email'],
                "sender_username": self.config['sender_username'],
                "receiver_emails": self.config['receiver_emails'],
                "sender_password": self.config['sender_password']
            }

            result = self.send_all_email_with_excel(
                file_path,
                self.config['days'],
                sum(len(v) for v in results.values()),
                email_config
            )

            if result['failed']:
                print(f"邮件发送失败: {result['failed']}")
            return True
        except Exception as e:
            print(f"发送邮件时出错: {e}")
            return False
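
    # Note: send_email_with_results() assumes QQ Mail's SMTP service (smtp.qq.com, port 465
    # over SSL). For QQ Mail, sender_password is normally the SMTP authorization code generated
    # in the mailbox settings rather than the login password; other providers need their own
    # smtp_server/smtp_port values.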

    # Configuration methods
    # (Configuration loading, including default values, is handled by load_config() defined
    # near the top of the class; defining a second load_config here would silently override it.)
    def save_config(self, config, config_file):
        """Save configuration to file"""
        try:
            with open(config_file, "w", encoding="utf-8") as f:
                json.dump(config, f, ensure_ascii=False, indent=2)
            return True
        except Exception:
            return False

    # Utility methods
    def open_file(self, filepath):
        """Open file with default application"""
        try:
            if platform.system() == 'Darwin':
                subprocess.call(('open', filepath))
            elif platform.system() == 'Windows':
                os.startfile(filepath)
            else:
                subprocess.call(('xdg-open', filepath))
            return True
        except Exception:
            return False
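

# Command-line usage (see main() below): an optional first argument is the path to the config
# file; otherwise "config.json" in the current working directory is used, e.g.
#   python <this_file>.py my_config.json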
def main():
    if len(sys.argv) > 1:
        config_path = sys.argv[1]
    else:
        config_path = "config.json"

    crawler = WebCrawler(config_path)
    crawler.run()


if __name__ == "__main__":
    main()