在处理JSON数据时,经常会遇到格式损坏的情况。下面我将介绍几种在Python中快速修复损坏JSON文件的方法,包括使用现有的json-repair库和自定义修复方案。
![图片[1]_Python中实现JSON修复:快速修复损坏的JSON文件_知途无界](https://zhituwujie.com/wp-content/uploads/2025/11/d2b5ca33bd20251125094824.png)
一、使用json-repair库(推荐)
1. 安装json-repair
pip install json-repair
2. 基本使用示例
import json
import os

import json_repair
from json_repair import repair_json
# Example 1: repair a simple piece of broken JSON.
broken_json = '{"name": "John", "age": 30, "city": "New York"'  # closing brace is missing
try:
    repaired = repair_json(broken_json)
    data = json.loads(repaired)
    print("修复成功:", data)
except Exception as error:
    print(f"修复失败: {error}")

# Example 2: a more heavily damaged document (the text between the triple
# quotes is the broken input itself, including its inline "//" remark).
complex_broken_json = '''
{
"users": [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob" // 缺少闭合引号和逗号
{"id": 3, "name": "Charlie"}
],
"count": 3
}
'''
repaired_complex = repair_json(complex_broken_json)
data = json.loads(repaired_complex)
print("复杂修复结果:", data)
3. 高级用法:处理文件
def repair_json_file(input_path, output_path=None):
    """Load a JSON file, repairing it with json-repair if it does not parse.

    Args:
        input_path: Path of the file to read (``str`` or ``os.PathLike``).
        output_path: Where to write the repaired text. Defaults to the input
            name with ``_repaired`` inserted before the extension
            (``data.json`` -> ``data_repaired.json``).

    Returns:
        The parsed data, or ``None`` if the file is missing or the repair
        fails. A file that already parses is returned as-is and nothing is
        written.
    """
    input_path = os.fsdecode(input_path)
    if output_path is None:
        # splitext only touches the final extension, so the default output can
        # never equal the input path (str.replace('.json', ...) left the path
        # unchanged for names without a lowercase '.json' and rewrote every
        # occurrence in the path otherwise).
        root, extension = os.path.splitext(input_path)
        output_path = f"{root}_repaired{extension}"
    try:
        # 'utf-8-sig' also accepts plain UTF-8 and drops a leading BOM, which
        # json.loads would otherwise reject.
        with open(input_path, 'r', encoding='utf-8-sig') as f:
            broken_content = f.read()
        # Try a direct parse first: a healthy file needs no repair.
        try:
            data = json.loads(broken_content)
        except json.JSONDecodeError:
            pass
        else:
            print("文件JSON格式正常,无需修复")
            return data
        # Fall back to json-repair and re-parse to confirm the result.
        repaired_content = repair_json(broken_content)
        data = json.loads(repaired_content)
        # Persist the repaired text.
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(repaired_content)
        print(f"JSON修复完成,已保存到: {output_path}")
        return data
    except FileNotFoundError:
        print(f"文件不存在: {input_path}")
        return None
    except Exception as e:
        # Best-effort helper: report the problem and signal failure with None.
        print(f"修复过程中发生错误: {e}")
        return None
# Usage example: repair a file and keep the parsed result.
repaired_data = repair_json_file(input_path='broken_data.json')
二、自定义JSON修复器
如果json-repair无法满足特定需求,可以创建自定义的修复器:
import json
import re
from typing import Any, Dict, List
class JSONRepair:
    """Best-effort repair of common JSON syntax damage, standard library only.

    ``repair`` returns input that already parses unchanged. Otherwise it runs
    the callables in ``common_fixes`` in order, and if the result still does
    not parse it falls back to extracting an embedded JSON value.

    Every fix skips the contents of string literals, so text such as
    ``"http://example.com"`` or ``"it's"`` is never altered.
    """

    # A bare identifier used as an object key, e.g. ``{name:`` or ``, age :``.
    _BARE_KEY = re.compile(r'([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)(\s*:)')
    # A comma followed (ignoring whitespace) by a closing brace or bracket.
    _TRAILING_COMMA = re.compile(r',(\s*[}\]])')

    def __init__(self):
        # Order matters: comments go first so they cannot hide other damage,
        # quotes are normalised before keys are inspected, and trailing commas
        # are removed last so a comma left dangling by truncation is caught
        # once the missing closers have been appended.
        self.common_fixes = [
            self._fix_comments,
            self._fix_single_quotes,
            self._fix_unquoted_keys,
            self._fix_missing_braces,
            self._fix_trailing_commas,
        ]

    def repair(self, json_string: str) -> str:
        """Return a JSON document recovered from ``json_string``.

        Args:
            json_string: Possibly damaged JSON text.

        Returns:
            The input unchanged if it already parses; otherwise the repaired
            text, an embedded JSON value extracted from it, or ``'{}'`` when
            nothing could be recovered.
        """
        original = json_string
        if self._is_valid(original):
            # Never rewrite a document that is already valid.
            return original
        fixed = original
        for fix_func in self.common_fixes:
            try:
                fixed = fix_func(fixed)
            except Exception as e:
                # One failing fix must not stop the remaining ones.
                print(f"修复函数 {fix_func.__name__} 出错: {e}")
                continue
        if self._is_valid(fixed):
            return fixed
        # Prefer the partially fixed text: it may contain a valid value even
        # when the original does not (e.g. JSON wrapped in surrounding prose).
        for candidate in (fixed, original):
            extracted = self._manual_repair(candidate)
            if extracted != '{}':
                return extracted
        return '{}'

    @staticmethod
    def _is_valid(text: str) -> bool:
        """Return True if ``text`` parses as JSON."""
        try:
            json.loads(text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return False
        return True

    @staticmethod
    def _split_segments(text: str) -> List[tuple]:
        """Split ``text`` into ``(quote, chunk)`` pairs.

        ``quote`` is the delimiter (``'"'`` or ``"'"``) for a string-literal
        chunk and ``''`` for text outside strings. Backslash escapes are
        honoured, and an unterminated string runs to the end of the text.
        Joining the chunks reproduces ``text`` exactly.
        """
        segments = []
        size = len(text)
        pos = plain_start = 0
        while pos < size:
            delimiter = text[pos]
            if delimiter not in '"\'':
                pos += 1
                continue
            if plain_start < pos:
                segments.append(('', text[plain_start:pos]))
            end = pos + 1
            while end < size:
                if text[end] == '\\':
                    end += 2  # skip the escaped character
                    continue
                end += 1
                if text[end - 1] == delimiter:
                    break
            end = min(end, size)
            segments.append((delimiter, text[pos:end]))
            pos = plain_start = end
        if plain_start < size:
            segments.append(('', text[plain_start:]))
        return segments

    def _sub_outside_strings(self, pattern, replacement: str, text: str) -> str:
        """Apply ``pattern.sub`` only to the parts of ``text`` outside strings."""
        return ''.join(
            chunk if quote else pattern.sub(replacement, chunk)
            for quote, chunk in self._split_segments(text)
        )

    def _fix_trailing_commas(self, text: str) -> str:
        """Remove commas that directly precede a closing ``}`` or ``]``."""
        return self._sub_outside_strings(self._TRAILING_COMMA, r'\1', text)

    def _fix_missing_quotes(self, text: str) -> str:
        """Quote bare object keys.

        Kept for backward compatibility; it is the same operation as
        ``_fix_unquoted_keys`` and is therefore not listed in
        ``common_fixes``.
        """
        return self._fix_unquoted_keys(text)

    def _fix_single_quotes(self, text: str) -> str:
        """Convert single-quoted strings to double-quoted JSON strings.

        Apostrophes inside double-quoted strings are left alone. Inside a
        converted string ``\\'`` becomes ``'`` and a bare ``"`` is escaped.
        """
        return ''.join(
            self._requote(chunk) if quote == "'" else chunk
            for quote, chunk in self._split_segments(text)
        )

    @staticmethod
    def _requote(chunk: str) -> str:
        """Rewrite one single-quoted literal (including its quotes)."""
        body = chunk[1:]
        out = ['"']
        pos = 0
        while pos < len(body):
            ch = body[pos]
            if ch == '\\' and pos + 1 < len(body):
                following = body[pos + 1]
                # \' is not a JSON escape; other escapes are kept verbatim.
                out.append("'" if following == "'" else ch + following)
                pos += 2
                continue
            if ch == "'":
                # Closing delimiter (always the final character of the chunk).
                out.append('"')
                break
            out.append('\\"' if ch == '"' else ch)
            pos += 1
        return ''.join(out)

    def _fix_missing_braces(self, text: str) -> str:
        """Balance ``{}`` and ``[]`` outside string literals.

        Unclosed openers are closed at the end of the text in nesting order.
        A closer that skips over still-open inner containers closes those
        first. A closer with no matching opener gets one prepended.
        """
        closer_for = {'{': '}', '[': ']'}
        opener_for = {'}': '{', ']': '['}
        open_stack = []
        prefix = ''
        pieces = []
        for quote, chunk in self._split_segments(text):
            if quote:
                pieces.append(chunk)
                continue
            for ch in chunk:
                if ch in closer_for:
                    open_stack.append(ch)
                elif ch in opener_for:
                    wanted = opener_for[ch]
                    if wanted in open_stack:
                        while open_stack[-1] != wanted:
                            pieces.append(closer_for[open_stack.pop()])
                        open_stack.pop()
                    else:
                        prefix = wanted + prefix
                pieces.append(ch)
        suffix = ''.join(closer_for[ch] for ch in reversed(open_stack))
        return prefix + ''.join(pieces) + suffix

    def _fix_unquoted_keys(self, text: str) -> str:
        """Wrap bare identifier keys (``{name: ...``) in double quotes."""
        return self._sub_outside_strings(self._BARE_KEY, r'\1"\2"\3', text)

    def _fix_comments(self, text: str) -> str:
        """Strip ``//`` line comments and ``/* */`` block comments.

        Comment markers inside string literals are preserved. A line comment
        is removed up to, but not including, its newline. An unterminated
        block comment is removed through the end of the text.
        """
        out = []
        size = len(text)
        pos = 0
        quote = None
        while pos < size:
            ch = text[pos]
            if quote is not None:
                out.append(ch)
                if ch == '\\' and pos + 1 < size:
                    out.append(text[pos + 1])
                    pos += 2
                    continue
                if ch == quote:
                    quote = None
                pos += 1
            elif ch in '"\'':
                quote = ch
                out.append(ch)
                pos += 1
            elif text.startswith('//', pos):
                line_end = text.find('\n', pos)
                pos = size if line_end == -1 else line_end
            elif text.startswith('/*', pos):
                block_end = text.find('*/', pos + 2)
                pos = size if block_end == -1 else block_end + 2
            else:
                out.append(ch)
                pos += 1
        return ''.join(out)

    def _manual_repair(self, text: str) -> str:
        """Fallback: extract an embedded JSON object or array from ``text``.

        Takes the span from the first ``{`` to the last ``}`` and from the
        first ``[`` to the last ``]``, tries them in order of start position,
        and returns the first that parses. Returns ``'{}'`` if none does.
        """
        spans = []
        for opener, closer in (('{', '}'), ('[', ']')):
            start = text.find(opener)
            end = text.rfind(closer)
            if start != -1 and end > start:
                spans.append((start, text[start:end + 1]))
        for _, candidate in sorted(spans):
            if self._is_valid(candidate):
                return candidate
        # Nothing recoverable: hand back an empty JSON object.
        return '{}'
# Usage example
repair_tool = JSONRepair()

# One sample per kind of damage the repairer targets.
test_cases = [
    # Missing closing brace
    '{"name": "John", "age": 30, "city": "New York"',
    # Trailing commas
    '{"items": [1, 2, 3,], "valid": true,}',
    # Single quotes
    "{'name': 'John', 'age': 30}",
    # Unquoted keys
    '{name: "John", age: 30}',
    # Embedded comments
    '''
{
"name": "John", // 用户名
"age": 30 /* 年龄 */
}
'''
]

for case_number, broken_json in enumerate(test_cases, start=1):
    print(f"\n测试用例 {case_number}:")
    preview = broken_json[:50]
    print(f"原始: {preview}...")
    try:
        repaired = repair_tool.repair(broken_json)
        data = json.loads(repaired)
        print(f"修复成功: {data}")
    except Exception as error:
        print(f"修复失败: {error}")
三、批量修复JSON文件
import os
import glob
from pathlib import Path
class BatchJSONRepairer:
    """Repair every JSON file under a directory tree.

    With ``repair_method='auto'`` the ``JSONRepair`` class is used; any other
    value selects the third-party ``json_repair`` module.
    """

    def __init__(self, repair_method='auto'):
        self.repair_method = repair_method
        if repair_method == 'auto':
            self.repairer = JSONRepair()
        else:
            import json_repair
            self.repairer = json_repair

    def _repair_text(self, content: str) -> str:
        """Run the configured repairer on ``content`` and return its output."""
        if self.repair_method == 'auto':
            return self.repairer.repair(content)
        return self.repairer.repair_json(content)

    def repair_directory(self, input_dir: str, output_dir: str = None,
                         extensions=('.json',), backup=True):
        """Repair all matching files below ``input_dir``.

        Args:
            input_dir: Root directory that is searched recursively.
            output_dir: Root of the mirrored output tree. Defaults to
                ``input_dir`` (without trailing separators) + ``'_repaired'``.
            extensions: File-name suffixes to process.
            backup: If true, copy each source file to ``<file>.backup`` next
                to the original before repairing it.

        Returns:
            A dict with ``total``, ``success`` and ``failed`` counts plus a
            ``details`` list holding one entry per file.
        """
        import shutil

        if output_dir is None:
            # Drop trailing separators first; otherwise 'data/' would give
            # 'data/_repaired', i.e. an output tree inside the input tree.
            trimmed = input_dir.rstrip(os.sep + (os.altsep or ''))
            output_dir = (trimmed or input_dir) + '_repaired'
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        # Collect regular files only: a directory whose name happens to end in
        # '.json' is also matched by the glob but cannot be opened as a file.
        suffixes = tuple(extensions)
        search_pattern = os.path.join(input_dir, '**', '*')
        json_files = sorted(
            path for path in glob.glob(search_pattern, recursive=True)
            if os.path.isfile(path) and path.endswith(suffixes)
        )

        results = {
            'total': len(json_files),
            'success': 0,
            'failed': 0,
            'details': []
        }
        for file_path in json_files:
            relative_path = os.path.relpath(file_path, input_dir)
            output_path = os.path.join(output_dir, relative_path)
            # Mirror the sub-directory layout in the output tree.
            Path(os.path.dirname(output_path)).mkdir(parents=True, exist_ok=True)
            try:
                if backup:
                    shutil.copy2(file_path, file_path + '.backup')
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                repaired_content = self._repair_text(content)
                # Check the result before writing; a parse error marks the
                # file as failed.
                json.loads(repaired_content)
                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(repaired_content)
                results['success'] += 1
                results['details'].append({
                    'file': relative_path,
                    'status': 'success',
                    'output': output_path
                })
                print(f"✓ 修复成功: {relative_path}")
            except Exception as e:
                # One bad file must not abort the batch; record it and go on.
                results['failed'] += 1
                results['details'].append({
                    'file': relative_path,
                    'status': 'failed',
                    'error': str(e)
                })
                print(f"✗ 修复失败: {relative_path} - {e}")
        return results
# Usage example
if __name__ == "__main__":
    # Repair a sample directory in one batch, backing up each source file.
    batch_repairer = BatchJSONRepairer()
    results = batch_repairer.repair_directory(
        './corrupted_json_files',
        './repaired_json_files',
        backup=True,
    )
    summary_lines = (
        "\n批量修复完成:",
        f"总计: {results['total']} 个文件",
        f"成功: {results['success']} 个",
        f"失败: {results['failed']} 个",
    )
    for summary_line in summary_lines:
        print(summary_line)
四、实用工具和技巧
1. 检测JSON损坏程度
def assess_json_damage(json_string: str) -> Dict[str, Any]:
    """Collect simple character statistics and a rough damage estimate.

    Args:
        json_string: The (possibly broken) JSON text to inspect.

    Returns:
        A dict of counts and presence flags, an ``issues`` list, and
        ``estimated_damage_level``: ``'minor'`` for no detected issue,
        ``'moderate'`` for one or two, ``'severe'`` for more.
    """
    assessment = {
        'length': len(json_string),
        'has_opening_brace': '{' in json_string,
        'has_closing_brace': '}' in json_string,
        'has_opening_bracket': '[' in json_string,
        'has_closing_bracket': ']' in json_string,
        'quote_count': json_string.count('"'),
        'single_quote_count': json_string.count("'"),
        'comma_count': json_string.count(','),
        'colon_count': json_string.count(':'),
        'estimated_damage_level': 'unknown'
    }

    # Heuristic checks; each adds one named issue.
    issues = []
    if not assessment['has_opening_brace'] and not assessment['has_opening_bracket']:
        issues.append('missing_main_structure')
    if assessment['single_quote_count'] > assessment['quote_count']:
        issues.append('excessive_single_quotes')
    # Allow whitespace between the comma and the closer so that ", }" and
    # ",\n]" are detected as well as ",}" and ",]".
    if re.search(r',\s*[}\]]', json_string):
        issues.append('trailing_commas')

    if not issues:
        assessment['estimated_damage_level'] = 'minor'
    elif len(issues) <= 2:
        assessment['estimated_damage_level'] = 'moderate'
    else:
        assessment['estimated_damage_level'] = 'severe'
    assessment['issues'] = issues
    return assessment
2. 智能修复策略选择器
def smart_repair_strategy(json_string: str) -> str:
    """Map the assessed damage level of ``json_string`` to a strategy name.

    Returns ``'quick_fix'`` for minor damage, ``'standard_repair'`` for
    moderate damage and ``'aggressive_repair'`` for anything else.
    """
    strategy_by_level = {
        'minor': 'quick_fix',
        'moderate': 'standard_repair',
    }
    damage_level = assess_json_damage(json_string)['estimated_damage_level']
    return strategy_by_level.get(damage_level, 'aggressive_repair')
总结
- 推荐使用json-repair库:对于大多数情况,json-repair库是最简单有效的解决方案
- 自定义修复器:当需要特定业务逻辑或标准库无法满足需求时使用
- 批量处理:对于大量损坏的JSON文件,使用批量修复工具提高效率
- 备份重要数据:在修复前务必备份原始文件,防止数据丢失
- 验证修复结果:修复后一定要验证JSON的有效性,确保数据结构正确
这些方法可以帮助你快速有效地修复大多数常见的JSON格式问题。
© 版权声明
文中内容均来源于公开资料,受限于信息的时效性和复杂性,可能存在误差或遗漏。我们已尽力确保内容的准确性,但对于因信息变更或错误导致的任何后果,本站不承担任何责任。如需引用本文内容,请注明出处并尊重原作者的版权。
THE END
























暂无评论内容