zhaxnb
Published on 2024-10-15 / 15 Visits
0
1

磁力链接的思考

说明
  • 除特别标注外都是claude生成

什么是磁力链接

磁力链接是一种用于点对点文件共享的特殊 URI(统一资源标识符)格式。它是 BitTorrent 协议的一部分,用于识别和定位可以通过点对点网络下载的内容。以下是关于磁力链接的一些关键信息:

  1. 定义: 磁力链接是一个包含文件标识信息的文本字符串,而不是直接指向文件位置的链接。

结构: 一个典型的磁力链接看起来像这样:

magnet:?xt=urn:btih:HASH&dn=NAME&tr=TRACKER

  • magnet:? 是所有磁力链接的开始

  • xt=urn:btih:HASH 是文件的唯一标识符(通常是 SHA-1 哈希)

  • dn=NAME 是可选的,表示下载的文件名

  • tr=TRACKER 是可选的,指定 tracker 服务器的地址

  1. 工作原理:

    • 当你点击或输入磁力链接时,它会被发送到支持 BitTorrent 协议的客户端软件。

    • 客户端使用链接中的信息在 P2P 网络中搜索拥有该文件的用户(称为"peers")。

    • 一旦找到peers,客户端就开始从这些用户那里下载文件片段。

  2. 优势:

    • 不需要中央服务器存储 .torrent 文件。

    • 更容易分享,因为它只是一个文本字符串。

    • 可以包含多个 tracker,增加了找到文件的几率。

  3. 与 .torrent 文件的区别:

    • .torrent 文件包含更详细的元数据,如文件列表和大小。

    • 磁力链接更简洁,只包含必要的信息来开始搜索和下载。

  4. 安全性考虑:

    • 磁力链接本身不包含实际内容,但它们可能指向非法或有害的内容。

    • 使用磁力链接时应谨慎,确保遵守版权法和其他相关法规。

  5. 使用示例: 要使用磁力链接,通常只需在支持的客户端(如 uTorrent、qBittorrent 等)中粘贴链接即可开始下载过程。

技术应用: 在编程中,可以解析磁力链接来提取信息或验证其有效性。例如,使用 Python:

from urllib.parse import urlparse, parse_qs

def parse_magnet(magnet_link):
    parsed = urlparse(magnet_link)
    return parse_qs(parsed.query)

magnet = "magnet:?xt=urn:btih:HASH&dn=NAME&tr=TRACKER"
info = parse_magnet(magnet)
print(info)

这段代码会解析磁力链接并返回一个包含其各个组成部分的字典。

磁力链接在文件共享领域非常流行,但重要的是要认识到它们可能被用于分享受版权保护或其他非法内容。在使用磁力链接时,请确保您遵守所有相关的法律和道德准则。

从哪里获取磁力链接

P2P网站

怎么获取

import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin, urlparse

def get_links(url):
    try:
        response = requests.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 获取所有链接
        links = [urljoin(url, link.get('href')) for link in soup.find_all('a', href=True)]
        
        # 获取磁力链接
        magnet_links = re.findall(r'magnet:\?xt=urn:btih:[a-zA-Z0-9]*', response.text)
        
        return links, magnet_links
    except Exception as e:
        print(f"Error processing {url}: {str(e)}")
        return [], []

def crawl(start_url, max_depth=3):
    visited = set()
    magnets = set()
    to_visit = [(start_url, 0)]
    base_host = urlparse(start_url).netloc

    while to_visit:
        current_url, depth = to_visit.pop(0)
        
        if depth > max_depth:
            continue
        
        if current_url in visited:
            continue
        
        print(f"Crawling: {current_url}")
        visited.add(current_url)
        
        links, new_magnets = get_links(current_url)
        magnets.update(new_magnets)
        
        for link in links:
            if urlparse(link).netloc == base_host and link not in visited:
                to_visit.append((link, depth + 1))
    
    return visited, magnets

