代码执行效果:

 

仅供研究学习Python代码,不做其它用途。

代码思路:
以前借鉴其它同学的代码,发现下载小说时总是会出现各种问题。

主要解决两大问题:
一是避开网站的反爬虫机制,采用单线程下载。
二是适用于单章节分两页的小说站。


创新点:
一是重试机制采取多轮重试,从第二轮开始只重试下载失败的章节。
二是针对小说内容写在p标签和不在p标签中的分别进行了处理,代码稍加修改即可适用于大多数同类型的小说站。

 

import os
import re
import random
import time
import requests
from lxml import etree
from requests.adapters import HTTPAdapter
import chardet
 
# Pool of User-Agent strings; one is chosen at random per request so the
# traffic looks less uniform to the site's anti-bot checks.
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
]
# Shared HTTP session: sized connection pool, transport-level retries,
# and a generous redirect limit.
session = requests.Session()
adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100, max_retries=5)  # pool size 100
session.mount('http://', adapter)
session.mount('https://', adapter)
session.max_redirects = 300  # allow up to 300 redirects
def get_chaptercontent(chapter_url, temp_file, queue, semaphore, session):
    """Download one chapter (single attempt) and write it to *temp_file*.

    Handles chapters split across two pages: when the page's "next" link
    (``a#A3``) contains an underscore near the end of its URL, the second
    page is fetched and its paragraphs are appended to the first page's.

    Args:
        chapter_url: Absolute URL of the chapter's first page.
        temp_file:   Path of the UTF-8 text file to write ("title\\n\\nbody").
        queue:       Optional queue: receives *temp_file* on success, None on
                     failure. May be None (callers here pass None).
        semaphore:   Unused; kept for backward compatibility with callers.
        session:     requests.Session used for all HTTP requests.

    Returns:
        True on success, False on any failure (never raises to the caller).

    Note: relies on the module-level globals ``base_url`` (to build the
    second-page URL) and ``user_agents``.
    """

    def _fetch_html(url):
        # GET *url* and decode the body using the charset chardet detects
        # (falling back to utf-8); raises on HTTP errors.
        headers = {
            'User-Agent': random.choice(user_agents),
            'Accept-Language': 'en-US,en;q=0.9',
            'Referer': 'https://www.baiduxs.com'
        }
        response = session.get(url, headers=headers, timeout=60)
        response.raise_for_status()
        raw_data = response.content
        detected = chardet.detect(raw_data)
        encoding = detected['encoding'] if detected['encoding'] else 'utf-8'
        return raw_data.decode(encoding, errors='replace')

    def _extract_paragraphs(selector):
        # Chapter text is usually in <p> tags under div#content, but some
        # pages put bare text nodes directly in the div; try both.
        contents = selector.xpath('//div[@id="content"]/p/text()')
        if not contents:
            contents = selector.xpath('//div[@id="content"]/text()')
        return contents

    def _fail():
        # Report failure on the queue (when one was supplied) and bail out.
        if queue:
            queue.put(None)
        return False

    try:
        time.sleep(3)  # throttle requests to avoid the site's anti-crawler ban

        html = _fetch_html(chapter_url)
        if '<div id="content">' not in html:
            print(f"页面内容不完整: {chapter_url}")
            return _fail()

        selector = etree.HTML(html)
        chapter_title = selector.xpath('//h1/text()')
        contents = _extract_paragraphs(selector)
        if not chapter_title or not contents:
            print(f"未能找到章节内容: {chapter_url}")
            return _fail()

        # Two-page chapter detection: a continuation exists when the a#A3
        # link's last 12 characters contain "_". Guard the xpath result —
        # the original indexed [0] unconditionally and crashed (IndexError)
        # when the link was missing, losing an already-downloaded chapter.
        next_links = selector.xpath('//a[@id="A3"]/@href')
        if next_links and '_' in next_links[0][-12:]:
            next_page_url = f'{base_url}{next_links[0]}'
            html2 = _fetch_html(next_page_url)
            if '<div id="content">' not in html2:
                print(f"第2页页面内容不完整: {chapter_url}")
                return _fail()
            contents2 = _extract_paragraphs(etree.HTML(html2))
            if not contents2:
                print(f"未能找到本章节第2页内容: {chapter_url}")
                return _fail()
            # Merge page 2 into page 1's paragraph list.
            print("        发现本章节分两页,将合并到同一章节...")
            contents.extend(contents2)

        # Assemble and clean the chapter text.
        chapter_content = '\n'.join(c.strip() for c in contents if c.strip())
        # Strip inline <style> blocks (and stray closing tags).
        chapter_content = re.sub(r'(?is)(?:<style.*?>.*?</style>|</style>)', '', chapter_content).strip()
        # Turn <br> tags into newlines, then collapse doubled newlines.
        chapter_content = re.sub(r'<br />|<br>', '\n', chapter_content).strip()
        chapter_content = re.sub(r'\n\n', '\n', chapter_content).strip()
        # Remove &nbsp; entities entirely.
        chapter_content = re.sub(r'&nbsp;', '', chapter_content).strip()

        chapter_title = chapter_title[0]  # first <h1> is the chapter title
        print(f"\t正在下载:{chapter_title}")

        # Persist the chapter to its own temp file; the caller merges later.
        with open(temp_file, 'w', encoding='utf-8') as f:
            f.write(chapter_title + '\n\n')
            f.write(chapter_content)

        if queue:
            queue.put(temp_file)
        return True

    except requests.exceptions.RequestException as e:
        print(f"下载章节失败: {chapter_url} - 错误: {str(e)}")
        return _fail()
    except Exception as e:
        print(f"处理章节时发生未知错误: {str(e)}")
        return _fail()
 
 
