Python文件批量处理与数据转换实战指南

引言

在日常开发中,我们经常需要对大量文件进行批量处理,或者将数据在不同格式之间转换。本文分享几个生产环境中实用的Python脚本,帮助你提升工作效率。

一、批量重命名文件

项目中经常需要统一文件命名规范,手动一个个改名既繁琐又容易出错。下面这个脚本可以按正则表达式批量重命名:

import os
import re

def batch_rename(directory, pattern, replacement):
    """批量重命名文件"""
    count = 0
    for filename in os.listdir(directory):
        old_path = os.path.join(directory, filename)
        if not os.path.isfile(old_path):
            continue
        new_filename = re.sub(pattern, replacement, filename)
        new_path = os.path.join(directory, new_filename)
        if old_path != new_path:
            os.rename(old_path, new_path)
            count += 1
            print(f'Renamed: {filename} -> {new_filename}')
    print(f'Total renamed: {count} files')

# 使用示例
batch_rename('/path/to/project', r'^test_(.+)$', r'test_')

二、CSV与JSON互转

CSV和JSON是常用的数据交换格式,下面是互转函数:

CSV转JSON

import csv, json

def csv_to_json(csv_file, json_file, encoding='utf-8'):
    data = []
    with open(csv_file, 'r', encoding=encoding, newline='') as f:
        reader = csv.DictReader(f)
        for row in reader:
            data.append({k: v for k, v in row.items() if v.strip()})
    with open(json_file, 'w', encoding=encoding) as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    return len(data)

csv_to_json('data.csv', 'data.json')

JSON转CSV

import csv, json

def json_to_csv(json_file, csv_file, encoding='utf-8'):
    with open(json_file, 'r', encoding=encoding) as f:
        data = json.load(f)
    if not data:
        return 0
    fields = sorted(set().union(*[set(i.keys()) for i in data]))
    with open(csv_file, 'w', encoding=encoding, newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        writer.writerows(data)
    return len(data)

json_to_csv('data.json', 'data.csv')

三、图片批量压缩

from PIL import Image
from pathlib import Path

def compress_images(directory, quality=85, max_width=1920):
    supported = {'.jpg', '.jpeg', '.png', '.webp'}
    count = 0
    for path in Path(directory).rglob('*'):
        if path.suffix.lower() not in supported:
            continue
        img = Image.open(path)
        if img.width > max_width:
            ratio = max_width / img.width
            img = img.resize((max_width, int(img.height * ratio)), Image.LANCZOS)
        img.convert('RGB').save(path.with_suffix('.jpg'), 'JPEG', quality=quality, optimize=True)
        count += 1
    return count

compress_images('/path/to/images', quality=80)

四、日志文件分析

import re
from collections import Counter

def analyze_errors(log_file, pattern=r'(ERROR|Exception):\s*(.+?)(?:\n|$)'):
    errors = Counter()
    with open(log_file, 'r') as f:
        for line in f:
            for match in re.finditer(pattern, line):
                errors[match.group(2)[:100]] += 1
    for msg, cnt in errors.most_common(10):
        print(f'[{cnt:4d}] {msg}')

analyze_errors('/var/log/myapp.log')

五、定时备份MySQL数据库

import subprocess, datetime, os, tarfile

def backup_mysql(db_name, db_user, db_password, backup_dir):
    today = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    sql_file = os.path.join(backup_dir, f'{db_name}_{today}.sql')
    with open(sql_file, 'w') as f:
        subprocess.run(['mysqldump', f'-u{db_user}', f'-p{db_password}', db_name], stdout=f, check=True)
    tar_file = os.path.join(backup_dir, f'{db_name}_{today}.tar.gz')
    with tarfile.open(tar_file, 'w:gz') as tar:
        tar.add(sql_file, arcname=os.path.basename(sql_file))
    os.remove(sql_file)
    return tar_file

# crontab: 0 2 * * * python3 /path/to/backup.py

总结

建议将脚本保存为独立模块复用,敏感信息使用环境变量管理。

© 版权声明
THE END
喜欢就支持一下吧
点赞12 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片快捷回复

    暂无评论内容