def save_to_file(filename, data):
    with open(filename, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(f"{item}\n")

if __name__ == "__main__":
    start_url = input("Enter the starting URL: ")
    visited_links, magnet_links = crawl(start_url)
    
    save_to_file('visited_links.txt', visited_links)
    save_to_file('magnet_links.txt', magnet_links)
    
    print(f"Crawling completed. Found {len(visited_links)} links and {len(magnet_links)} magnet links.")
import re
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time

# 设置请求头避免被封
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

# 保存磁力链接和普通链接的列表
magnet_links = set()
normal_links = set()

# 爬取函数
def crawl(url, visited):
    # 避免重复访问
    if url in visited:
        return
    visited.add(url)
    
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
    except requests.RequestException as e:
        print(f"Error accessing {url}: {e}")
        return

    # 提取磁力链接和普通链接
    page_magnet_links = set(re.findall(r'magnet:\?xt=urn:btih:[a-zA-Z0-9]+', response.text))
    global magnet_links
    magnet_links.update(page_magnet_links)

    for link in soup.find_all('a', href=True):
        href = link['href']
        # 将相对链接转为绝对链接
        full_url = urljoin(url, href)
        # 解析链接以获取host信息
        parsed_url = urlparse(full_url)
        # 如果是与输入网站相同域名的链接,则加入普通链接并递归爬取
        if parsed_url.netloc == urlparse(url).netloc:
            if full_url not in visited:
                normal_links.add(full_url)
                crawl(full_url, visited)

# 主函数
def main(start_url):
    visited = set()
    crawl(start_url, visited)
    
    # 保存结果
    with open('magnet_links.txt', 'w') as f:
        for link in magnet_links:
            f.write(link + '\n')
    
    with open('normal_links.txt', 'w') as f:
        for link in normal_links:
            f.write(link + '\n')
    
    print(f"磁力链接共采集到 {len(magnet_links)} 个,普通链接共采集到 {len(normal_links)} 个。")

if __name__ == "__main__":
    start_url = input("请输入起始网站URL:")
    main(start_url)
    print("采集完成!")
由chatgpt生成

验证可用性

import libtorrent as lt
import time

def check_magnet(magnet_link, timeout=30):
    ses = lt.session()
    params = lt.parse_magnet_uri(magnet_link)
    handle = ses.add_torrent(params)

    start_time = time.time()
    while not handle.has_metadata():
        time.sleep(1)
        if time.time() - start_time > timeout:
            return False  # 超时,种子可能不可用

    return True  # 成功获取元数据,种子可能可用

# 使用示例
magnet_link = "magnet:?xt=urn:btih:..."  # 替换为实际的磁力链接
if check_magnet(magnet_link):
    print("种子可用")
else:
    print("种子不可用或验证超时")

连接到 torrent 网络并获取种子的元数据。如果在指定的超时时间内成功获取元数据,可以认为种子是可用的。

import requests
from urllib.parse import urlparse

def check_tracker(tracker_url, timeout=5):
    try:
        response = requests.get(tracker_url, timeout=timeout)
        return response.status_code == 200
    except requests.RequestException:
        return False

def extract_trackers(magnet_link):
    parsed = urlparse(magnet_link)
    return [tracker for tracker in parsed.query.split('&') if tracker.startswith('tr=')]

# 使用示例
magnet_link = "magnet:?xt=urn:btih:...&tr=http://tracker1.com&tr=http://tracker2.com"
trackers = extract_trackers(magnet_link)

for tracker in trackers:
    if check_tracker(tracker):
        print(f"{tracker} 在线")
    else:
        print(f"{tracker} 离线或无响应")

尝试连接到磁力链接中列出的 trackers,检查它们是否在线和响应。

磁力链接下载

import libtorrent as lt
import time
import sys

def download_magnet(magnet_link, save_path="./"):
    ses = lt.session()
    params = lt.parse_magnet_uri(magnet_link)
    params.save_path = save_path
    handle = ses.add_torrent(params)

    print("Downloading metadata...")
    while not handle.has_metadata():
        time.sleep(1)
    print("Got metadata, starting torrent download...")

    while handle.status().state != lt.torrent_status.seeding:
        s = handle.status()
        state_str = ['queued', 'checking', 'downloading metadata',
                     'downloading', 'finished', 'seeding', 'allocating']
        print('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s' %
              (s.progress * 100, s.download_rate / 1000, s.upload_rate / 1000,
               s.num_peers, state_str[s.state]))
        time.sleep(5)

    print("Download complete")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python script.py <magnet_link>")
        sys.exit(1)
    
    magnet_link = sys.argv[1]
    download_magnet(magnet_link)
# 设置上传和下载速度限制(单位:字节/秒)
ses.set_upload_rate_limit(100000)  # 100 KB/s
ses.set_download_rate_limit(200000)  # 200 KB/s

# 设置最大连接数
ses.set_max_connections(50)

# 控制做种时间(例如,做种10分钟后停止)
seed_time = 600  # 10分钟
start_time = time.time()
while (time.time() - start_time) < seed_time:
    # 做种逻辑
    time.sleep(1)

其他P2P增强功能


Comment