Mobile wallpaper 1Mobile wallpaper 2Mobile wallpaper 3Mobile wallpaper 4Mobile wallpaper 5Mobile wallpaper 6
2024 字
10 分钟
基于Smtp的邮件推送RSS订阅Python脚本
2025-11-22
统计加载中...
import feedparser
import smtplib
import time
import re
import ssl
import os
import threading
import requests
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.header import Header
from urllib.parse import urlparse
from datetime import datetime
# 全局变量控制定时任务
running = True
stop_event = threading.Event()
def get_safe_input(prompt, default=None, allow_empty=False):
"""安全获取用户输入,支持默认值"""
while True:
user_input = input(prompt)
if user_input.strip() or default is not None or allow_empty:
return user_input.strip() or default
print("不能为空哦,请重新输入~")
def is_valid_url(url):
"""验证URL有效性"""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception:
return False
def ensure_https(url):
"""确保图片URL使用HTTPS"""
if not url:
return None
if url.startswith("http://"):
return "https" + url[4:]
return url
def download_image(url):
"""下载图片到内存"""
try:
if not is_valid_url(url):
return None
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return response.content
else:
print(f"❌ 图片下载失败 (状态码: {response.status_code})")
return None
except Exception as e:
print(f"❌ 图片下载异常: {str(e)}")
return None
def parse_rss_feed(url):
"""增强版RSS解析,添加详细错误日志"""
print(f"🐾 开始解析RSS源: {url}")
try:
feed = feedparser.parse(url)
# 详细错误诊断
if hasattr(feed, 'bozo_exception'):
print(f"❌ RSS解析错误: {str(feed.bozo_exception)}")
return []
if not feed.entries:
print("⚠️ RSS源没有找到任何内容条目")
return []
print(f"✨ 成功获取 {len(feed.entries)} 条内容 | 更新时间: {feed.feed.get('updated', '未知')}")
return feed.entries
except Exception as e:
print(f"🔥 获取RSS时发生严重错误: {str(e)}")
return []
def extract_content(entry):
"""从RSS条目中提取内容、图片和标题"""
title = entry.title if hasattr(entry, 'title') else "无标题"
# 优先使用description,其次content
content_html = ""
if hasattr(entry, 'description'):
content_html = entry.description
elif hasattr(entry, 'content') and entry.content:
content_html = entry.content[0].value
# 清理HTML获取纯文本
content_text = re.sub(r'<[^>]+>', '', content_html)
content_text = re.sub(r'\s+', ' ', content_text).strip()
# 智能提取图片
image_url = None
# 1. 检查enclosures
if hasattr(entry, 'enclosures') and entry.enclosures:
for enclosure in entry.enclosures:
if enclosure.get('type', '').startswith('image'):
image_url = enclosure.href
break
# 2. 从description中提取
if not image_url:
img_match = re.search(r'<img[^>]+src="([^">]+)"', content_html)
if img_match:
image_url = img_match.group(1)
# 3. 检查media:thumbnail
if not image_url and hasattr(entry, 'media_thumbnail'):
image_url = entry.media_thumbnail[0]['url']
# 确保HTTPS
image_url = ensure_https(image_url) if image_url else None
# 获取链接
link_url = entry.link if hasattr(entry, 'link') else ""
return {
'title': title,
'content': content_text,
'image_url': image_url,
'link_url': link_url,
'html_content': content_html
}
def create_email_content(title, content, image_url, link_url, image_id=None):
"""创建HTML格式的邮件内容,支持内嵌图片"""
# 邮件HTML模板 - 明确指定UTF-8编码
html = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>{title}</title>
<style>
body {{ font-family: 'Microsoft YaHei', Arial, sans-serif; line-height: 1.6; color: #333; }}
.header {{ background-color: #f0f2f5; padding: 20px; border-bottom: 1px solid #e0e0e0; }}
.content {{ padding: 25px; }}
.footer {{ margin-top: 25px; padding-top: 15px; border-top: 1px solid #e0e0e0; font-size: 12px; color: #666; }}
.image-container {{ text-align: center; margin: 20px 0; }}
.image-container img {{ max-width: 100%; height: auto; border-radius: 4px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }}
.read-more {{ display: inline-block; margin-top: 15px; padding: 10px 20px; background-color: #007bff; color: white; text-decoration: none; border-radius: 4px; font-weight: bold; }}
.timestamp {{ color: #999; font-size: 14px; }}
.content-preview {{ font-size: 16px; line-height: 1.8; margin: 20px 0; }}
</style>
</head>
<body>
<div class="header">
<h2>RSS订阅更新</h2>
<div class="timestamp">更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</div>
</div>
<div class="content">
<h1>{title}</h1>
"""
# 添加图片
if image_url and is_valid_url(image_url):
if image_id:
html += f"""
<div class="image-container">
<img src="cid:{image_id}" alt="文章配图">
</div>
"""
else:
html += f"""
<div class="image-container">
<img src="{image_url}" alt="文章配图">
</div>
"""
# 添加内容预览
html += f"""
<div class="content-preview">
{content}
</div>
"""
# 添加阅读更多链接
html += f"""
<a href="{link_url}" class="read-more">查看全文</a>
</div>
<div class="footer">
<p>这是由RSS-to-Email服务自动发送的订阅更新</p>
<p>如需停止订阅,请回复本邮件</p>
</div>
</body>
</html>
"""
return html
def send_email(title, content, image_url, link_url, email_config):
"""发送邮件,支持内嵌图片和UTF-8中文字符"""
# 创建邮件内容
html_content = create_email_content(title, content, image_url, link_url)
# 创建MIMEMultipart对象
msg = MIMEMultipart('related')
# 设置邮件主题,确保UTF-8编码
msg['Subject'] = Header(title, 'utf-8').encode()
msg['From'] = email_config['from_email']
msg['To'] = email_config['to_email']
# 添加HTML内容,明确指定UTF-8编码
part = MIMEText(html_content, 'html', 'utf-8')
msg.attach(part)
# 如果有图片,尝试内嵌
if image_url and is_valid_url(image_url):
image_data = download_image(image_url)
if image_data:
# 创建图片ID
image_id = f"image_{hash(image_url)}"
# 更新HTML内容以使用内嵌图片
html_content = create_email_content(title, content, image_url, link_url, image_id)
# 替换HTML内容
for part in msg.walk():
if part.get_content_type() == 'text/html':
part.set_payload(html_content, 'utf-8')
break
# 添加图片
img = MIMEImage(image_data)
img.add_header('Content-ID', f'<{image_id}>')
img.add_header('Content-Disposition', 'inline', filename='rss_image.jpg')
msg.attach(img)
# 连接SMTP服务器并发送邮件
try:
print(f"📤 尝试通过 {email_config['smtp_server']}:{email_config['smtp_port']} 发送邮件...")
if email_config['use_ssl']:
context = ssl.create_default_context()
server = smtplib.SMTP_SSL(
email_config['smtp_server'],
email_config['smtp_port'],
context=context
)
else:
server = smtplib.SMTP(
email_config['smtp_server'],
email_config['smtp_port']
)
server.starttls()
server.login(email_config['username'], email_config['password'])
# 发送邮件,确保使用UTF-8编码
server.sendmail(
email_config['from_email'],
email_config['to_email'],
msg.as_string()
)
server.quit()
print(f"✅ 邮件发送成功: {title}")
return True
except Exception as e:
print(f"❌ 邮件发送失败: {str(e)}")
return False
def send_latest_entries(rss_url, count=5, email_config=None):
"""发送最新的N条RSS内容"""
global running
if not email_config:
email_config = {
'smtp_server': '你的smtp服务器地址',
'smtp_port': 465,
'username': '你的邮箱地址',
'password': '密码',
'from_email': '你的邮箱地址',
'to_email': '你的接收地址',
'use_ssl': True
}
if not running:
return 0
print(f"\n{datetime.now().strftime('%H:%M:%S')} - 🔍 获取最新内容...")
entries = parse_rss_feed(rss_url)
if not entries:
print("⚠️ 无法获取RSS内容,跳过本次发送")
return 0
print(f"📰 找到 {len(entries)} 条新内容,准备发送最新的 {min(count, len(entries))} 条")
success_count = 0
for i, entry in enumerate(entries[:count], 1):
if not running:
print("🛑 收到停止信号,中断发送")
break
entry_data = extract_content(entry)
print(f"\n[{i}/{min(count, len(entries))}] {entry_data['title']}")
if send_email(
entry_data['title'],
entry_data['content'],
entry_data['image_url'],
entry_data['link_url'],
email_config
):
success_count += 1
# 避免发送太快
if i < min(count, len(entries)) and running:
print("⏱️ 等待3秒避免发送太快...")
time.sleep(3)
print(f"\n✅ 本次发送完成: 成功 {success_count}/{min(count, len(entries))} 条")
return success_count
def schedule_sender(rss_url, count, interval_minutes, email_config=None):
"""定时发送任务"""
global running
print(f"\n⏰ 定时任务已启动! 每 {interval_minutes} 分钟发送一次最新 {count} 条内容")
print("按 Ctrl+C 停止定时任务\n")
try:
while running:
send_latest_entries(rss_url, count, email_config)
if not running:
break
# 倒计时显示
print(f"\n🕒 下次发送将在 {interval_minutes} 分钟后进行")
for i in range(interval_minutes * 60):
if not running:
break
if i % 30 == 0 and i > 0: # 每30秒显示一次进度
remaining = interval_minutes * 60 - i
print(f"⏳ 剩余时间: {remaining//60}分{remaining%60}秒")
time.sleep(1)
except KeyboardInterrupt:
print("\n\n🛑 检测到键盘中断,正在停止定时任务...")
finally:
running = False
stop_event.set()
def test_connection(email_config):
"""测试SMTP连接"""
print("\n🔍 测试SMTP连接...")
try:
if email_config['use_ssl']:
context = ssl.create_default_context()
server = smtplib.SMTP_SSL(
email_config['smtp_server'],
email_config['smtp_port'],
context=context
)
else:
server = smtplib.SMTP(
email_config['smtp_server'],
email_config['smtp_port']
)
server.starttls()
server.login(email_config['username'], email_config['password'])
server.quit()
print("✅ SMTP连接测试成功!")
return True
except Exception as e:
print(f"❌ 连接失败: {str(e)}")
return False
def main():
global running
running = True
# 配置默认值
DEFAULT_RSS = "https://plink.anyfeeder.com/idaily/today"
DEFAULT_RECIPIENT = "guzhou999@dingtalk.com"
print("\n" + "="*60)
print("📧 RSS-to-Email 服务 v2.0")
print("✨ 中文字符完美支持 | 内嵌图片 | 专业排版")
print("="*60)
# 获取配置
print("\n" + "-"*40)
print("⚙️ 邮件服务配置")
print("-"*40)
# SMTP服务器
smtp_server = get_safe_input(
f"SMTP服务器 (默认: smtp.zoho.com.cn):\n> ",
"smtp.zoho.com.cn"
)
# SMTP端口
try:
smtp_port = int(get_safe_input(
f"SMTP端口 (默认: 465):\n> ",
"465"
))
except ValueError:
smtp_port = 465
print("⚠️ 无效端口,使用默认值465")
# SMTP用户名
smtp_username = get_safe_input(
f"SMTP用户名 ( ):\n> ",
"填你的用户名"
)
# SMTP密码
smtp_password = get_safe_input(
"SMTP密码 (必填):\n> ",
allow_empty=False
)
# 发件人邮箱
from_email = get_safe_input(
f"发件人邮箱 (默认: {smtp_username}):\n> ",
smtp_username
)
# 收件人邮箱
to_email = get_safe_input(
f"收件人邮箱 (默认: {DEFAULT_RECIPIENT}):\n> ",
DEFAULT_RECIPIENT
)
# 配置字典
email_config = {
'smtp_server': smtp_server,
'smtp_port': smtp_port,
'username': smtp_username,
'password': smtp_password,
'from_email': from_email,
'to_email': to_email,
'use_ssl': True if smtp_port == 465 else False
}
# 测试连接
if not test_connection(email_config):
if get_safe_input("\n连接失败,是否继续? (y/n): ", "n").lower() != 'y':
print("😿 退出程序")
return
print("\n" + "-"*40)
print("⚙️ RSS配置")
print("-"*40)
rss_url = get_safe_input(
f"RSS订阅地址\n(默认: {DEFAULT_RSS}):\n> ",
DEFAULT_RSS
)
try:
count = int(get_safe_input("\n每轮发送的最新内容数量 (1-10):\n> ", "3"))
count = max(1, min(10, count)) # 限制在1-10之间
except ValueError:
count = 3
print("⚠️ 无效数字,使用默认值3")
# 选择模式:一次性 or 定时
print("\n" + "-"*40)
print("🕒 发送模式")
print("-"*40)
print("1. 一次性发送 (立即发送一次后退出)")
print("2. 定时自动发送 (按指定间隔循环发送)")
mode = get_safe_input("\n请选择模式 (1/2): ", "1")
if mode == "2":
try:
interval = int(get_safe_input("\n发送间隔(分钟) (5-120):\n> ", "30"))
interval = max(5, min(120, interval)) # 限制在5-120分钟
except ValueError:
interval = 30
print("⚠️ 无效数字,使用默认值30分钟")
# 启动定时任务
schedule_sender(rss_url, count, interval, email_config)
else:
# 一次性发送
print("\n🚀 开始一次性发送任务...")
send_latest_entries(rss_url, count, email_config)
print("\n" + "="*60)
print("✨ 任务完成!感谢使用 RSS-to-Email 服务")
print("="*60)
if __name__ == "__main__":
# 依赖检查
try:
import feedparser
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.header import Header
import ssl
import requests
except ImportError as e:
print("\n" + "="*50)
print("❌ 依赖库未安装! 请运行:")
print(f"pip install feedparser smtplib requests")
print("="*50)
exit(1)
print("\n" + "="*50)
print("📧 RSS-to-Email 服务")
print(f"🕒 当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*50)
try:
main()
except KeyboardInterrupt:
print("\n\n🛑 程序被用户中断")
except Exception as e:
print(f"\n\n🔥 未捕获的异常: {str(e)}")
finally:
running = False
stop_event.set()
print("\n👋 程序已安全退出")
基于Smtp的邮件推送RSS订阅Python脚本
https://xiyue-org.pages.dev/posts/邮件/
作者
Matsuzaka Yuki
发布于
2025-11-22
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时

封面
Sample Song
Sample Artist
封面
Sample Song
Sample Artist
0:00 / 0:00