2024 字
10 分钟
基于Smtp的邮件推送RSS订阅Python脚本
import feedparserimport smtplibimport timeimport reimport sslimport osimport threadingimport requestsfrom email.mime.multipart import MIMEMultipartfrom email.mime.text import MIMETextfrom email.mime.image import MIMEImagefrom email.header import Headerfrom urllib.parse import urlparsefrom datetime import datetime
# 全局变量控制定时任务running = Truestop_event = threading.Event()
def get_safe_input(prompt, default=None, allow_empty=False): """安全获取用户输入,支持默认值""" while True: user_input = input(prompt) if user_input.strip() or default is not None or allow_empty: return user_input.strip() or default print("不能为空哦,请重新输入~")
def is_valid_url(url): """验证URL有效性""" try: result = urlparse(url) return all([result.scheme, result.netloc]) except Exception: return False
def ensure_https(url): """确保图片URL使用HTTPS""" if not url: return None if url.startswith("http://"): return "https" + url[4:] return url
def download_image(url): """下载图片到内存""" try: if not is_valid_url(url): return None
headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200: return response.content else: print(f"❌ 图片下载失败 (状态码: {response.status_code})") return None except Exception as e: print(f"❌ 图片下载异常: {str(e)}") return None
def parse_rss_feed(url): """增强版RSS解析,添加详细错误日志""" print(f"🐾 开始解析RSS源: {url}") try: feed = feedparser.parse(url)
# 详细错误诊断 if hasattr(feed, 'bozo_exception'): print(f"❌ RSS解析错误: {str(feed.bozo_exception)}") return []
if not feed.entries: print("⚠️ RSS源没有找到任何内容条目") return []
print(f"✨ 成功获取 {len(feed.entries)} 条内容 | 更新时间: {feed.feed.get('updated', '未知')}") return feed.entries except Exception as e: print(f"🔥 获取RSS时发生严重错误: {str(e)}") return []
def extract_content(entry): """从RSS条目中提取内容、图片和标题""" title = entry.title if hasattr(entry, 'title') else "无标题"
# 优先使用description,其次content content_html = "" if hasattr(entry, 'description'): content_html = entry.description elif hasattr(entry, 'content') and entry.content: content_html = entry.content[0].value
# 清理HTML获取纯文本 content_text = re.sub(r'<[^>]+>', '', content_html) content_text = re.sub(r'\s+', ' ', content_text).strip()
# 智能提取图片 image_url = None
# 1. 检查enclosures if hasattr(entry, 'enclosures') and entry.enclosures: for enclosure in entry.enclosures: if enclosure.get('type', '').startswith('image'): image_url = enclosure.href break
# 2. 从description中提取 if not image_url: img_match = re.search(r'<img[^>]+src="([^">]+)"', content_html) if img_match: image_url = img_match.group(1)
# 3. 检查media:thumbnail if not image_url and hasattr(entry, 'media_thumbnail'): image_url = entry.media_thumbnail[0]['url']
# 确保HTTPS image_url = ensure_https(image_url) if image_url else None
# 获取链接 link_url = entry.link if hasattr(entry, 'link') else ""
return { 'title': title, 'content': content_text, 'image_url': image_url, 'link_url': link_url, 'html_content': content_html }
def create_email_content(title, content, image_url, link_url, image_id=None): """创建HTML格式的邮件内容,支持内嵌图片""" # 邮件HTML模板 - 明确指定UTF-8编码 html = f"""<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>{title}</title> <style> body {{ font-family: 'Microsoft YaHei', Arial, sans-serif; line-height: 1.6; color: #333; }} .header {{ background-color: #f0f2f5; padding: 20px; border-bottom: 1px solid #e0e0e0; }} .content {{ padding: 25px; }} .footer {{ margin-top: 25px; padding-top: 15px; border-top: 1px solid #e0e0e0; font-size: 12px; color: #666; }} .image-container {{ text-align: center; margin: 20px 0; }} .image-container img {{ max-width: 100%; height: auto; border-radius: 4px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }} .read-more {{ display: inline-block; margin-top: 15px; padding: 10px 20px; background-color: #007bff; color: white; text-decoration: none; border-radius: 4px; font-weight: bold; }} .timestamp {{ color: #999; font-size: 14px; }} .content-preview {{ font-size: 16px; line-height: 1.8; margin: 20px 0; }} </style> </head> <body> <div class="header"> <h2>RSS订阅更新</h2> <div class="timestamp">更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</div> </div>
<div class="content"> <h1>{title}</h1> """
# 添加图片 if image_url and is_valid_url(image_url): if image_id: html += f""" <div class="image-container"> <img src="cid:{image_id}" alt="文章配图"> </div> """ else: html += f""" <div class="image-container"> <img src="{image_url}" alt="文章配图"> </div> """
# 添加内容预览 html += f""" <div class="content-preview"> {content} </div> """
# 添加阅读更多链接 html += f""" <a href="{link_url}" class="read-more">查看全文</a> </div>
<div class="footer"> <p>这是由RSS-to-Email服务自动发送的订阅更新</p> <p>如需停止订阅,请回复本邮件</p> </div> </body> </html> """
return html
def send_email(title, content, image_url, link_url, email_config): """发送邮件,支持内嵌图片和UTF-8中文字符""" # 创建邮件内容 html_content = create_email_content(title, content, image_url, link_url)
# 创建MIMEMultipart对象 msg = MIMEMultipart('related')
# 设置邮件主题,确保UTF-8编码 msg['Subject'] = Header(title, 'utf-8').encode() msg['From'] = email_config['from_email'] msg['To'] = email_config['to_email']
# 添加HTML内容,明确指定UTF-8编码 part = MIMEText(html_content, 'html', 'utf-8') msg.attach(part)
# 如果有图片,尝试内嵌 if image_url and is_valid_url(image_url): image_data = download_image(image_url) if image_data: # 创建图片ID image_id = f"image_{hash(image_url)}"
# 更新HTML内容以使用内嵌图片 html_content = create_email_content(title, content, image_url, link_url, image_id)
# 替换HTML内容 for part in msg.walk(): if part.get_content_type() == 'text/html': part.set_payload(html_content, 'utf-8') break
# 添加图片 img = MIMEImage(image_data) img.add_header('Content-ID', f'<{image_id}>') img.add_header('Content-Disposition', 'inline', filename='rss_image.jpg') msg.attach(img)
# 连接SMTP服务器并发送邮件 try: print(f"📤 尝试通过 {email_config['smtp_server']}:{email_config['smtp_port']} 发送邮件...")
if email_config['use_ssl']: context = ssl.create_default_context() server = smtplib.SMTP_SSL( email_config['smtp_server'], email_config['smtp_port'], context=context ) else: server = smtplib.SMTP( email_config['smtp_server'], email_config['smtp_port'] ) server.starttls()
server.login(email_config['username'], email_config['password'])
# 发送邮件,确保使用UTF-8编码 server.sendmail( email_config['from_email'], email_config['to_email'], msg.as_string() ) server.quit()
print(f"✅ 邮件发送成功: {title}") return True except Exception as e: print(f"❌ 邮件发送失败: {str(e)}") return False
def send_latest_entries(rss_url, count=5, email_config=None): """发送最新的N条RSS内容""" global running
if not email_config: email_config = { 'smtp_server': '你的smtp服务器地址', 'smtp_port': 465, 'username': '你的邮箱地址', 'password': '密码', 'from_email': '你的邮箱地址', 'to_email': '你的接收地址', 'use_ssl': True }
if not running: return 0
print(f"\n{datetime.now().strftime('%H:%M:%S')} - 🔍 获取最新内容...") entries = parse_rss_feed(rss_url)
if not entries: print("⚠️ 无法获取RSS内容,跳过本次发送") return 0
print(f"📰 找到 {len(entries)} 条新内容,准备发送最新的 {min(count, len(entries))} 条")
success_count = 0 for i, entry in enumerate(entries[:count], 1): if not running: print("🛑 收到停止信号,中断发送") break
entry_data = extract_content(entry) print(f"\n[{i}/{min(count, len(entries))}] {entry_data['title']}")
if send_email( entry_data['title'], entry_data['content'], entry_data['image_url'], entry_data['link_url'], email_config ): success_count += 1
# 避免发送太快 if i < min(count, len(entries)) and running: print("⏱️ 等待3秒避免发送太快...") time.sleep(3)
print(f"\n✅ 本次发送完成: 成功 {success_count}/{min(count, len(entries))} 条") return success_count
def schedule_sender(rss_url, count, interval_minutes, email_config=None): """定时发送任务""" global running
print(f"\n⏰ 定时任务已启动! 每 {interval_minutes} 分钟发送一次最新 {count} 条内容") print("按 Ctrl+C 停止定时任务\n")
try: while running: send_latest_entries(rss_url, count, email_config)
if not running: break
# 倒计时显示 print(f"\n🕒 下次发送将在 {interval_minutes} 分钟后进行") for i in range(interval_minutes * 60): if not running: break if i % 30 == 0 and i > 0: # 每30秒显示一次进度 remaining = interval_minutes * 60 - i print(f"⏳ 剩余时间: {remaining//60}分{remaining%60}秒") time.sleep(1)
except KeyboardInterrupt: print("\n\n🛑 检测到键盘中断,正在停止定时任务...") finally: running = False stop_event.set()
def test_connection(email_config): """测试SMTP连接""" print("\n🔍 测试SMTP连接...") try: if email_config['use_ssl']: context = ssl.create_default_context() server = smtplib.SMTP_SSL( email_config['smtp_server'], email_config['smtp_port'], context=context ) else: server = smtplib.SMTP( email_config['smtp_server'], email_config['smtp_port'] ) server.starttls()
server.login(email_config['username'], email_config['password']) server.quit()
print("✅ SMTP连接测试成功!") return True except Exception as e: print(f"❌ 连接失败: {str(e)}") return False
def main(): global running running = True
# 配置默认值 DEFAULT_RSS = "https://plink.anyfeeder.com/idaily/today" DEFAULT_RECIPIENT = "guzhou999@dingtalk.com"
print("\n" + "="*60) print("📧 RSS-to-Email 服务 v2.0") print("✨ 中文字符完美支持 | 内嵌图片 | 专业排版") print("="*60)
# 获取配置 print("\n" + "-"*40) print("⚙️ 邮件服务配置") print("-"*40)
# SMTP服务器 smtp_server = get_safe_input( f"SMTP服务器 (默认: smtp.zoho.com.cn):\n> ", "smtp.zoho.com.cn" )
# SMTP端口 try: smtp_port = int(get_safe_input( f"SMTP端口 (默认: 465):\n> ", "465" )) except ValueError: smtp_port = 465 print("⚠️ 无效端口,使用默认值465")
# SMTP用户名 smtp_username = get_safe_input( f"SMTP用户名 ( ):\n> ", "填你的用户名" )
# SMTP密码 smtp_password = get_safe_input( "SMTP密码 (必填):\n> ", allow_empty=False )
# 发件人邮箱 from_email = get_safe_input( f"发件人邮箱 (默认: {smtp_username}):\n> ", smtp_username )
# 收件人邮箱 to_email = get_safe_input( f"收件人邮箱 (默认: {DEFAULT_RECIPIENT}):\n> ", DEFAULT_RECIPIENT )
# 配置字典 email_config = { 'smtp_server': smtp_server, 'smtp_port': smtp_port, 'username': smtp_username, 'password': smtp_password, 'from_email': from_email, 'to_email': to_email, 'use_ssl': True if smtp_port == 465 else False }
# 测试连接 if not test_connection(email_config): if get_safe_input("\n连接失败,是否继续? (y/n): ", "n").lower() != 'y': print("😿 退出程序") return
print("\n" + "-"*40) print("⚙️ RSS配置") print("-"*40)
rss_url = get_safe_input( f"RSS订阅地址\n(默认: {DEFAULT_RSS}):\n> ", DEFAULT_RSS )
try: count = int(get_safe_input("\n每轮发送的最新内容数量 (1-10):\n> ", "3")) count = max(1, min(10, count)) # 限制在1-10之间 except ValueError: count = 3 print("⚠️ 无效数字,使用默认值3")
# 选择模式:一次性 or 定时 print("\n" + "-"*40) print("🕒 发送模式") print("-"*40) print("1. 一次性发送 (立即发送一次后退出)") print("2. 定时自动发送 (按指定间隔循环发送)")
mode = get_safe_input("\n请选择模式 (1/2): ", "1")
if mode == "2": try: interval = int(get_safe_input("\n发送间隔(分钟) (5-120):\n> ", "30")) interval = max(5, min(120, interval)) # 限制在5-120分钟 except ValueError: interval = 30 print("⚠️ 无效数字,使用默认值30分钟")
# 启动定时任务 schedule_sender(rss_url, count, interval, email_config) else: # 一次性发送 print("\n🚀 开始一次性发送任务...") send_latest_entries(rss_url, count, email_config)
print("\n" + "="*60) print("✨ 任务完成!感谢使用 RSS-to-Email 服务") print("="*60)
if __name__ == "__main__": # 依赖检查 try: import feedparser import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.image import MIMEImage from email.header import Header import ssl import requests except ImportError as e: print("\n" + "="*50) print("❌ 依赖库未安装! 请运行:") print(f"pip install feedparser smtplib requests") print("="*50) exit(1)
print("\n" + "="*50) print("📧 RSS-to-Email 服务") print(f"🕒 当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("="*50)
try: main() except KeyboardInterrupt: print("\n\n🛑 程序被用户中断") except Exception as e: print(f"\n\n🔥 未捕获的异常: {str(e)}") finally: running = False stop_event.set() print("\n👋 程序已安全退出") 基于Smtp的邮件推送RSS订阅Python脚本
https://xiyue-org.pages.dev/posts/邮件/ 部分信息可能已经过时









