Python中实现JSON修复:快速修复损坏的JSON文件

在处理JSON数据时,经常会遇到格式损坏的情况。下面我将介绍几种在Python中快速修复损坏JSON文件的方法,包括使用现有的json-repair库和自定义修复方案。

图片[1]_Python中实现JSON修复:快速修复损坏的JSON文件_知途无界

一、使用json-repair库(推荐)

1. 安装json-repair

pip install json-repair

2. 基本使用示例

# json.loads is used below to parse the repaired text, so the standard-library
# json module must be imported alongside json_repair.
import json

import json_repair
from json_repair import repair_json

# Example 1: repair a simple damaged document.
broken_json = '{"name": "John", "age": 30, "city": "New York"'  # closing brace is missing
try:
    repaired = repair_json(broken_json)
    data = json.loads(repaired)
    print("修复成功:", data)
except Exception as e:
    print(f"修复失败: {e}")

# Example 2: a more heavily damaged document.
# The "//" note inside the literal is part of the test data. What is actually
# missing on that line is the object's closing brace and the following comma.
complex_broken_json = '''
{
    "users": [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"  // 缺少闭合引号和逗号
        {"id": 3, "name": "Charlie"}
    ],
    "count": 3
}
'''

repaired_complex = repair_json(complex_broken_json)
data = json.loads(repaired_complex)
print("复杂修复结果:", data)

3. 高级用法:处理文件

def repair_json_file(input_path, output_path=None):
    """
    Repair a JSON file and return the parsed data.

    The file is read as UTF-8. If it already parses, the parsed data is
    returned and nothing is written. Otherwise the text is passed through
    ``repair_json`` and the repaired text is written to ``output_path``.

    Args:
        input_path: Path of the file to read.
        output_path: Where to write the repaired text. Defaults to the input
            path with ``_repaired`` inserted before the final extension
            (``data.json`` -> ``data_repaired.json``).

    Returns:
        The parsed JSON data, or ``None`` if the file does not exist or any
        other error occurs (the error is printed, not raised).
    """
    # Function-scope imports keep this snippet runnable on its own.
    import json
    import os

    if output_path is None:
        # Only the final extension is rewritten. A plain str.replace would
        # touch every ".json" in the path and, for a path without ".json",
        # would leave output_path equal to input_path, overwriting the
        # original file.
        root, ext = os.path.splitext(input_path)
        output_path = f"{root}_repaired{ext or '.json'}"

    try:
        with open(input_path, 'r', encoding='utf-8') as f:
            broken_content = f.read()

        # Fast path: the file may not be damaged at all.
        try:
            data = json.loads(broken_content)
            print("文件JSON格式正常,无需修复")
            return data
        except json.JSONDecodeError:
            # Fall back to json-repair, then re-parse to confirm the result.
            repaired_content = repair_json(broken_content)
            data = json.loads(repaired_content)

            # Persist the repaired text next to (not over) the original.
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(repaired_content)

            print(f"JSON修复完成,已保存到: {output_path}")
            return data

    except FileNotFoundError:
        print(f"文件不存在: {input_path}")
        return None
    except Exception as e:
        # Top-level boundary of this helper: report and signal failure with None.
        print(f"修复过程中发生错误: {e}")
        return None

# Usage example: repair_json_file returns the parsed data, or None when the
# file is missing or the repair fails.
repaired_data = repair_json_file('broken_data.json')

二、自定义JSON修复器

如果json-repair无法满足特定需求,可以创建自定义的修复器:

import json
import re
from typing import Any, Dict, List

