Hands-On Python Batch File Processing: 10 Practical Automation Scripts to Double Your Productivity

In day-to-day development and operations work, file handling is one of the most common tasks: batch renaming, format conversion, directory cleanup, log analysis, and so on. Doing this repetitive work by hand is both time-consuming and error-prone. With its rich standard library and concise syntax, Python is well suited to exactly this kind of work. This article collects 10 ready-to-use Python file-processing scripts covering common scenarios from basic to advanced.

1. Batch-Renaming Files

Rename every file in a folder according to a rule, with support for prefixes, suffixes, sequence numbers, and other patterns.

import os
from pathlib import Path

def batch_rename(folder: str, prefix: str = "", suffix: str = "", start: int = 1):
    folder_path = Path(folder)
    files = sorted([f for f in folder_path.iterdir() if f.is_file()])
    for idx, file in enumerate(files, start=start):
        ext = file.suffix
        new_name = f"{prefix}{idx:04d}{suffix}{ext}"
        new_path = folder_path / new_name
        file.rename(new_path)
        print(f"  {file.name} -> {new_name}")
    print(f"共重命名 {len(files)} 个文件")

# batch_rename("./photos", prefix="img_", start=1)

2. Recursively Finding Files of a Given Type

Recursively search a directory tree for files with specific extensions and report their total size.

from pathlib import Path

def find_files(root: str, extensions: list, min_size_kb: float = 0) -> list:
    result = []
    total_size = 0
    for path in Path(root).rglob("*"):
        if path.is_file() and path.suffix.lower() in extensions:
            size_kb = path.stat().st_size / 1024
            if size_kb >= min_size_kb:
                result.append(path)
                total_size += size_kb
    print(f"找到 {len(result)} 个文件,总大小: {total_size:.2f} KB")
    return result

# Find all Python and JS files larger than 1 KB
# files = find_files(".", [".py", ".js"], min_size_kb=1)
# for f in files[:5]:
#     print(f"  {f}")

3. Batch Find and Replace in File Contents

Find and replace text across many files at once, commonly used for code refactoring or configuration updates.

import re
from pathlib import Path

def batch_replace(folder, pattern, replacement, extensions=None, use_regex=False):
    if extensions is None:
        extensions = ['.txt', '.py', '.js', '.ts', '.html', '.css', '.md', '.json']
    changed_files = 0
    total_replacements = 0
    for path in Path(folder).rglob("*"):
        if not path.is_file() or path.suffix not in extensions:
            continue
        try:
            original = path.read_text(encoding='utf-8')
        except UnicodeDecodeError:
            continue
        if use_regex:
            new_content, count = re.subn(pattern, replacement, original)
        else:
            count = original.count(pattern)
            new_content = original.replace(pattern, replacement)
        if count > 0:
            path.write_text(new_content, encoding='utf-8')
            print(f"  {path}: 替换 {count} 处")
            changed_files += 1
            total_replacements += count
    print(f"共修改 {changed_files} 个文件,替换 {total_replacements} 处")

# batch_replace("./src", "http://old-api.com", "https://new-api.com", ['.py', '.js'])
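
Since batch_replace rewrites files in place, there is no undo. One low-effort safeguard is to keep a .bak copy of each file before overwriting it; the sketch below shows the idea for a plain-text replacement (a hypothetical helper, not part of the function above):

import shutil
from pathlib import Path

def replace_with_backup(path: Path, old: str, new: str) -> int:
    """Replace old with new in one file, keeping a .bak copy of the original."""
    text = path.read_text(encoding='utf-8')
    count = text.count(old)
    if count:
        shutil.copy2(path, path.with_name(path.name + '.bak'))  # e.g. config.py -> config.py.bak
        path.write_text(text.replace(old, new), encoding='utf-8')
    return count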

4. Organizing Files into Subdirectories by Date

Automatically sort files into YYYY-MM subdirectories based on their modification date; great for tidying a Downloads folder or a photo library.

import shutil
from pathlib import Path
from datetime import datetime

def organize_by_date(source_folder: str, dest_folder: str = None, date_format: str = "%Y-%m"):
    source = Path(source_folder)
    dest = Path(dest_folder) if dest_folder else source
    moved = 0
    for file in source.iterdir():
        if not file.is_file():
            continue
        mtime = datetime.fromtimestamp(file.stat().st_mtime)
        date_dir = dest / mtime.strftime(date_format)
        date_dir.mkdir(parents=True, exist_ok=True)
        target = date_dir / file.name
        counter = 1
        while target.exists():
            target = date_dir / f"{file.stem}_{counter}{file.suffix}"
            counter += 1
        shutil.move(str(file), str(target))
        print(f"  {file.name} -> {date_dir.name}/")
        moved += 1
    print(f"共整理 {moved} 个文件")

# organize_by_date("/Users/you/Downloads")
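
The date_format parameter is an ordinary strftime pattern, so nested or finer-grained folders only require changing the format string; mkdir(parents=True) already creates any intermediate directories:

# Nested year/month folders, e.g. 2024/07/
# organize_by_date("/Users/you/Downloads", date_format="%Y/%m")

# Per-day folders, e.g. 2024-07-15/
# organize_by_date("/Users/you/Downloads", date_format="%Y-%m-%d")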

5. Duplicate File Detection

Find duplicate files by MD5 hash to help reclaim disk space.

import hashlib
from pathlib import Path
from collections import defaultdict

def get_file_hash(filepath: Path, chunk_size: int = 8192) -> str:
    md5 = hashlib.md5()
    with open(filepath, 'rb') as f:
        while chunk := f.read(chunk_size):
            md5.update(chunk)
    return md5.hexdigest()

def find_duplicates(folder: str) -> dict:
    hash_map = defaultdict(list)
    for path in Path(folder).rglob("*"):
        if path.is_file():
            hash_map[get_file_hash(path)].append(path)
    duplicates = {h: paths for h, paths in hash_map.items() if len(paths) > 1}
    total_wasted = 0
    print(f"发现 {len(duplicates)} 组重复文件:")
    for file_hash, paths in duplicates.items():
        sizes = [p.stat().st_size for p in paths]
        wasted = sum(sizes[1:])
        total_wasted += wasted
        print(f"  [{file_hash[:8]}...] {len(paths)} 个副本,浪费 {wasted/1024:.1f} KB")
        for p in paths:
            print(f"    {p}")
    print(f"可释放空间: {total_wasted/1024/1024:.2f} MB")
    return duplicates

# find_duplicates("/path/to/folder")
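
Hashing is the expensive part, and most files in a tree have a unique size anyway. Grouping by size first and only hashing groups with more than one candidate speeds things up considerably on large folders (a sketch that reuses get_file_hash from above):

from collections import defaultdict
from pathlib import Path

def find_duplicates_fast(folder: str) -> dict:
    by_size = defaultdict(list)
    for p in Path(folder).rglob("*"):
        if p.is_file():
            by_size[p.stat().st_size].append(p)
    hash_map = defaultdict(list)
    for candidates in by_size.values():
        if len(candidates) > 1:          # only same-size files can be duplicates
            for p in candidates:
                hash_map[get_file_hash(p)].append(p)
    return {h: paths for h, paths in hash_map.items() if len(paths) > 1}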

6. Batch-Merging CSV Files and Format Conversion

Merge multiple CSV files into one, or convert between CSV and JSON.

import csv
import json
from pathlib import Path

def merge_csv_files(folder: str, output: str, encoding: str = 'utf-8-sig'):
    csv_files = sorted(Path(folder).glob("*.csv"))
    if not csv_files:
        print("未找到 CSV 文件")
        return
    total_rows = 0
    with open(output, 'w', newline='', encoding=encoding) as outfile:
        writer = None
        for csv_file in csv_files:
            with open(csv_file, 'r', encoding='utf-8-sig') as infile:
                reader = csv.DictReader(infile)
                if writer is None:
                    writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames)
                    writer.writeheader()
                rows = list(reader)
                writer.writerows(rows)
                total_rows += len(rows)
                print(f"  {csv_file.name}: {len(rows)} 行")
    print(f"合并完成,共 {total_rows} 行 -> {output}")

def csv_to_json(csv_path: str, json_path: str):
    with open(csv_path, 'r', encoding='utf-8-sig') as f:
        data = list(csv.DictReader(f))
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print(f"转换完成: {len(data)} 条记录 -> {json_path}")

# merge_csv_files("./data", "./merged.csv")
# csv_to_json("./data.csv", "./data.json")
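
merge_csv_files assumes every file shares the first file's header; csv.DictWriter will raise if a later file has extra columns. When headers can differ, a union-of-columns merge is safer (a minimal sketch, with restval filling in missing cells):

import csv
from pathlib import Path

def merge_csv_union(folder: str, output: str):
    """Merge CSV files whose headers may differ, using the union of all columns."""
    csv_files = sorted(Path(folder).glob("*.csv"))
    fieldnames, rows = [], []
    for csv_file in csv_files:
        with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
            reader = csv.DictReader(f)
            for name in reader.fieldnames or []:
                if name not in fieldnames:
                    fieldnames.append(name)
            rows.extend(reader)
    with open(output, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, restval="")
        writer.writeheader()
        writer.writerows(rows)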

7. Nginx Log Analysis

Parse an Nginx access log and report per-IP request frequency, status-code distribution, top URLs, and more.

import re
from collections import Counter

NGINX_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ \S+ \[[^\]]+\] '
    r'"(?P<method>\S+) (?P<url>\S+) \S+" '
    r'(?P<status>\d{3}) (?P<size>\d+)'
)

def analyze_nginx_log(log_path: str, top_n: int = 10):
    ip_counter = Counter()
    url_counter = Counter()
    status_counter = Counter()
    total_bytes = 0

    with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            m = NGINX_PATTERN.match(line)
            if not m:
                continue
            ip_counter[m.group('ip')] += 1
            url_counter[m.group('url')] += 1
            status_counter[m.group('status')] += 1
            total_bytes += int(m.group('size'))

    total = sum(ip_counter.values())
    print(f"总请求数: {total:,} | 总流量: {total_bytes/1024/1024:.2f} MB")
    print(f"
Top {top_n} IP:")
    for ip, count in ip_counter.most_common(top_n):
        print(f"  {ip:<20} {count:>8,} 次 ({count/total*100:.1f}%)")
    print(f"
Top {top_n} URL:")
    for url, count in url_counter.most_common(top_n):
        print(f"  {url[:50]:<50} {count:>8,} 次")
    print(f"
状态码分布:")
    for status, count in sorted(status_counter.items()):
        print(f"  {status}: {count:,}")

# analyze_nginx_log("/var/log/nginx/access.log")
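
The pattern assumes the common combined-style prefix of an access log line (remote address, identity, user, timestamp, request, status, bytes). A quick self-test against a sample line (made up here) is an easy way to confirm it matches your log format before running the full analysis:

sample = '203.0.113.7 - - [10/Oct/2024:13:55:36 +0800] "GET /index.html HTTP/1.1" 200 2326'
m = NGINX_PATTERN.match(sample)
print(m.group('ip'), m.group('method'), m.group('url'), m.group('status'))
# 203.0.113.7 GET /index.html 200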

8. File Watching and Automatic Sorting

Use the watchdog library to monitor a directory and automatically classify new files as they arrive, a useful building block for automated workflows.

# pip install watchdog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import shutil, time
from pathlib import Path

class AutoProcessor(FileSystemEventHandler):
    def __init__(self, watch_dir: str, output_dir: str):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def on_created(self, event):
        if event.is_directory:
            return
        src = Path(event.src_path)
        print(f"检测到新文件: {src.name}")
        if src.suffix.lower() in ['.jpg', '.jpeg', '.png', '.gif']:
            category = 'images'
        elif src.suffix.lower() in ['.pdf', '.doc', '.docx']:
            category = 'documents'
        elif src.suffix.lower() in ['.zip', '.tar', '.gz']:
            category = 'archives'
        else:
            category = 'others'
        dest = self.output_dir / category / src.name
        dest.parent.mkdir(exist_ok=True)
        shutil.copy2(src, dest)
        print(f"  -> {category}/")

def start_watcher(watch_dir: str, output_dir: str):
    handler = AutoProcessor(watch_dir, output_dir)
    observer = Observer()
    observer.schedule(handler, watch_dir, recursive=False)
    observer.start()
    print(f"开始监控: {watch_dir}")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# start_watcher("/Users/you/Downloads", "/Users/you/Sorted")
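
One caveat with watchdog: on_created fires as soon as the file entry appears, which for a large download can be before the write has finished. A common workaround is to wait until the file size stops changing before processing it; the sketch below is a hypothetical helper you could call at the top of on_created:

import time
from pathlib import Path

def wait_until_stable(path: Path, interval: float = 1.0, retries: int = 30) -> bool:
    """Return True once the file size stays the same between two checks."""
    last_size = -1
    for _ in range(retries):
        try:
            size = path.stat().st_size
        except FileNotFoundError:        # file may be moved or renamed mid-write
            return False
        if size == last_size:
            return True
        last_size = size
        time.sleep(interval)
    return False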

9. Automatic Backups with Old-Backup Cleanup

Compress a directory into a date-stamped archive, keep the most recent N backups, and automatically clean up older ones.

import tarfile
from pathlib import Path
from datetime import datetime

def backup_directory(source: str, backup_dir: str, keep_last: int = 7):
    source_path = Path(source)
    backup_path = Path(backup_dir)
    backup_path.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_name = f"{source_path.name}_{timestamp}.tar.gz"
    backup_file = backup_path / backup_name

    print(f"正在备份 {source_path.name}...")
    with tarfile.open(backup_file, 'w:gz') as tar:
        tar.add(source_path, arcname=source_path.name)

    size_mb = backup_file.stat().st_size / 1024 / 1024
    print(f"备份完成: {backup_name} ({size_mb:.2f} MB)")

    # 清理旧备份
    old_backups = sorted(
        backup_path.glob(f"{source_path.name}_*.tar.gz"),
        key=lambda p: p.stat().st_mtime
    )
    for old in old_backups[:-keep_last]:
        old.unlink()
        print(f"已删除旧备份: {old.name}")

    return backup_file

# backup_directory("/var/www/myapp", "/backup/myapp", keep_last=7)
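
tarfile's add() accepts a filter callable, which is handy when the source tree contains things you would rather leave out of the backup, such as node_modules or __pycache__. A hedged variant of the tar.add call above:

EXCLUDE_DIRS = {'node_modules', '__pycache__', '.git'}

def skip_junk(tarinfo):
    """Return None to drop a member from the archive, or the TarInfo to keep it."""
    if any(part in EXCLUDE_DIRS for part in tarinfo.name.split('/')):
        return None
    return tarinfo

# with tarfile.open(backup_file, 'w:gz') as tar:
#     tar.add(source_path, arcname=source_path.name, filter=skip_junk)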

10. Batch Image Compression and Format Conversion

Use Pillow to batch-compress images, resize them, and convert formats; well suited to website assets or photo libraries.

from PIL import Image
from pathlib import Path

def batch_compress_images(source_dir, output_dir, max_width=1920, quality=85, output_format=None):
    source = Path(source_dir)
    output = Path(output_dir)
    output.mkdir(parents=True, exist_ok=True)

    supported = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
    images = [f for f in source.iterdir() if f.suffix.lower() in supported]
    total_original = 0
    total_compressed = 0

    for img_path in images:
        with Image.open(img_path) as img:
            if img.mode in ('RGBA', 'P') and output_format == 'JPEG':
                img = img.convert('RGB')
            if img.width > max_width:
                ratio = max_width / img.width
                img = img.resize((max_width, int(img.height * ratio)), Image.LANCZOS)
            fmt = output_format or img.format or 'JPEG'
            ext_map = {'JPEG': '.jpg', 'PNG': '.png', 'WEBP': '.webp'}
            out_path = output / (img_path.stem + ext_map.get(fmt, img_path.suffix))
            save_kwargs = {'quality': quality, 'optimize': True} if fmt != 'PNG' else {'optimize': True}
            img.save(out_path, format=fmt, **save_kwargs)
        orig_size = img_path.stat().st_size
        comp_size = out_path.stat().st_size
        total_original += orig_size
        total_compressed += comp_size
        ratio = (1 - comp_size/orig_size) * 100
        print(f"  {img_path.name}: {orig_size/1024:.0f}KB -> {comp_size/1024:.0f}KB (压缩 {ratio:.0f}%)")

    saved = (total_original - total_compressed) / 1024 / 1024
    print(f"处理 {len(images)} 张图片,节省空间: {saved:.2f} MB")

# batch_compress_images("./images", "./images_webp", max_width=1200, quality=80, output_format="WEBP")
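
Photos straight from a phone often carry an EXIF orientation flag that gets dropped on re-save, leaving the output sideways. Pillow's ImageOps.exif_transpose applies the rotation to the pixels before you resize; a minimal sketch with placeholder filenames:

from PIL import Image, ImageOps

with Image.open("photo.jpg") as img:
    img = ImageOps.exif_transpose(img)   # bake the EXIF rotation into the pixel data
    img.save("photo_upright.jpg", quality=85)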

Summary

Python's strength here is that a few dozen lines of code can replace hours of manual work. The 10 scripts above cover the core file-processing scenarios, and each one can be used as-is or lightly adapted to your needs.

It is worth collecting these scripts into a small utility library you can call on demand. As the collection grows, you will find that Python automation makes a substantial difference to your day-to-day productivity.

Recommended Learning Path

  • Standard library: pathlib, shutil, os, csv, json, re, hashlib
  • Third-party libraries: Pillow (image processing), watchdog (file monitoring), pandas (data processing)
  • Going further: wrap these scripts into command-line tools with argparse (see the sketch below), or run them on a schedule with the schedule library
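
As an example of that last point, here is how the batch_rename function from section 1 might be exposed as a command-line tool, assuming it lives in the same file (a minimal argparse sketch; the script name in the usage comment is illustrative):

import argparse

def main():
    parser = argparse.ArgumentParser(description="Batch-rename files in a folder")
    parser.add_argument("folder", help="folder to process")
    parser.add_argument("--prefix", default="", help="filename prefix")
    parser.add_argument("--suffix", default="", help="text inserted before the extension")
    parser.add_argument("--start", type=int, default=1, help="first sequence number")
    args = parser.parse_args()
    batch_rename(args.folder, prefix=args.prefix, suffix=args.suffix, start=args.start)

if __name__ == "__main__":
    main()

# Usage: python rename_tool.py ./photos --prefix img_ --start 1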