def download_chapters(chapters_url):
    """Download every chapter of the novel listed at *chapters_url*.

    Fetches the chapter index page (with a few reload attempts when the
    page looks truncated), extracts book metadata and chapter links,
    downloads each chapter sequentially via get_chaptercontent(), retries
    failed chapters in up to 5 rounds, and merges all per-chapter temp
    files into one text file under ~/Downloads/mybooks.

    Args:
        chapters_url: URL of the novel's chapter-list page on the site.

    Note: relies on the module-level globals ``base_url``, ``session`` and
    ``user_agents``.
    """
    try:
        # --- Load the index page; retry when it appears truncated ---------
        max_wait_attempts = 3
        wait_attempt = 0
        response = None
        html = None
        while wait_attempt < max_wait_attempts:
            try:
                response = session.get(chapters_url, headers={'User-Agent': random.choice(user_agents)}, timeout=30)
                raw_data = response.content
                detected = chardet.detect(raw_data)
                encoding = detected['encoding'] if detected['encoding'] else 'utf-8'
                html = raw_data.decode(encoding, errors='replace')

                # div#list holds the chapter links; its absence means the
                # page did not load completely.
                if '<div id="list">' in html:
                    break

                wait_attempt += 1
                if wait_attempt < max_wait_attempts:
                    print(f"目录页内容未完整加载,等待2秒后重试... (尝试 {wait_attempt}/{max_wait_attempts})")
                    time.sleep(2)
                    response.close()
            except requests.exceptions.RequestException:
                wait_attempt += 1
                if wait_attempt < max_wait_attempts:
                    print(f"请求失败,等待2秒后重试... (尝试 {wait_attempt}/{max_wait_attempts})")
                    time.sleep(2)
                    if response is not None:
                        response.close()
                else:
                    raise

        response.close()  # return the connection to the pool

        if response.status_code != 200:
            print(f"未能获取URL: {response.status_code}")
            return

        if not html:
            print(f"未能获取URL: {response.status_code}")
            return
        print("章节网页原始内容已经成功获取: ------------")

        selector = etree.HTML(html)
        chapter_links = selector.xpath('//div[@id="list"]/dl/dd/a/@href')
        if not chapter_links:
            print("未找到章节链接。")
            return

        # The first 12 links are "latest updates" duplicates; skip them so
        # the list starts at chapter 1.
        if len(chapter_links) >= 13:
            chapter_links = chapter_links[12:]

        print(f"共找到 {len(chapter_links)} 个章节链接,准备依次下载")

        # --- Book metadata. Guard every xpath lookup: the original indexed
        # [0] unconditionally, so a missing <h1> or author <meta> raised
        # IndexError and the following "if not ..." guards were dead code.
        titles = selector.xpath('//h1/text()')
        if not titles or not titles[0]:
            print("无法获取书名。")
            return
        book_name = titles[0]

        # Author from <meta property="og:novel:author" content="..."/>.
        authors = selector.xpath('//meta[@property="og:novel:author"]/@content')
        book_author = authors[0] if authors and authors[0] else "未知"

        book_intro_list = selector.xpath('//div[@id="intro"]/p/text()')
        book_intro = ''.join(part for part in book_intro_list if part.strip())
        if not book_intro:
            book_intro = "无简介"

        print(f'\n正在下载小说:{book_name},请稍等……\n')
        print(f"作者: {book_author}\n书名: {book_name}\n简介: {book_intro}")

        # Save directory: expanduser('~') is cross-platform (the original
        # read USERPROFILE, which only exists on Windows).
        save_directory = os.path.join(os.path.expanduser('~'), 'Downloads', 'mybooks')
        os.makedirs(save_directory, exist_ok=True)

        # Every chapter starts out "failed"; each round retries only the
        # chapters that are still failing.
        failed_chapters = [{'index': i, 'href': href, 'retry_count': 0}
                           for i, href in enumerate(chapter_links)]

        # Round 1 downloads everything; rounds 2..5 retry only failures.
        max_download_times = 5
        retry_round = 0
        temp_files = []

        while retry_round < max_download_times and failed_chapters:
            retry_round += 1
            print(f"\n=== 开始第 {retry_round} 轮下载 ===")

            current_failed = []
            for chapter in failed_chapters:
                index = chapter['index']
                chapter_url = f'{base_url}{chapter["href"]}'
                temp_file = os.path.join(save_directory, f'temp_{index:04d}.txt')

                print(f"\n开始下载第 {index + 1}/{len(chapter_links)} 章: {chapter_url}")
                try:
                    success = get_chaptercontent(chapter_url, temp_file, None, None, session)
                    if success and os.path.exists(temp_file) and os.path.getsize(temp_file) > 0:
                        temp_files.append(temp_file)
                        print(f"    第 {index + 1} 章下载成功")
                    else:
                        print(f"    第 {index + 1} 章下载失败")
                        chapter['retry_count'] += 1
                        current_failed.append(chapter)
                except Exception as e:
                    print(f"下载第 {index + 1} 章时发生错误: {str(e)}")
                    chapter['retry_count'] += 1
                    current_failed.append(chapter)

            failed_chapters = current_failed
            if failed_chapters and retry_round < max_download_times:
                print(f"\n本轮下载完成,{len(failed_chapters)} 章下载失败,将在下一轮重试...")
                time.sleep(5)  # cool down before the next round

        # Chapters recovered in a retry round were appended out of order;
        # sort by filename (zero-padded index) so the merged book reads in
        # chapter sequence — the original merged in download order.
        temp_files.sort()

        append_temp_files_to_main(temp_files, save_directory, book_name, book_author, book_intro)
        print(f"\n小说《{book_name}》下载完成,共下载 {len(temp_files)}/{len(chapter_links)} 章")
        if failed_chapters:
            print(f"以下章节下载失败: {[chap['index'] + 1 for chap in failed_chapters]}")

    except requests.exceptions.RequestException as e:
        print(f"获取章节列表时发生错误: {str(e)}")
        return
    except Exception as e:
        print(f"处理章节时发生未知错误: {str(e)}")
        return
 
 