class JSONRepair:
    """Best-effort repair of common JSON syntax damage (standard library only).

    Handles ``//`` and ``/* */`` comments, single-quoted strings, unquoted
    keys, unclosed or unopened ``{}``/``[]`` containers and trailing commas.
    Every fixer leaves the contents of string literals untouched, so text
    such as ``"http://example.com"`` or ``"a,]"`` is never altered.
    """

    def __init__(self):
        # Order matters:
        #   1. comments go first so nothing is appended after a "//" comment;
        #   2. quotes are normalised so later passes only deal with '"';
        #   3. keys are quoted;
        #   4. containers are closed;
        #   5. trailing commas go last, because closing a truncated container
        #      can itself expose one ('{"a": 1,' -> '{"a": 1,}').
        self.common_fixes = [
            self._fix_comments,
            self._fix_single_quotes,
            self._fix_unquoted_keys,
            self._fix_missing_braces,
            self._fix_trailing_commas,
        ]

    def repair(self, json_string: str) -> str:
        """Return a valid JSON string derived from ``json_string``.

        Input that is already valid is returned unchanged. Otherwise each
        fixer in ``self.common_fixes`` is applied in order. If the result
        still does not parse, ``_manual_repair`` is tried on the original
        text, which ultimately falls back to ``'{}'``.

        NOTE: a return value of ``'{}'`` can therefore mean "nothing could be
        recovered"; callers that must not lose data should check for it.
        """
        if self._is_valid(json_string):
            return json_string

        fixed = json_string
        for fix_func in self.common_fixes:
            try:
                fixed = fix_func(fixed)
            except Exception as e:
                # A failing fixer must not abort the remaining ones.
                print(f"修复函数 {fix_func.__name__} 出错: {e}")
                continue

        if self._is_valid(fixed):
            return fixed
        # Automatic fixes were not enough; try to salvage the original text.
        return self._manual_repair(json_string)

    @staticmethod
    def _is_valid(text: str) -> bool:
        """Return True if ``text`` parses as JSON."""
        try:
            json.loads(text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return False
        return True

    @staticmethod
    def _split_strings(text: str):
        """Split ``text`` into ``(is_string, chunk)`` pieces.

        String chunks are double-quoted literals including their quotes
        (backslash escapes are honoured; an unterminated literal runs to the
        end of the text). Joining all chunks reproduces ``text`` exactly.
        """
        pieces = []
        pos, size = 0, len(text)
        while pos < size:
            quote_at = text.find('"', pos)
            if quote_at == -1:
                pieces.append((False, text[pos:]))
                break
            if quote_at > pos:
                pieces.append((False, text[pos:quote_at]))
            end = quote_at + 1
            while end < size and text[end] != '"':
                end += 2 if text[end] == '\\' else 1
            end = min(end + 1, size)  # include the closing quote when present
            pieces.append((True, text[quote_at:end]))
            pos = end
        return pieces

    def _sub_outside_strings(self, pattern: str, repl: str, text: str) -> str:
        """Apply ``re.sub(pattern, repl, ...)`` to everything except string literals."""
        return ''.join(
            chunk if is_string else re.sub(pattern, repl, chunk)
            for is_string, chunk in self._split_strings(text)
        )

    def _fix_trailing_commas(self, text: str) -> str:
        """Remove a comma that directly precedes ``}`` or ``]`` (outside strings)."""
        return self._sub_outside_strings(r',\s*([}\]])', r'\1', text)

    def _fix_missing_quotes(self, text: str) -> str:
        """Quote bare object keys.

        Kept for backward compatibility: it always did the same job as
        ``_fix_unquoted_keys`` and now simply delegates to it.
        """
        return self._fix_unquoted_keys(text)

    def _fix_single_quotes(self, text: str) -> str:
        """Convert single-quoted strings to double-quoted strings.

        Apostrophes inside double-quoted strings are left alone. Inside a
        converted string a bare ``"`` is escaped. A ``\\'`` escape (not legal
        in JSON) is reduced to a plain apostrophe in either kind of string.
        """
        result = []
        quote = None      # delimiter of the string currently open, if any
        escaped = False   # True when the previous character was a backslash

        for char in text:
            if escaped:
                if char == "'":
                    result[-1] = "'"  # drop the backslash appended last turn
                else:
                    result.append(char)
                escaped = False
            elif char == '\\' and quote is not None:
                result.append(char)
                escaped = True
            elif quote is None:
                if char in '"\'':
                    quote = char
                    result.append('"')
                else:
                    result.append(char)
            elif char == quote:
                quote = None
                result.append('"')
            elif char == '"':
                # A double quote inside a single-quoted string must be escaped
                # once the delimiters become double quotes.
                result.append('\\"')
            else:
                result.append(char)

        return ''.join(result)

    def _fix_missing_braces(self, text: str) -> str:
        """Balance ``{}`` and ``[]`` containers, ignoring string contents.

        Containers still open at the end are closed in nesting order. A closer
        that arrives while inner containers are open closes those first. A
        closer with no matching opener gets its opener prepended.
        """
        closer_for = {'{': '}', '[': ']'}
        opener_for = {'}': '{', ']': '['}
        out = []
        pending = []   # closers owed for containers that are currently open
        orphans = []   # openers to prepend for closers that had no opener

        for is_string, chunk in self._split_strings(text):
            if is_string:
                out.append(chunk)
                continue
            for char in chunk:
                if char in closer_for:
                    pending.append(closer_for[char])
                elif char in opener_for:
                    if char in pending:
                        # Close inner containers left open before this closer.
                        while pending[-1] != char:
                            out.append(pending.pop())
                        pending.pop()
                    else:
                        orphans.append(opener_for[char])
                out.append(char)

        return ''.join(reversed(orphans)) + ''.join(out) + ''.join(reversed(pending))

    def _fix_unquoted_keys(self, text: str) -> str:
        """Wrap bare identifier keys (``{name: 1}``) in double quotes."""
        pattern = r'([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)(\s*:)'
        return self._sub_outside_strings(pattern, r'\1"\2"\3', text)

    def _fix_comments(self, text: str) -> str:
        """Strip ``//`` line comments and ``/* */`` block comments.

        Comment markers inside single- or double-quoted strings are kept, so
        URLs such as ``"http://example.com"`` survive. The newline ending a
        line comment is preserved. An unterminated block comment is removed
        to the end of the text.
        """
        out = []
        pos, size = 0, len(text)
        while pos < size:
            char = text[pos]
            if char in '"\'':
                # Copy the whole string literal verbatim.
                end = pos + 1
                while end < size and text[end] != char:
                    end += 2 if text[end] == '\\' else 1
                end = min(end + 1, size)
                out.append(text[pos:end])
                pos = end
            elif text.startswith('//', pos):
                newline = text.find('\n', pos)
                pos = size if newline == -1 else newline
            elif text.startswith('/*', pos):
                close = text.find('*/', pos + 2)
                pos = size if close == -1 else close + 2
            else:
                out.append(char)
                pos += 1
        return ''.join(out)

    def _manual_repair(self, text: str) -> str:
        """Last-resort salvage: extract the outermost ``{...}`` span if it parses.

        Returns the extracted object text, or ``'{}'`` when no parsable
        object can be found.
        """
        start = text.find('{')
        end = text.rfind('}')
        # rfind returns -1 when there is no '}', so require a closer that
        # actually follows the opener.
        if start != -1 and end > start:
            possible_json = text[start:end + 1]
            if self._is_valid(possible_json):
                return possible_json

        # Nothing recoverable: fall back to an empty JSON object.
        return '{}'

# Usage example
repair_tool = JSONRepair()

# A selection of damaged JSON documents, one per kind of damage.
test_cases = [
    # Missing closing brace
    '{"name": "John", "age": 30, "city": "New York"',
    # Trailing commas
    '{"items": [1, 2, 3,], "valid": true,}',
    # Single quotes instead of double quotes
    "{'name': 'John', 'age': 30}",
    # Unquoted keys
    '{name: "John", age: 30}',
    # Embedded comments (the comment text is part of the test data)
    '''
    {
        "name": "John", // 用户名
        "age": 30 /* 年龄 */
    }
    '''
]

# Repair each case and parse the result to confirm it is valid JSON.
for i, broken_json in enumerate(test_cases):
    print(f"\n测试用例 {i+1}:")
    # Only the first 50 characters are shown; "..." is appended unconditionally.
    print(f"原始: {broken_json[:50]}...")
    try:
        repaired = repair_tool.repair(broken_json)
        data = json.loads(repaired)
        print(f"修复成功: {data}")
    except Exception as e:
        print(f"修复失败: {e}")

三、批量修复JSON文件

import os
import glob
from pathlib import Path

class BatchJSONRepairer:
    """Repair every JSON file under a directory tree.

    ``repair_method='auto'`` uses the in-file ``JSONRepair`` class. Any other
    value uses the third-party ``json_repair`` package.
    """

    def __init__(self, repair_method='auto'):
        self.repair_method = repair_method
        if repair_method == 'auto':
            self.repairer = JSONRepair()
        else:
            import json_repair
            self.repairer = json_repair

    def _repair_text(self, content):
        """Return the repaired form of ``content`` using the configured backend."""
        if self.repair_method == 'auto':
            return self.repairer.repair(content)
        return self.repairer.repair_json(content)

    def repair_directory(self, input_dir: str, output_dir: str = None,
                        extensions=('.json',), backup=True):
        """
        Repair all matching files under ``input_dir`` (recursively).

        Args:
            input_dir: Root directory to scan.
            output_dir: Root directory for repaired files; the relative layout
                of ``input_dir`` is mirrored. Defaults to ``input_dir`` with
                ``_repaired`` appended (a trailing path separator is ignored).
            extensions: File-name suffixes to process. A single string is
                accepted as well as any iterable of strings.
            backup: When true, copy each source file to ``<file>.backup``
                next to the original before repairing it.

        Returns:
            A dict with ``total``, ``success`` and ``failed`` counts and a
            ``details`` list holding one entry per file.
        """
        import shutil

        if output_dir is None:
            # Without stripping, "dir/" + "_repaired" would become
            # "dir/_repaired", a folder inside the tree being scanned.
            trimmed = input_dir.rstrip(os.sep + (os.altsep or ''))
            output_dir = (trimmed or input_dir) + '_repaired'

        Path(output_dir).mkdir(parents=True, exist_ok=True)

        # A bare string would otherwise be iterated character by character.
        if isinstance(extensions, str):
            extensions = (extensions,)
        suffixes = tuple(extensions)

        # Collect matching regular files; a directory named "x.json" is skipped.
        search_pattern = os.path.join(input_dir, '**/*')
        json_files = [
            f for f in glob.glob(search_pattern, recursive=True)
            if os.path.isfile(f) and f.endswith(suffixes)
        ]

        results = {
            'total': len(json_files),
            'success': 0,
            'failed': 0,
            'details': []
        }

        for file_path in json_files:
            relative_path = os.path.relpath(file_path, input_dir)
            output_path = os.path.join(output_dir, relative_path)

            # Mirror the source sub-directory in the output tree.
            Path(os.path.dirname(output_path)).mkdir(parents=True, exist_ok=True)

            try:
                # Keep a copy of the original next to it.
                if backup:
                    shutil.copy2(file_path, file_path + '.backup')

                # utf-8-sig reads plain UTF-8 and also drops a leading BOM,
                # which json.loads would otherwise reject.
                with open(file_path, 'r', encoding='utf-8-sig') as f:
                    content = f.read()

                repaired_content = self._repair_text(content)

                # Validate before writing; an exception marks the file failed.
                # NOTE(review): JSONRepair.repair falls back to '{}' when it
                # cannot recover anything, so in 'auto' mode an unrecoverable
                # file still validates here and is reported as a success.
                json.loads(repaired_content)

                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(repaired_content)

                results['success'] += 1
                results['details'].append({
                    'file': relative_path,
                    'status': 'success',
                    'output': output_path
                })
                print(f"✓ 修复成功: {relative_path}")

            except Exception as e:
                # Per-file boundary: record the failure and carry on.
                results['failed'] += 1
                results['details'].append({
                    'file': relative_path,
                    'status': 'failed',
                    'error': str(e)
                })
                print(f"✗ 修复失败: {relative_path} - {e}")

        return results

# Usage example
if __name__ == "__main__":
    # Batch repair demo; the default constructor selects the 'auto' method.
    batch_repairer = BatchJSONRepairer()

    results = batch_repairer.repair_directory(
        input_dir='./corrupted_json_files',
        output_dir='./repaired_json_files',
        backup=True
    )

    # Summarise the counters returned by repair_directory.
    print(f"\n批量修复完成:")
    print(f"总计: {results['total']} 个文件")
    print(f"成功: {results['success']} 个")
    print(f"失败: {results['failed']} 个")

四、实用工具和技巧

1. 检测JSON损坏程度

def assess_json_damage(json_string: str) -> Dict[str, Any]:
    """Estimate how badly a JSON string is damaged.

    This is a purely textual heuristic: characters are counted without
    parsing, so braces, quotes and commas inside string values are counted
    as well.

    Returns:
        A dict of raw counters and flags, plus:
        ``issues``: a list drawn from ``'missing_main_structure'``,
            ``'excessive_single_quotes'`` and ``'trailing_commas'``.
        ``estimated_damage_level``: ``'minor'`` (no issues), ``'moderate'``
            (one or two issues) or ``'severe'`` (more).
    """
    assessment = {
        'length': len(json_string),
        'has_opening_brace': '{' in json_string,
        'has_closing_brace': '}' in json_string,
        'has_opening_bracket': '[' in json_string,
        'has_closing_bracket': ']' in json_string,
        'quote_count': json_string.count('"'),
        'single_quote_count': json_string.count("'"),
        'comma_count': json_string.count(','),
        'colon_count': json_string.count(':'),
        'estimated_damage_level': 'unknown'
    }

    issues = []
    # Neither an object nor an array opener: no recognisable top-level structure.
    if not assessment['has_opening_brace'] and not assessment['has_opening_bracket']:
        issues.append('missing_main_structure')
    if assessment['single_quote_count'] > assessment['quote_count']:
        issues.append('excessive_single_quotes')
    # Allow whitespace between the comma and the closer so that ", }" and a
    # comma followed by a newline are detected too.
    if re.search(r',\s*[}\]]', json_string):
        issues.append('trailing_commas')

    if not issues:
        assessment['estimated_damage_level'] = 'minor'
    elif len(issues) <= 2:
        assessment['estimated_damage_level'] = 'moderate'
    else:
        assessment['estimated_damage_level'] = 'severe'

    assessment['issues'] = issues
    return assessment

2. 智能修复策略选择器

def smart_repair_strategy(json_string: str) -> str:
    """Map the assessed damage level of ``json_string`` to a strategy name.

    Returns ``'quick_fix'`` for minor damage, ``'standard_repair'`` for
    moderate damage and ``'aggressive_repair'`` for anything else.
    """
    strategy_by_level = {
        'minor': 'quick_fix',
        'moderate': 'standard_repair',
    }
    level = assess_json_damage(json_string)['estimated_damage_level']
    # Any level not listed above is treated as the worst case.
    return strategy_by_level.get(level, 'aggressive_repair')

总结

  1. 推荐使用json-repair库:对于大多数情况,json-repair库是最简单有效的解决方案
  2. 自定义修复器:当需要特定业务逻辑,或json-repair库无法满足需求时使用
  3. 批量处理:对于大量损坏的JSON文件,使用批量修复工具提高效率
  4. 备份重要数据:在修复前务必备份原始文件,防止数据丢失
  5. 验证修复结果:修复后一定要验证JSON的有效性,确保数据结构正确

这些方法可以帮助你快速有效地修复大多数常见的JSON格式问题。

© 版权声明
THE END
喜欢就点个赞,支持一下吧!
点赞50 分享
评论 抢沙发
头像
欢迎您留下评论!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容