A Complete Guide to Extracting Page Links with Python

1. Basic Implementation

1.1 Using requests + BeautifulSoup

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse

def extract_links(url):
    try:
        # Send the HTTP request
        headers = {'User-Agent': 'Mozilla/5.0'}
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        
        # Parse the HTML content
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # Extract and normalize all links
        base_url = response.url
        links = set()
        for tag in soup.find_all(['a', 'link', 'img', 'script', 'iframe']):
            attr = 'href' if tag.name in ['a', 'link'] else 'src'
            if attr in tag.attrs:
                link = urljoin(base_url, tag[attr])
                if is_valid_url(link):
                    links.add(link)
        
        return sorted(links)
    
    except Exception as e:
        print(f"Error extracting links: {e}")
        return []

def is_valid_url(url):
    """验证URL是否合法"""
    parsed = urlparse(url)
    return bool(parsed.scheme) and bool(parsed.netloc)
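
A minimal usage sketch (https://example.com is just a placeholder target):

if __name__ == '__main__':
    # Hypothetical target URL; replace with the page you want to scan
    for link in extract_links('https://example.com'):
        print(link)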

1.2 Enhanced Version (Recursive, Multi-Threaded)

def enhanced_extract_links(url, max_depth=1, current_depth=0, visited=None):
    if visited is None:
        visited = set()
    
    if current_depth > max_depth or url in visited:
        return []
    
    visited.add(url)
    links = extract_links(url)
    
    # Process child links with a thread pool (illustrative; the shared 'visited' set is not lock-protected)
    if current_depth < max_depth:
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = []
            for link in links:
                futures.append(executor.submit(
                    enhanced_extract_links, 
                    link, max_depth, current_depth+1, visited
                ))
            
            for future in futures:
                links.extend(future.result())
    
    return list(set(links))

2. Advanced Implementations

2.1 Using the Scrapy Framework

import json
import scrapy
from scrapy.crawler import CrawlerProcess
from urllib.parse import urlparse

class LinkExtractorSpider(scrapy.Spider):
    name = 'link_extractor'
    custom_settings = {
        'DEPTH_LIMIT': 2,
        'CONCURRENT_REQUESTS': 10,
        'ROBOTSTXT_OBEY': True
    }
    
    def __init__(self, start_url=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_urls = [start_url] if start_url else []
        self.links = set()
    
    def parse(self, response):
        # Extract all link-like URLs from the current page
        for href in response.css('a::attr(href), link::attr(href), img::attr(src), script::attr(src)'):
            link = response.urljoin(href.get())
            if self.is_valid_url(link):
                self.links.add(link)
                yield {'url': link}
                
                # Follow the link (bounded by the depth limit)
                if response.meta.get('depth', 0) < self.custom_settings['DEPTH_LIMIT']:
                    yield response.follow(link, self.parse)
    
    def is_valid_url(self, url):
        parsed = urlparse(url)
        return bool(parsed.scheme) and bool(parsed.netloc)

def scrapy_extract_links(url, output='links.json'):
    process = CrawlerProcess(settings={
        'LOG_LEVEL': 'ERROR',
        'FEEDS': {output: {'format': 'json', 'overwrite': True}},
    })
    
    # crawl() expects the spider class; Scrapy instantiates it with the given kwargs
    process.crawl(LinkExtractorSpider, start_url=url)
    process.start()  # blocks until the crawl finishes (a CrawlerProcess can only be started once per process)
    
    # Read the exported feed back in and return the URLs
    with open(output) as f:
        return [item['url'] for item in json.load(f)]

2.2 Using Playwright (for Dynamically Loaded Pages)

from playwright.sync_api import sync_playwright
from urllib.parse import urlparse

def dynamic_extract_links(url):
    links = set()
    
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        
        try:
            page.goto(url, timeout=15000)
            
            # Wait for the page to finish loading (network idle)
            page.wait_for_load_state('networkidle')
            
            # Collect all link-bearing elements
            elements = page.query_selector_all('a, link, img, script, iframe')
            
            for element in elements:
                attr = 'href' if element.get_attribute('href') else 'src'
                link = element.get_attribute(attr)
                if link:
                    full_url = page.evaluate('(url) => new URL(url, document.baseURI).href', link)
                    if is_valid_url(full_url):
                        links.add(full_url)
                        
        except Exception as e:
            print(f"Error extracting dynamic links: {e}")
        finally:
            browser.close()
    
    return sorted(links)

3. Extension Modules

3.1 Link Filtering and Categorization

from urllib.parse import urlparse

def categorize_links(links, base_url):
    categorized = {
        'internal': [],
        'external': [],
        'images': [],
        'scripts': [],
        'documents': [],
        'other': []
    }
    
    doc_extensions = ['.pdf', '.doc', '.docx', '.xls', '.ppt']
    
    for link in links:
        parsed = urlparse(link)
        path = parsed.path.lower()
        
        if any(path.endswith(ext) for ext in doc_extensions):
            categorized['documents'].append(link)
        elif path.endswith(('.jpg', '.png', '.gif', '.webp')):
            categorized['images'].append(link)
        elif path.endswith('.js'):
            categorized['scripts'].append(link)
        elif parsed.netloc == urlparse(base_url).netloc:
            categorized['internal'].append(link)
        else:
            categorized['external'].append(link)
    
    return categorized
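
A quick usage sketch combining this with extract_links from section 1.1 (the URL is a placeholder):

links = extract_links('https://example.com')
categories = categorize_links(links, base_url='https://example.com')
print(len(categories['internal']), 'internal,', len(categories['external']), 'external')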

3.2 Link Quality Analysis

import requests
from datetime import datetime

def analyze_links(links):
    results = []
    
    for link in links[:50]:  # Cap the number of links checked
        try:
            start = datetime.now()
            response = requests.head(link, allow_redirects=True, timeout=5)
            latency = (datetime.now() - start).total_seconds() * 1000
            
            result = {
                'url': link,
                'status': response.status_code,
                'latency_ms': round(latency, 2),
                'size_kb': int(response.headers.get('content-length', 0)) / 1024,
                'type': response.headers.get('content-type', 'unknown')
            }
            
        except Exception as e:
            result = {
                'url': link,
                'error': str(e),
                'status': 'failed'
            }
        
        results.append(result)
    
    return results

4. A Complete Command-Line Tool

4.1 Parameterized Script

import argparse
import json

def main():
    parser = argparse.ArgumentParser(description='Web page link extraction tool')
    parser.add_argument('url', help='Target page URL')
    parser.add_argument('--depth', type=int, default=1, help='Crawl depth')
    parser.add_argument('--dynamic', action='store_true', help='Use dynamic rendering (Playwright)')
    parser.add_argument('--output', help='Output file path')
    parser.add_argument('--analyze', action='store_true', help='Run link analysis on the results')
    
    args = parser.parse_args()
    
    if args.dynamic:
        print("使用动态渲染模式...")
        links = dynamic_extract_links(args.url)
    else:
        print("使用静态解析模式...")
        links = enhanced_extract_links(args.url, max_depth=args.depth)
    
    if args.analyze:
        print("执行链接分析...")
        results = analyze_links(links)
    else:
        results = [{'url': url} for url in links]
    
    if args.output:
        with open(args.output, 'w') as f:
            json.dump(results, f, indent=2)
        print(f"结果已保存到 {args.output}")
    else:
        print(json.dumps(results, indent=2))

if __name__ == '__main__':
    main()

4.2 Usage Examples

# Static parsing (default)
python link_extractor.py https://example.com --depth 2

# Dynamic rendering mode
python link_extractor.py https://example.com --dynamic

# Save results and run link analysis
python link_extractor.py https://example.com --output links.json --analyze

5. Performance Optimization Tips

5.1 Async I/O Implementation

import aiohttp
import asyncio
from bs4 import BeautifulSoup
from urllib.parse import urljoin

async def async_extract_links(url):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')
                
                links = set()
                for tag in soup.find_all(['a', 'link', 'img', 'script', 'iframe']):
                    attr = 'href' if tag.name in ['a', 'link'] else 'src'
                    if attr in tag.attrs:
                        link = urljoin(url, tag[attr])
                        if is_valid_url(link):
                            links.add(link)
                
                return sorted(links)
        
        except Exception as e:
            print(f"Error: {e}")
            return []

async def async_main(urls):
    tasks = [async_extract_links(url) for url in urls]
    return await asyncio.gather(*tasks)
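
A short invocation sketch, assuming a small list of placeholder URLs:

if __name__ == '__main__':
    urls = ['https://example.com', 'https://example.org']  # placeholder targets
    results = asyncio.run(async_main(urls))
    for url, links in zip(urls, results):
        print(f"{url}: {len(links)} links")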

5.2 Caching

from functools import lru_cache

import requests
from bs4 import BeautifulSoup

@lru_cache(maxsize=100)
def get_page_content(url):
    try:
        response = requests.get(url, timeout=10)
        return response.text
    except requests.RequestException:
        return None

def cached_extract_links(url):
    html = get_page_content(url)
    if not html:
        return []
    
    soup = BeautifulSoup(html, 'html.parser')
    # ... link extraction logic (same as in extract_links above) ...

6. Exception Handling and Logging

6.1 Robustness Enhancements

import logging
import time

import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse

logging.basicConfig(level=logging.INFO)

def robust_extract_links(url):
    try:
        # Validate the URL format
        parsed = urlparse(url)
        if not all([parsed.scheme, parsed.netloc]):
            raise ValueError("Invalid URL format")
        
        # Set a connect/read timeout and retry up to 3 times
        for attempt in range(3):
            try:
                response = requests.get(url, timeout=(3, 10))
                response.raise_for_status()
                break
            except requests.exceptions.RequestException as e:
                if attempt == 2:
                    raise
                logging.warning(f"Attempt {attempt+1} failed: {e}")
                time.sleep(1)
        
        # Content-type check
        content_type = response.headers.get('content-type', '')
        if 'text/html' not in content_type:
            raise ValueError(f"Unsupported content type: {content_type}")
        
        # Parse the HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # ... link extraction logic (extract and return the links) ...
        
    except Exception as e:
        logging.error(f"Failed to extract links from {url}: {str(e)}")
        return []

7. Visualizing the Output

7.1 Console Table Output

from tabulate import tabulate

def print_link_table(links):
    table_data = []
    for i, link in enumerate(links[:20], 1):  # Limit the number of rows shown
        table_data.append([i, link])
    
    print(tabulate(table_data, headers=['No.', 'URL'], tablefmt='grid'))

7.2 Generating an Interactive HTML Report

def generate_html_report(links, filename='links_report.html'):
    html = """
    <html>
    <head>
        <title>Link Extraction Report</title>
        <style>
            table { width: 100%; border-collapse: collapse; }
            th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
            tr:hover { background-color: #f5f5f5; }
        </style>
    </head>
    <body>
        <h1>Extracted Links Report</h1>
        <p>Total links found: {count}</p>
        <table>
            <tr><th>No.</th><th>URL</th></tr>
            {rows}
        </table>
    </body>
    </html>
    """
    
    rows = ""
    for i, link in enumerate(links, 1):
        rows += f'<tr><td>{i}</td><td><a href="{link}" target="_blank">{link}</a></td></tr>'
    
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(html.format(count=len(links), rows=rows))
    
    print(f"HTML report generated: {filename}")

Best-practice recommendations:

  1. Use the BeautifulSoup approach for static pages and Playwright for dynamically rendered pages
  2. Use the Scrapy framework for large-scale crawling
  3. Add appropriate delays and randomness to avoid getting blocked (see the sketch after this list)
  4. Respect robots.txt and each site's terms of service
  5. Anonymize any sensitive data you collect
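
As a rough illustration of points 3 and 4, here is a minimal sketch of a "polite" fetch helper that checks robots.txt with the standard-library parser and sleeps a random interval before each request. The user-agent string and delay range are arbitrary example values:

import random
import time
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

import requests

USER_AGENT = 'LinkExtractorBot/0.1'  # example user agent, not a real registered bot

def polite_get(url, min_delay=1.0, max_delay=3.0):
    """Fetch a URL only if robots.txt allows it, after a randomized delay."""
    parsed = urlparse(url)
    rp = RobotFileParser()
    rp.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    try:
        rp.read()
        allowed = rp.can_fetch(USER_AGENT, url)
    except Exception:
        # robots.txt could not be fetched; the assumption here is to proceed,
        # switch this to False if you prefer a stricter policy
        allowed = True

    if not allowed:
        raise PermissionError(f"robots.txt disallows fetching {url}")

    time.sleep(random.uniform(min_delay, max_delay))  # randomized politeness delay
    return requests.get(url, headers={'User-Agent': USER_AGENT}, timeout=10)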

Possible extensions:

  1. Integrate OCR to recognize links embedded in images
  2. Add a machine-learning model to flag spam or malicious links
  3. Build a distributed crawler architecture
  4. Develop a browser-extension version