从RSS源中获取动画集信息、通过SSH将URL转换为磁力链接进行解析(linux),然后使用迅雷应用程序开始下载的过程。在执行过程中,用户会被提示输入一些参数。这是Web抓取、SSH自动化和Windows GUI自动化的组合应用
import requests
import xml.etree.ElementTree as ET
import re
import paramiko
import datetime
import time
from pywinauto.application import Application
import time
import os
import win32gui
import win32con
def ssh_execute(hostname, port, username, password, command):
# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接到远程服务器
client.connect(hostname, port, username, password)
# 获取一个传输通道
channel = client.get_transport().open_session()
# 执行命令
channel.exec_command(command)
output = ''
while True: # 持续读取输出
if channel.exit_status_ready(): # 如果命令已经执行完毕
break
# 读取部分输出
output += channel.recv(1024).decode()
# print(output, end='')
# return output
# 关闭连接
client.close()
return output
# 替换为实际的服务器信息和命令
# ssh_console(['python3 /home/yys/script/zcl.py http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e'])
def replace_sensitive_chars(folder_name):
sensitive_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
for char in sensitive_chars:
folder_name = folder_name.replace(char, '_')
return folder_name
def fetch_rss_data(url):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept-Language": "zh-CN,zh;q=0.9",
}
response = requests.get(url, headers=headers)
lis = []
if response.status_code == 200:
root = ET.fromstring(response.content.decode('utf-8'))
items = root.findall(".//item")
for item in items:
enclosure_url = item.find("enclosure").get("url")
item_source_code = ET.tostring(item, encoding="utf-8").decode("utf-8")
tilte = ''
title_element = item.find("./title")
if title_element is not None:
# print("主标题:",title_element.text)
tilte += replace_sensitive_chars(title_element.text)
r_list = re.compile('<p><strong>(.*?)</strong></p>', re.S).findall(item_source_code)
if r_list and len(r_list[0]) < 100:
title += replace_sensitive_chars(r_list[0])
# print("副标题:", title)
else:
pass
# print("未找到副标题")
# print("链接:", enclosure_url)
# print("-" * 50)
lis.append({'title': tilte, 'url': enclosure_url})
return lis
else:
print(f"无法获取 RSS 源。状态码: {response.status_code}")
def activate_xunlei_window():
# 替换成迅雷窗口的标题
xunlei_title = "迅雷"
# 查找迅雷窗口
xunlei_handle = win32gui.FindWindow(None, xunlei_title)
if xunlei_handle != 0:
# 将窗口激活到最前面
win32gui.ShowWindow(xunlei_handle, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(xunlei_handle)
print(f"成功激活迅雷窗口:{xunlei_title}")
else:
print(f"未找到迅雷窗口:{xunlei_title}")
def download_with_xunlei(url, download_folder_path):
# 启动迅雷应用程序
# app = Application(backend='uia').start(r"D:\app\Thunder\Program\Thunder.exe")
os.popen(r"D:\app\Thunder\Program\Thunder.exe")
activate_xunlei_window()
app = Application(backend='uia').connect(title="迅雷")
# # 等待迅雷主窗口出现
main_window = app.top_window()
main_window.wait('visible')
# 点击新建按钮
new_button = main_window.child_window(title="新建", control_type="Button")
new_button.click()
# 获取新建下载窗口
new_download_window = app.window(title="新建任务面板", control_type="Pane")
# print(new_download_window.print_control_identifiers())
# 输入下载链接
url_edit = new_download_window.child_window(found_index=0, control_type="Edit")
url_edit.set_text(url)
# 输入文件夹路径
folder_path_edit = new_download_window.child_window(found_index=1, control_type="Edit")
text = folder_path_edit.get_value().strip()
print('默认路径', text)
download_folder_path = download_folder_path.strip()
if text != download_folder_path:
folder_path_edit.set_text(download_folder_path)
print("修改路径为:", download_folder_path)
else:
print("无需修改路径", text)
# 点击下载按钮
download_button = new_download_window.child_window(title="立即下载", control_type="Button")
download_button.click()
# 等待下载完成
print("点击下载")
# app = Application(backend='uia').connect(title="迅雷")
for i in range(2):
try:
new_download_window.child_window(title="立即下载", control_type="Button").click()
break
except:
new_download_window.type_keys("{ENTER}")
pass
# Example usage:
if __name__ == "__main__":
hostname = '192.168.31.166'
port = 22 # SSH默认端口是22
username = ''
password = ''
url = "https://www.comicat.org/rss-METALLIC+ROUGE%5D%5B.xml"
ret = fetch_rss_data(url)
# print(ret)
set_file_name = ''
for one_data in ret:
url = one_data.get('url')
title = one_data.get('title')
print("标题=%s" % title)
file_name = input("收入标题名,按回车继续,(如不输入和上次名字相同):").strip()
if file_name == "c":
print("跳过")
continue
elif file_name:
set_file_name = file_name
print("设置文件名=【%s】" % set_file_name)
if not set_file_name:
print("请输入标题名字")
exit()
command = 'python3 /home/yys/script/zcl.py "%s"' % url
# 执行命令并获取结果,连接转磁力链
output = ssh_execute(hostname, port, username, password, command)
url = output.split('【')[-1].split('】')[0]
print(url)
folder_path = os.path.join(r"Z:\jellyfin\media\anime", set_file_name) # 替换为你的文件夹路径
download_with_xunlei(url, folder_path)
使用libtorrent 解析磁力链,这个库在windows不能运行,
sudo apt install python3-libtorrent
import libtorrent as lt
import requests
import sys
def torrent_to_magnet(torrent_url):
# 下载种子文件
response = requests.get(torrent_url, stream=True)
torrent_file = response.content
# 加载种子文件到libtorrent
info = lt.torrent_info(lt.bdecode(torrent_file))
# 生成磁力链接
magnet_link = lt.make_magnet_uri(info)
return magnet_link
arg = sys.argv
# 替换为实际的种子文件链接
torrent_link = "http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e"
if len(arg) >= 2:
torrent_link = str(arg[1])
# 转换为磁力链接
magnet_link = torrent_to_magnet(torrent_link)
# 打印磁力链接
print("磁力链接:【%s】" % magnet_link)
这个代码不能在windowns运行,libtorrent装上提示缺少dll库未解决,在mac和linux可以运行
这段代码主要用于从指定的RSS源中获取条目信息,然后使用Aria2进行下载。
主要函数和功能:
-
fetch_rss_data(url):
- 通过指定的URL获取RSS源的内容。
- 解析XML格式的RSS数据,提取每个条目的标题和下载链接。
- 将标题中的敏感字符(如斜杠、冒号等)替换为下划线。
-
get_bt_link(url):
- 从给定的URL下载种子文件,并使用libtorrent库解析种子文件。
- 生成磁力链接(magnet link)用于下载。
-
download_with_aria2_rpc(password, ip, url, download_path):
- 使用Aria2的RPC接口进行下载。
- 构建JSON-RPC请求,包括Aria2的认证token、下载链接和下载目录。
- 发送POST请求到Aria2的RPC服务器,开始下载任务。
主程序执行流程:
- 主程序通过指定的RSS源地址获取所有的RSS条目信息。
- 对每个条目,从条目中提取下载链接。
- 使用
get_bt_link(url)获取磁力链接。 - 调用
download_with_aria2_rpc(password, ip, url, download_path)开始下载任务到指定的下载路径。
注意事项:
- 代码中使用了第三方库requests和libtorrent,需要确保这些库已安装。
- Aria2需要预先配置并启用RPC功能,并确保主机和端口号与代码中的
ip和端口号匹配。 - 确保在执行前替换所有实际的服务器信息、下载路径、Aria2 RPC密码和其他参数。
这段代码适合用于自动化从RSS源下载资源,并且通过Aria2实现高效的下载管理。
import requests
import xml.etree.ElementTree as ET
import re
import paramiko
import datetime
import time
import time
import os
def ssh_execute(hostname, port, username, password, command):
# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接到远程服务器
client.connect(hostname, port, username, password)
# 获取一个传输通道
channel = client.get_transport().open_session()
# 执行命令
channel.exec_command(command)
output = ''
while True: # 持续读取输出
if channel.exit_status_ready(): # 如果命令已经执行完毕
break
# 读取部分输出
output += channel.recv(1024).decode()
# print(output, end='')
# return output
# 关闭连接
client.close()
return output
# 替换为实际的服务器信息和命令
# ssh_console(['python3 /home/yys/script/zcl.py http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e'])
def replace_sensitive_chars(folder_name):
sensitive_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
for char in sensitive_chars:
folder_name = folder_name.replace(char, '_')
return folder_name
def fetch_rss_data(url):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept-Language": "zh-CN,zh;q=0.9",
}
response = requests.get(url, headers=headers)
lis = []
if response.status_code == 200:
root = ET.fromstring(response.content.decode('utf-8'))
items = root.findall(".//item")
for item in items:
enclosure_url = item.find("enclosure").get("url")
item_source_code = ET.tostring(item, encoding="utf-8").decode("utf-8")
tilte = ''
title_element = item.find("./title")
if title_element is not None:
# print("主标题:",title_element.text)
tilte += replace_sensitive_chars(title_element.text)
r_list = re.compile('<p><strong>(.*?)</strong></p>', re.S).findall(item_source_code)
if r_list and len(r_list[0]) < 100:
title += replace_sensitive_chars(r_list[0])
# print("副标题:", title)
else:
pass
# print("未找到副标题")
# print("链接:", enclosure_url)
# print("-" * 50)
lis.append({'title': tilte, 'url': enclosure_url})
return lis
else:
print(f"无法获取 RSS 源。状态码: {response.status_code}")
def activate_xunlei_window():
# 替换成迅雷窗口的标题
xunlei_title = "迅雷"
# 查找迅雷窗口
xunlei_handle = win32gui.FindWindow(None, xunlei_title)
if xunlei_handle != 0:
# 将窗口激活到最前面
win32gui.ShowWindow(xunlei_handle, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(xunlei_handle)
print(f"成功激活迅雷窗口:{xunlei_title}")
else:
print(f"未找到迅雷窗口:{xunlei_title}")
def download_with_xunlei(url, download_folder_path):
from pywinauto.application import Application
import win32gui
import win32con
# 启动迅雷应用程序
# app = Application(backend='uia').start(r"D:\app\Thunder\Program\Thunder.exe")
os.popen(r"D:\app\Thunder\Program\Thunder.exe")
activate_xunlei_window()
app = Application(backend='uia').connect(title="迅雷")
# # 等待迅雷主窗口出现
main_window = app.top_window()
main_window.wait('visible')
# 点击新建按钮
new_button = main_window.child_window(title="新建", control_type="Button")
new_button.click()
# 获取新建下载窗口
new_download_window = app.window(title="新建任务面板", control_type="Pane")
# print(new_download_window.print_control_identifiers())
# 输入下载链接
url_edit = new_download_window.child_window(found_index=0, control_type="Edit")
url_edit.set_text(url)
# 输入文件夹路径
folder_path_edit = new_download_window.child_window(found_index=1, control_type="Edit")
text = folder_path_edit.get_value().strip()
print('默认路径', text)
download_folder_path = download_folder_path.strip()
if text != download_folder_path:
folder_path_edit.set_text(download_folder_path)
print("修改路径为:", download_folder_path)
else:
print("无需修改路径", text)
# 点击下载按钮
download_button = new_download_window.child_window(title="立即下载", control_type="Button")
download_button.click()
# 等待下载完成
print("点击下载")
# app = Application(backend='uia').connect(title="迅雷")
for i in range(2):
try:
new_download_window.child_window(title="立即下载", control_type="Button").click()
break
except:
new_download_window.type_keys("{ENTER}")
pass
import requests
import json
def download_with_aria2_rpc(password, ip, url, download_path):
rpc_url = f"http://{ip}:6800/jsonrpc"
headers = {'Content-Type': 'application/json'}
# JSON-RPC payload to add new URI
payload = {
"jsonrpc": "2.0",
"method": "aria2.addUri",
"id": "1",
"params": [
f"token:{password}",
[url],
{"dir": download_path}
]
}
response = requests.post(rpc_url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
result = response.json()
if 'result' in result:
print("Download started successfully. GID:", result['result'])
else:
print("Failed to start download. Response:", result)
else:
print(f"Failed to connect to aria2 RPC server. Status code: {response.status_code}")
def get_bt_link(url):
import libtorrent as lt
import requests
import sys
# 下载种子文件
response = requests.get(url, stream=True)
torrent_file = response.content
# 加载种子文件到libtorrent
info = lt.torrent_info(lt.bdecode(torrent_file))
# 生成磁力链接
magnet_link = lt.make_magnet_uri(info)
return magnet_link
# Example usage:
if __name__ == "__main__":
# rss源地址
url = "https://www.comicat.org/rss-%5BGM-Team%5D%5B%E5%9B%BD%E6%BC%AB%5D%5B%E6%96%97%E7%BD%97%E5%A4%A7%E9%99%86%E2%85%A1+%E7%BB%9D%E4%B8%96%E5%94%90%E9%97%A8%5D%5BSoul+Land+%E2%85%A1%EF%BC%9AThe+Peerless+Tang+Clan%5D%5B2023%5D%5B56%5D%5BAVC%5D%5BGB%5D%5B1080P%5D.xml"
ret = fetch_rss_data(url) # 获取所有链接
# araia2
password = "123456" # aria2 RPC 密钥
ip = "192.168.31.113"
set_file_name = '斗罗大陆Ⅱ 绝世唐门' # 自动创建文件夹
download_path = "/jellyfin/media/anime/" + set_file_name # 这个安装了aria2的电脑的地址
for one_data in ret:
url = one_data.get('url')
title = one_data.get('title')
print("标题=%s" % title)
output = ''
for i in range(10):
try:
output = get_bt_link(url) # 获取磁力链接
break
except Exception as e:
print(e)
time.sleep(2)
url = output.split('【')[-1].split('】')[0]
print(url)
# 示例调用
download_with_aria2_rpc( password, ip, url, download_path)
aria2直接超链接下载
import requests
import xml.etree.ElementTree as ET
import re
import datetime
import time
import time
import os
def ssh_execute(hostname, port, username, password, command):
import paramiko
# 创建SSH客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接到远程服务器
client.connect(hostname, port, username, password)
# 获取一个传输通道
channel = client.get_transport().open_session()
# 执行命令
channel.exec_command(command)
output = ''
while True: # 持续读取输出
if channel.exit_status_ready(): # 如果命令已经执行完毕
break
# 读取部分输出
output += channel.recv(1024).decode()
# print(output, end='')
# return output
# 关闭连接
client.close()
return output
# 替换为实际的服务器信息和命令
# ssh_console(['python3 /home/yys/script/zcl.py http://v2.uploadbt.com/?r=down&hash=7e53dc22cc08eb7681723885e5548642edb5271e'])
def replace_sensitive_chars(folder_name):
sensitive_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
for char in sensitive_chars:
folder_name = folder_name.replace(char, '_')
return folder_name
def fetch_rss_data(url):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept-Language": "zh-CN,zh;q=0.9",
}
response = requests.get(url, headers=headers)
lis = []
if response.status_code == 200:
root = ET.fromstring(response.content.decode('utf-8'))
items = root.findall(".//item")
for item in items:
enclosure_url = item.find("enclosure").get("url")
item_source_code = ET.tostring(item, encoding="utf-8").decode("utf-8")
tilte = ''
title_element = item.find("./title")
if title_element is not None:
# print("主标题:",title_element.text)
tilte += replace_sensitive_chars(title_element.text)
r_list = re.compile('<p><strong>(.*?)</strong></p>', re.S).findall(item_source_code)
if r_list and len(r_list[0]) < 100:
title += replace_sensitive_chars(r_list[0])
# print("副标题:", title)
else:
pass
# print("未找到副标题")
# print("链接:", enclosure_url)
# print("-" * 50)
lis.append({'title': tilte, 'url': enclosure_url})
return lis
else:
print(f"无法获取 RSS 源。状态码: {response.status_code}")
def activate_xunlei_window():
# 替换成迅雷窗口的标题
xunlei_title = "迅雷"
# 查找迅雷窗口
xunlei_handle = win32gui.FindWindow(None, xunlei_title)
if xunlei_handle != 0:
# 将窗口激活到最前面
win32gui.ShowWindow(xunlei_handle, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(xunlei_handle)
print(f"成功激活迅雷窗口:{xunlei_title}")
else:
print(f"未找到迅雷窗口:{xunlei_title}")
def download_with_xunlei(url, download_folder_path):
from pywinauto.application import Application
import win32gui
import win32con
# 启动迅雷应用程序
# app = Application(backend='uia').start(r"D:\app\Thunder\Program\Thunder.exe")
os.popen(r"D:\app\Thunder\Program\Thunder.exe")
activate_xunlei_window()
app = Application(backend='uia').connect(title="迅雷")
# # 等待迅雷主窗口出现
main_window = app.top_window()
main_window.wait('visible')
# 点击新建按钮
new_button = main_window.child_window(title="新建", control_type="Button")
new_button.click()
# 获取新建下载窗口
new_download_window = app.window(title="新建任务面板", control_type="Pane")
# print(new_download_window.print_control_identifiers())
# 输入下载链接
url_edit = new_download_window.child_window(found_index=0, control_type="Edit")
url_edit.set_text(url)
# 输入文件夹路径
folder_path_edit = new_download_window.child_window(found_index=1, control_type="Edit")
text = folder_path_edit.get_value().strip()
print('默认路径', text)
download_folder_path = download_folder_path.strip()
if text != download_folder_path:
folder_path_edit.set_text(download_folder_path)
print("修改路径为:", download_folder_path)
else:
print("无需修改路径", text)
# 点击下载按钮
download_button = new_download_window.child_window(title="立即下载", control_type="Button")
download_button.click()
# 等待下载完成
print("点击下载")
# app = Application(backend='uia').connect(title="迅雷")
for i in range(2):
try:
new_download_window.child_window(title="立即下载", control_type="Button").click()
break
except:
new_download_window.type_keys("{ENTER}")
pass
import requests
import json
def download_with_aria2_rpc(password, ip, url, download_path):
rpc_url = f"http://{ip}:6800/jsonrpc"
headers = {'Content-Type': 'application/json'}
# JSON-RPC payload to add new URI
payload = {
"jsonrpc": "2.0",
"method": "aria2.addUri",
"id": "1",
"params": [
f"token:{password}",
[url],
{"dir": download_path}
]
}
response = requests.post(rpc_url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
result = response.json()
if 'result' in result:
print("Download started successfully. GID:", result['result'])
else:
print("Failed to start download. Response:", result)
else:
print(f"Failed to connect to aria2 RPC server. Status code: {response.status_code}")
def get_bt_link(url):
import libtorrent as lt
import requests
import sys
# 下载种子文件
response = requests.get(url, stream=True)
torrent_file = response.content
# 加载种子文件到libtorrent
info = lt.torrent_info(lt.bdecode(torrent_file))
# 生成磁力链接
magnet_link = lt.make_magnet_uri(info)
return magnet_link
# Example usage:
if __name__ == "__main__":
# magnet_url = 'magnet:?xt=urn:btih:f5f7eac306a68fba3d6146188845fddd823d9c00&dn=%5BGM-Team%5D%5B%E5%9B%BD%E6%BC%AB%5D%5B%E6%96%97%E7%A0%B4%E8%8B%8D%E7%A9%B9%20%E7%AC%AC5%E5%AD%A3%5D%5BFights%20Break%20Sphere%20%E2%85%A4%5D%5B2022%5D%5B103%5D%5BHEVC%5D%5BGB%5D%5B4K%5D&tr=http%3A%2F%2Fnyaa.tracker.wf%3A7777%2Fannounce&tr=udp%3A%2F%2Fopen.stealth.si%3A80%2Fannounce&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337%2Fannounce&tr=udp%3A%2F%2Fexodus.desync.com%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker.torrent.eu.org%3A451%2Fannounce'
magnet_url = ""
# araia2 信息
password = "123456" # aria2 RPC 密钥
ip = "192.168.31.63"
set_file_name = "斗破苍穹/第五季" # 自动创建文件夹
download_path = "/jellyfin/media/anime/" + set_file_name # 这个安装了aria2的电脑的地址
if not magnet_url:
# rss源地址
url = "https://www.comicat.org/rss-%5BGM-Team%5D%5B%E5%9B%BD%E6%BC%AB%5D%5B%E6%96%97%E7%A0%B4%E8%8B%8D%E7%A9%B9+%E7%AC%AC5%E5%AD%A3%5D%5BFights+Break+Sphere+%E2%85%A4%5D%5B2022%5D%5B%2A%5D%5BAVC%5D%5BGB%5D%5B1080P%5D.xml"
ret = fetch_rss_data(url) # 获取所有链接
# araia2
for one_data in ret:
url = one_data.get('url')
title = one_data.get('title')
print("标题=%s" % title)
output = ''
for i in range(10):
try:
output = get_bt_link(url) # 获取磁力链接
break
except Exception as e:
print(e)
time.sleep(2)
magnet_url = output.split('【')[-1].split('】')[0]
print(magnet_url)
download_with_aria2_rpc( password, ip, magnet_url, download_path)
# 示例调用
else:
print(magnet_url)
download_with_aria2_rpc( password, ip, magnet_url, download_path)
qbittorrent下载
pip install requests libtorrent python-qbittorrent
import requests
import xml.etree.ElementTree as ET
import re
import time
# --- 确保必要的库已安装 ---
# pip install requests libtorrent python-qbittorrent
try:
from qbittorrent import Client
except ImportError:
print("错误: 'python-qbittorrent' 库未找到。")
print("请使用以下命令安装: pip install python-qbittorrent")
exit()
try:
import libtorrent as lt
except ImportError:
print("错误: 'libtorrent' 库未找到。")
print("请参考其官方文档进行安装。例如,在 Ubuntu/Debian 上运行: sudo apt-get install python3-libtorrent")
exit()
def replace_sensitive_chars(folder_name):
"""替换在文件名中非法的字符"""
sensitive_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
for char in sensitive_chars:
folder_name = folder_name.replace(char, '_')
return folder_name
def fetch_rss_data(url):
"""
从指定的 RSS 源 URL 获取标题和种子链接。
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept-Language": "zh-CN,zh;q=0.9",
}
print(f"正在从 RSS 源获取数据: {url}")
try:
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status() # 如果状态码不是 200,则引发异常
except requests.exceptions.RequestException as e:
print(f"无法获取 RSS 源。错误: {e}")
return []
lis = []
try:
root = ET.fromstring(response.content.decode('utf-8'))
items = root.findall(".//item")
print(f"在 RSS 源中找到 {len(items)} 个项目。")
for item in items:
title_element = item.find("./title")
enclosure_element = item.find("enclosure")
if title_element is not None and enclosure_element is not None:
enclosure_url = enclosure_element.get("url")
title_text = replace_sensitive_chars(title_element.text)
lis.append({'title': title_text, 'url': enclosure_url})
return lis
except ET.ParseError as e:
print(f"解析 RSS XML 时出错: {e}")
return []
def get_bt_link(url):
"""
将种子文件的 URL 转换为磁力链接。
"""
try:
response = requests.get(url, stream=True, timeout=15)
response.raise_for_status()
torrent_file = response.content
info = lt.torrent_info(lt.bdecode(torrent_file))
magnet_link = lt.make_magnet_uri(info)
return magnet_link
except (requests.exceptions.RequestException, RuntimeError) as e:
print(f"获取或解析种子文件时出错: {url} - {e}")
return None
def download_with_qbittorrent(qb_url, qb_username, qb_password, magnet_url, download_path, category_name):
"""
使用 qBittorrent 的 Web API 添加下载任务。
"""
try:
qb = Client(qb_url)
print(f"正在登录 qBittorrent: {qb_url}")
qb.login(qb_username, qb_password)
print(f"正在添加任务到 qBittorrent...")
print(f" - 磁力链接: {magnet_url[:70]}...")
print(f" - 保存路径: {download_path}")
print(f" - 分类: {category_name}")
result = qb.download_from_link(magnet_url, savepath=download_path, category=category_name)
if result == 'Ok.':
print("成功将任务添加到 qBittorrent。")
else:
print(f"无法将任务添加到 qBittorrent。响应: {result}")
except Exception as e:
print(f"连接到 qBittorrent 或添加任务时出错: {e}")
if __name__ == "__main__":
# ========================== 用户配置区 ==========================
# --- 请根据您的实际情况修改以下变量 ---
# 1. 手动指定磁力链接 (如果此处有链接,则优先下载此链接,并忽略RSS)
# - 留空 ("") 以使用 RSS 订阅模式
# - 填入磁力链接以使用手动下载模式
MANUAL_MAGNET_URL = ""
# MANUAL_MAGNET_URL = "magnet:?xt=urn:btih:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # 示例
# 2. qBittorrent Web UI 信息
QB_URL = "http://192.168.31.207:8080/" # 您的 qBittorrent Web UI 地址
QB_USERNAME = "admin" # 您的 qBittorrent 用户名
QB_PASSWORD = "your_password" # 您的 qBittorrent 密码
# 3. RSS 源地址 (仅在 MANUAL_MAGNET_URL 为空时使用)
RSS_URL = "https://www.comicat.org/rss-%5BGM-Team%5D+%E9%BE%99%E6%97%8F+S02.xml"
# 4. Jellyfin 下载目录设置(如果报错,给读写权限)
JELLYFIN_BASE_PATH = "/mnt/nas/share/jellyfin/media/动漫/"
# 动漫节目的文件夹名称,例如 "龙族/第二季" 或 "斗破苍穹/第五季"
# 这将被附加到 JELLYFIN_BASE_PATH 后面
SET_FILE_NAME = "龙族/第二季"
# ========================== 脚本执行区 ==========================
# 检查 SET_FILE_NAME 是否已配置
if not SET_FILE_NAME:
print("错误: 'SET_FILE_NAME' 不能为空!请指定动漫节目的保存文件夹。")
exit()
# 构建完整的下载路径和分类
download_path = JELLYFIN_BASE_PATH + SET_FILE_NAME
category_name = SET_FILE_NAME.split('/')[0] # 使用根文件夹作为分类名
# --- 模式判断 ---
# 模式一:如果手动指定了磁力链接,则直接下载
if MANUAL_MAGNET_URL:
print("--- 检测到手动磁力链接,进入手动下载模式 ---")
download_with_qbittorrent(
qb_url=QB_URL,
qb_username=QB_USERNAME,
qb_password=QB_PASSWORD,
magnet_url=MANUAL_MAGNET_URL,
download_path=download_path,
category_name=category_name
)
# 模式二:如果磁力链接为空,则进入 RSS 订阅模式
else:
print("--- 未指定手动磁力链接,进入 RSS 订阅模式 ---")
# 步骤 1: 从 RSS 源获取下载列表
download_list = fetch_rss_data(RSS_URL)
if not download_list:
print("未能从 RSS 源获取任何项目,脚本退出。")
else:
print("\n--- 开始处理 RSS 下载任务 ---")
for item in download_list:
torrent_url = item.get('url')
title = item.get('title')
print(f"\n处理项目: {title}")
# 步骤 2: 将种子的 URL 转换为磁力链接
magnet_link = None
for i in range(3): # 最多重试3次
magnet_link = get_bt_link(torrent_url)
if magnet_link:
break
else:
print(f"获取磁力链接失败,将在2秒后重试... (第 {i+1}/3 次)")
time.sleep(2)
if not magnet_link:
print(f"无法获取磁力链接,跳过项目: {title}")
continue
print("成功获取磁力链接。")
# 步骤 3: 调用 qBittorrent 添加下载
download_with_qbittorrent(
qb_url=QB_URL,
qb_username=QB_USERNAME,
qb_password=QB_PASSWORD,
magnet_url=magnet_link,
download_path=download_path,
category_name=category_name
)
time.sleep(1) # 短暂等待,避免过于频繁地请求 API
print("\n--- 所有 RSS 任务处理完毕 ---")
评论区(0 条)
发表评论⏳ 加载编辑器…