def append_temp_files_to_main(temp_files, save_directory, book_name, book_author, book_intro):
    """Merge per-chapter temp files into the final book file, then delete them.

    Creates ``<book_name>-<book_author>.txt`` inside *save_directory*, writes
    a header (title, author, intro) followed by each chapter's stripped text,
    and removes every temp file after its content has been copied.
    """
    target_path = os.path.join(save_directory, f'{book_name}-{book_author}.txt')
    with open(target_path, 'w', encoding='utf-8') as book:
        book.write(f'书名:《{book_name}》\n作者:{book_author}\n简介:{book_intro}\n\n')
        for chapter_file in temp_files:
            with open(chapter_file, 'r', encoding='utf-8') as src:
                body = src.read().strip()
                if body:  # skip chapters whose temp file turned out empty
                    book.write(body + '\n\n')
            os.remove(chapter_file)  # temp file no longer needed
 
 
if __name__ == "__main__":
    # Site root; chapter hrefs from the index page are appended to this.
    base_url = 'https://www.baiduxs.com'
    prompt = f"请输入网站({base_url})内选定小说章节列表页所在页网址:"

    url = input(prompt)
    if url == '':
        # No input: fall back to a demo novel so the script still runs.
        url = 'https://www.baiduxs.com/read/84347/'
        print(f'你没有输入网址,默认以{url}《玩转大唐》为例进行下载……')

    # Keep asking until the URL belongs to the supported site.
    while not url.startswith(base_url):
        print(f"    输入错误! 网址必须以{base_url}开头")
        url = input(prompt)

    started = time.time()
    download_chapters(url)
    finished = time.time()
    print(f'\n总耗时:{finished - started:.2f}秒。\n')
    input("下载完成,小说保存在{用户}/Downloads/mybooks文件夹内,回车退出!")

 

本人也是Python的初学者,代码有不足之处,敬请各位大神批评指正

评论 (0)
嘿,我来帮您