Python文件处理与自动化实战:10个实用脚本让效率翻倍

日常开发中,重复性的文件操作浪费了大量时间。本文整理了10个Python文件处理与自动化的实战脚本,覆盖批量重命名、文件分类、增量备份、日志分析等常见场景,拿来即用。

1. 批量重命名文件(支持预览/确认)

怕误操作?先预览再执行,这是最稳妥的方式。

import os
import shutil
import re

def batch_rename(directory, pattern, replacement, dry_run=True):
    """Rename the entries of a directory using a regex substitution.

    Every entry name returned by ``os.listdir(directory)`` is passed through
    ``re.sub(pattern, replacement, name)``; entries whose name changes are
    part of the rename plan.

    Args:
        directory: Directory whose direct entries are examined (not recursive).
        pattern: Regular expression applied to each entry name.
        replacement: Replacement text handed to the regex substitution.
        dry_run: When True (the default) nothing on disk is touched and only
            the plan is returned.

    Returns:
        A list of ``(old_name, new_name)`` tuples, ordered by old name.

    Raises:
        FileExistsError: In execute mode, when two entries map to the same new
            name or a new name is already taken by another entry. The check
            runs before the first rename, so nothing is renamed in that case.
    """
    regex = re.compile(pattern)
    renamed = []
    # Sorted so that the preview and the execution order are reproducible.
    for filename in sorted(os.listdir(directory)):
        new_name = regex.sub(replacement, filename)
        if new_name != filename:
            renamed.append((filename, new_name))

    if dry_run:
        return renamed

    def _is_same_entry(a, b):
        # True for case-only renames on case-insensitive file systems.
        try:
            return os.path.samefile(a, b)
        except OSError:
            return False

    # os.rename silently replaces an existing destination on POSIX, so every
    # target is validated up front to avoid losing data half-way through.
    claimed = set()
    for old_name, new_name in renamed:
        src = os.path.join(directory, old_name)
        dst = os.path.join(directory, new_name)
        taken = os.path.lexists(dst) and not _is_same_entry(src, dst)
        if new_name in claimed or taken:
            raise FileExistsError(f'重命名冲突:{old_name} → {new_name} 的目标已存在')
        claimed.add(new_name)

    for old_name, new_name in renamed:
        os.rename(os.path.join(directory, old_name),
                  os.path.join(directory, new_name))
    return renamed

# Example: rename every test_*.txt to demo_*.txt
if __name__ == '__main__':
    directory = './files'
    # Preview first: dry_run=True only returns the planned renames
    changes = batch_rename(directory, r'^test_', 'demo_', dry_run=True)
    print(f'将重命名 {len(changes)} 个文件:')
    for old, new in changes:
        print(f'  {old} → {new}')
    # Once the preview looks right, uncomment to perform the renames
    # batch_rename(directory, r'^test_', 'demo_', dry_run=False)

2. 按扩展名自动分类文件

下载文件夹总是乱糟糟?写一个脚本按类型自动归类。

import os
import shutil
from pathlib import Path

def classify_files(source_dir, target_base=None):
    """Move the files directly inside ``source_dir`` into per-category folders.

    The category is chosen from the lower-cased file extension; files with an
    unlisted (or no) extension go to ``others``. Sub-directories of
    ``source_dir`` are left untouched. A summary is printed at the end.

    Args:
        source_dir: Directory whose direct child files are classified.
        target_base: Directory under which the category folders are created.
            Defaults to ``source_dir`` itself.
    """
    target_base = Path(source_dir if target_base is None else target_base)

    category_map = {
        'images': ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.svg', '.bmp'],
        'documents': ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt'],
        'videos': ['.mp4', '.avi', '.mov', '.mkv', '.flv'],
        'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'],
        'code': ['.py', '.js', '.java', '.cpp', '.html', '.css', '.go', '.rs'],
        'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'],
    }
    # Inverted once so each file needs a single dictionary lookup.
    ext_to_category = {
        ext: cat for cat, exts in category_map.items() for ext in exts
    }

    # 'others' must have a counter too, otherwise the first unknown extension
    # raises KeyError after its file has already been moved.
    stats = dict.fromkeys(category_map, 0)
    stats['others'] = 0
    moved = 0

    # Snapshot the listing: category folders may be created inside the very
    # directory being iterated when target_base is source_dir.
    for filepath in list(Path(source_dir).iterdir()):
        if not filepath.is_file():
            continue
        category = ext_to_category.get(filepath.suffix.lower(), 'others')
        target_dir = target_base / category
        target_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(filepath), str(target_dir / filepath.name))
        stats[category] += 1
        moved += 1

    print(f'共处理 {moved} 个文件:')
    for cat, count in stats.items():
        if count:
            print(f'  {cat}: {count} 个')

# Usage: sort the files in the Downloads folder into category sub-folders
classify_files('/Users/wang/Downloads')

3. 增量备份脚本(带去重)

完整备份太占空间?用增量备份+文件哈希去重,只备份变化的内容。

import os
import shutil
import hashlib
import json
from pathlib import Path
from datetime import datetime

class IncrementalBackup:
    """Copy only new or changed files of a source tree into timestamped folders.

    A JSON manifest maps each file's path relative to ``source`` (stored as a
    POSIX-style string) to the SHA-256 of the content that was last backed up.
    A file is copied when it is missing from the manifest or its hash differs.
    """

    def __init__(self, source, backup_root, manifest_path=None):
        """Store the paths and load the manifest if one already exists.

        Args:
            source: Root of the tree to back up (str or Path).
            backup_root: Directory that receives the ``backup_<timestamp>``
                folders (str or Path).
            manifest_path: Location of the manifest file; defaults to
                ``<backup_root>/.manifest.json``.
        """
        self.source = Path(source)
        self.backup_root = Path(backup_root)
        # Build the default from the Path object: backup_root may be a plain
        # string, and 'str / str' raises TypeError.
        self.manifest_path = Path(manifest_path or self.backup_root / '.manifest.json')
        self.manifest = self._load_manifest()

    def _load_manifest(self):
        """Return the stored manifest, or an empty dict when none exists."""
        if self.manifest_path.exists():
            return json.loads(self.manifest_path.read_text(encoding='utf-8'))
        return {}

    def _save_manifest(self):
        """Write the manifest as UTF-8 JSON, creating its folder if needed."""
        self.manifest_path.parent.mkdir(parents=True, exist_ok=True)
        self.manifest_path.write_text(
            json.dumps(self.manifest, indent=2, ensure_ascii=False),
            encoding='utf-8',
        )

    def _file_hash(self, filepath):
        """Return the SHA-256 hex digest of a file, read in 8 KiB chunks."""
        h = hashlib.sha256()
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                h.update(chunk)
        return h.hexdigest()

    def backup(self):
        """Copy new or changed files into a fresh timestamped folder.

        Updates and saves the manifest, then prints how many files were copied
        and how many were skipped as unchanged.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_dir = self.backup_root / f'backup_{timestamp}'
        backup_dir.mkdir(parents=True, exist_ok=True)

        new_files = 0
        skipped = 0

        for filepath in self.source.rglob('*'):
            if not filepath.is_file():
                continue
            rel_path = filepath.relative_to(self.source)
            # JSON object keys must be strings, and keys read back from disk
            # are strings, so the manifest is keyed by the POSIX-style path.
            key = rel_path.as_posix()
            file_hash = self._file_hash(filepath)

            if self.manifest.get(key) == file_hash:
                skipped += 1
                continue

            dest = backup_dir / rel_path
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(filepath, dest)
            self.manifest[key] = file_hash
            new_files += 1

        self._save_manifest()
        print(f'✅ 备份完成:新增 {new_files} 个文件,跳过重复 {skipped} 个')
        print(f'📁 备份位置:{backup_dir}')

# Usage: back up the project into a new timestamped folder under backups/
backup = IncrementalBackup('/Users/wang/Projects/myapp', '/Users/wang/backups')
backup.backup()

4. 日志文件分析脚本

分析access日志,统计IP访问量、热门路径、错误状态码。

import re
import json
from collections import Counter
from pathlib import Path

def analyze_access_log(log_path, top_n=20):
    """Summarise an access log by client IP, request path and status code.

    Each line is searched for an IPv4 address, a bracketed timestamp, a quoted
    request (``"METHOD path ..."``) and a three-digit status code; lines that
    do not match are ignored. A report is printed and the raw counts returned.

    Args:
        log_path: Path of the log file to read.
        top_n: How many IPs and paths to list in the printed report.

    Returns:
        A dict with the keys ``'ips'``, ``'paths'`` and ``'status'``, each
        mapping a value (as a string) to its number of occurrences.
    """
    ip_counter = Counter()
    path_counter = Counter()
    status_counter = Counter()

    # Named groups need the (?P<name>...) form; without the names the pattern
    # does not compile.
    log_pattern = re.compile(
        r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?'
        r'\[(?P<time>[^\]]+)\].*?'
        r'"(?P<method>\w+) (?P<path>[^\s"]+)[^"]*?" '
        r'(?P<status>\d{3})'
    )

    # Undecodable bytes are dropped rather than aborting the whole analysis.
    with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            m = log_pattern.search(line)
            if m:
                ip_counter[m.group('ip')] += 1
                path_counter[m.group('path')] += 1
                status_counter[m.group('status')] += 1

    print(f'{"="*50}')
    print(f'📊 日志分析报告:{log_path}')
    print(f'{"="*50}')

    print(f'\n🚨 Top {top_n} 访问IP:')
    for ip, count in ip_counter.most_common(top_n):
        print(f'  {ip}: {count} 次')

    print(f'\n🔥 Top {top_n} 热门路径:')
    for path, count in path_counter.most_common(top_n):
        print(f'  {path}: {count} 次')

    print(f'\n📈 HTTP状态码分布:')
    for status, count in sorted(status_counter.items()):
        # One bar segment per 100 requests.
        bar = '█' * (count // 100)
        print(f'  {status}: {count:>6} {bar}')

    # Plain dicts so the result can be serialised (e.g. to JSON) by the caller.
    return {
        'ips': dict(ip_counter),
        'paths': dict(path_counter),
        'status': dict(status_counter),
    }

# Usage
result = analyze_access_log('/var/log/nginx/access.log')
# Save the returned counts as a JSON report
Path('/tmp/log_report.json').write_text(json.dumps(result, indent=2, ensure_ascii=False))

5. CSV数据清洗与转换

处理脏数据:去重、空值填充、类型转换、导出。

import pandas as pd
import json
from datetime import datetime

def clean_csv(input_path, output_path=None):
    """Run a small cleaning pipeline over a CSV file.

    Steps, in order: drop exact duplicate rows, drop rows in which every
    column is empty, fill missing numeric values with the column median, fill
    missing values of object columns with the column mode, and parse columns
    whose name contains ``date`` or ``time`` as datetimes (unparseable values
    become NaT).

    Args:
        input_path: CSV file to read.
        output_path: When given, the cleaned data is written there as CSV
            (UTF-8 with BOM, no index column).

    Returns:
        The cleaned ``pandas.DataFrame``.
    """
    df = pd.read_csv(input_path)

    print(f'原始数据:{len(df)} 行 × {len(df.columns)} 列')

    # 1. Drop rows that are exact duplicates
    before = len(df)
    df = df.drop_duplicates()
    print(f'去重:删除了 {before - len(df)} 条重复记录')

    # 2. Drop rows where every column is empty. This has to happen before the
    #    fills below; afterwards such rows would no longer be empty.
    df = df.dropna(how='all')

    # 3. Fill missing values of numeric columns with the median
    for col in df.select_dtypes(include='number').columns:
        if df[col].isna().any():
            median_val = df[col].median()
            df[col] = df[col].fillna(median_val)
            print(f'  数值列 [{col}] 空值已用中位数 {median_val} 填充')

    # 4. Fill missing values of object (string) columns with the mode
    for col in df.select_dtypes(include='object').columns:
        if df[col].isna().any():
            modes = df[col].mode()
            # mode() is empty when the column holds no non-null value at all
            mode_val = modes.iloc[0] if not modes.empty else '未知'
            df[col] = df[col].fillna(mode_val)
            print(f'  字符串列 [{col}] 空值已用众数 "{mode_val}" 填充')

    # 5. Parse date/time columns, selected by column name
    for col in df.columns:
        lowered = col.lower()
        if 'date' in lowered or 'time' in lowered:
            df[col] = pd.to_datetime(df[col], errors='coerce')

    print(f'\n✅ 清洗后数据:{len(df)} 行 × {len(df.columns)} 列')

    if output_path:
        df.to_csv(output_path, index=False, encoding='utf-8-sig')
        print(f'📁 已保存到:{output_path}')

    return df

# Usage: clean the file, save the result and show the first rows
clean_df = clean_csv('/tmp/dirty_data.csv', '/tmp/clean_data.csv')
print(clean_df.head())

6. 定时文件清理(防止磁盘爆满)

根据文件修改时间(可按扩展名过滤)自动清理过期文件,并统计可释放的空间;默认是预览模式,不会真正删除。

import os
import time
from pathlib import Path

def cleanup_by_age(directory, days=30, extensions=None, dry_run=True):
    """Delete files under ``directory`` that have not been modified recently.

    The tree is walked recursively. Hidden directories and hidden files (names
    starting with ``.``) are skipped. A summary with the number of files and
    the space they occupy is printed.

    Args:
        directory: Root directory to scan.
        days: Files whose modification time is older than this many days are
            selected.
        extensions: Only consider files with these suffixes, compared
            case-insensitively (e.g. ``['.log', '.tmp']``). ``None`` or an
            empty value selects every file.
        dry_run: When True (the default) nothing is deleted; the summary shows
            what would be removed.
    """
    cutoff = time.time() - days * 86400
    # The file suffix is lower-cased below, so the filter must be too.
    wanted = {ext.lower() for ext in extensions} if extensions else None
    deleted_count = 0
    freed_bytes = 0

    for root, dirs, files in os.walk(directory):
        # Prune in place so os.walk does not descend into hidden directories
        dirs[:] = [d for d in dirs if not d.startswith('.')]

        for filename in files:
            if filename.startswith('.'):
                continue
            if wanted is not None and Path(filename).suffix.lower() not in wanted:
                continue
            filepath = os.path.join(root, filename)
            try:
                if os.path.getmtime(filepath) >= cutoff:
                    continue
                size = os.path.getsize(filepath)
                if not dry_run:
                    os.remove(filepath)
            except OSError as exc:
                # Best effort: report the file and carry on with the rest.
                print(f'⚠️ 跳过 {filepath}: {exc}')
                continue
            deleted_count += 1
            freed_bytes += size

    freed_mb = freed_bytes / 1024 / 1024
    action = '将删除' if dry_run else '已删除'
    print(f'{action} {deleted_count} 个文件,释放 {freed_mb:.2f} MB')

    if dry_run:
        # This is a function, not a CLI: point at the real switch.
        print('(这是预览模式,传入 dry_run=False 正式执行)')

# Usage
# Preview which .log files older than 30 days would be removed
cleanup_by_age('/var/log', days=30, extensions=['.log'])
# Once the preview looks right, uncomment to actually delete them
# cleanup_by_age('/var/log', days=30, extensions=['.log'], dry_run=False)

7. 批量压缩与解压缩

import os
import zipfile
import tarfile
from pathlib import Path

def batch_compress(source_dir, output_path=None, fmt='zip', level=6):
    """Pack a directory into a single zip or tar.gz archive.

    Directories named ``__pycache__``, ``.git`` or ``node_modules`` and files
    ending in ``.pyc`` are left out. Archive member names start with the
    source directory's own name.

    Args:
        source_dir: Directory to compress.
        output_path: Archive to create (str or Path). Defaults to
            ``<source parent>/<source name>.<fmt>``.
        fmt: ``'zip'`` or ``'tar.gz'``.
        level: Compression level, 0-9.

    Raises:
        ValueError: If ``fmt`` is not one of the supported formats.
    """
    if fmt not in ('zip', 'tar.gz'):
        # Fail before touching the disk instead of crashing later on stat().
        raise ValueError(f'不支持的压缩格式: {fmt}')

    source = Path(source_dir)
    if output_path is None:
        output_path = source.parent / f'{source.name}.{fmt}'
    # Accept plain strings as well; .stat() below needs a Path.
    output_path = Path(output_path)

    skipped_dirs = ('__pycache__', '.git', 'node_modules')

    if fmt == 'zip':
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED,
                             compresslevel=level) as zf:
            for root, dirs, files in os.walk(source):
                # Prune in place so os.walk does not descend into them
                dirs[:] = [d for d in dirs if d not in skipped_dirs]
                for file in files:
                    if file.endswith('.pyc'):
                        continue
                    filepath = Path(root) / file
                    zf.write(filepath, filepath.relative_to(source.parent))
    else:
        def _keep(info):
            # Returning None drops the member (and, for a directory,
            # everything below it) from the archive.
            base = info.name.rsplit('/', 1)[-1]
            if info.isdir() and base in skipped_dirs:
                return None
            if info.isfile() and base.endswith('.pyc'):
                return None
            return info

        with tarfile.open(output_path, 'w:gz', compresslevel=level) as tf:
            tf.add(source, arcname=source.name, filter=_keep)

    size_mb = output_path.stat().st_size / 1024 / 1024
    print(f'✅ 压缩完成:{output_path} ({size_mb:.2f} MB)')

def batch_unzip(archive_paths, dest_dir=None):
    """Extract several ``.zip`` / ``.tar.gz`` / ``.tgz`` archives.

    Args:
        archive_paths: Iterable of archive paths (str or Path).
        dest_dir: Directory to extract into; created if missing. When omitted,
            each archive is extracted next to itself.

    Archives of any other type are reported and skipped.
    """
    for ap in archive_paths:
        archive = Path(ap)
        dest = Path(dest_dir) if dest_dir else archive.parent
        dest.mkdir(parents=True, exist_ok=True)

        lowered = archive.name.lower()
        if lowered.endswith('.zip'):
            with zipfile.ZipFile(archive, 'r') as zf:
                zf.extractall(dest)
        elif lowered.endswith(('.tar.gz', '.tgz')):
            with tarfile.open(archive, 'r:gz') as tf:
                if hasattr(tarfile, 'data_filter'):
                    # The 'data' filter rejects members that would be written
                    # outside dest; only available on newer Python releases.
                    tf.extractall(dest, filter='data')
                else:
                    tf.extractall(dest)
        else:
            # Do not claim success for something that was never extracted.
            print(f'⚠️ 跳过不支持的格式:{archive.name}')
            continue
        print(f'✅ 解压:{archive.name} → {dest}')

# Example: zip a project, then extract two archives next to themselves
batch_compress('/Users/wang/Projects/myapp', fmt='zip')
batch_unzip(['/tmp/app.zip', '/tmp/data.tar.gz'])

8. 监听目录变化(文件监控)

用watchdog监控目录变化,文件新增/修改/删除时自动触发动作。

import time
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class WatcherHandler(FileSystemEventHandler):
    """Print and record file events delivered by a watchdog observer.

    Modified and created events are reported only for files whose suffix is in
    ``extensions``; deleted events are reported for every file. Events that
    concern directories are ignored. Every reported message is also appended
    to ``self.log``.
    """

    def __init__(self, extensions=None):
        default_suffixes = ['.py', '.js', '.html', '.css', '.json']
        self.extensions = extensions if extensions else default_suffixes
        self.log = []

    def _is_watched_file(self, event):
        """Return True for a non-directory event on a file with a watched suffix."""
        if event.is_directory:
            return False
        return Path(event.src_path).suffix in self.extensions

    def on_modified(self, event):
        if self._is_watched_file(event):
            self._log('📝 修改', event.src_path)

    def on_created(self, event):
        if self._is_watched_file(event):
            self._log('🆕 新建', event.src_path)
            # Hook point for follow-up work, e.g. running a linter:
            # subprocess.run(['pylint', event.src_path])

    def on_deleted(self, event):
        # Deletions are reported regardless of the file's suffix.
        if not event.is_directory:
            self._log('🗑️ 删除', event.src_path)

    def _log(self, action, path):
        """Print ``action: path`` and keep it in the in-memory log."""
        entry = f'{action}: {path}'
        print(entry)
        self.log.append(entry)

def watch_directory(path, extensions=None):
    """Watch ``path`` recursively and report events until Ctrl+C is pressed.

    Args:
        path: Directory to monitor.
        extensions: Suffix list handed to ``WatcherHandler``.
    """
    observer = Observer()
    observer.schedule(WatcherHandler(extensions), path, recursive=True)
    observer.start()
    print(f'👀 监控中:{path}(按 Ctrl+C 停止)')

    try:
        # Keep the main thread alive; the observer works in its own thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()

    observer.join()

# Usage: watch the src folder, reporting changes to .py and .vue files
watch_directory('/Users/wang/Projects/myapp/src', extensions=['.py', '.vue'])

9. Excel多表合并工具

import pandas as pd
from pathlib import Path
from openpyxl import load_workbook

def merge_excel_sheets(excel_paths, output_path):
    """Copy every sheet of several Excel files into one new workbook.

    Each source sheet becomes its own sheet in the output, named
    ``<file stem>_<sheet name>``. Names are limited to 31 characters,
    characters Excel does not allow in sheet names are replaced with ``_``,
    and a numeric suffix is added when two names would otherwise collide.

    Args:
        excel_paths: Iterable of Excel file paths to read.
        output_path: Path of the workbook to create.

    Raises:
        ValueError: If ``excel_paths`` is empty.
    """
    excel_paths = list(excel_paths)
    if not excel_paths:
        # A workbook without any sheet cannot be saved; say so up front.
        raise ValueError('没有可合并的 Excel 文件')

    max_len = 31
    invalid_chars = str.maketrans({ch: '_' for ch in '[]:*?/\\'})
    used_names = set()

    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        for file_path in excel_paths:
            # sheet_name=None loads all sheets at once as {sheet name: DataFrame}
            sheets = pd.read_excel(file_path, sheet_name=None)

            for sheet_name, df in sheets.items():
                base = f'{Path(file_path).stem}_{sheet_name}'
                base = base.translate(invalid_chars)[:max_len]

                # Excel treats sheet names case-insensitively, so compare the
                # lower-cased form when looking for collisions.
                safe_name = base
                attempt = 1
                while safe_name.lower() in used_names:
                    suffix = f'_{attempt}'
                    safe_name = base[:max_len - len(suffix)] + suffix
                    attempt += 1
                used_names.add(safe_name.lower())

                df.to_excel(writer, sheet_name=safe_name, index=False)
                print(f'  + {safe_name} ({len(df)} 行)')

    print(f'\n✅ 合并完成:{output_path}')
    print(f'   共 {len(used_names)} 个工作表')

# Usage: merge every .xlsx file found directly in the Documents folder
excel_files = list(Path('/Users/wang/Documents').glob('*.xlsx'))
merge_excel_sheets(excel_files, '/tmp/merged.xlsx')

10. 自动生成目录树结构文档

import os
from pathlib import Path

def generate_tree(root_dir, ignore_patterns=None, max_depth=5):
    """Render a directory as a text tree, e.g. for project documentation.

    Within each directory, sub-directories are listed before files and both
    groups are sorted case-insensitively by name.

    Args:
        root_dir: Directory to render.
        ignore_patterns: Names to leave out. A pattern starting with ``*``
            matches any name ending with the rest of the pattern (``*.pyc``);
            any other pattern must equal the name exactly. Defaults to a set
            of common build, cache and VCS names.
        max_depth: Number of directory levels to descend below the root.

    Returns:
        The tree as a single string with one entry per line.
    """
    if ignore_patterns is None:
        ignore_patterns = {
            '__pycache__', '.git', '.svn', 'node_modules',
            '.venv', 'venv', '.idea', '.vscode', 'dist', 'build',
            '*.pyc', '*.log', '.DS_Store'
        }

    def should_ignore(name):
        return any(
            name.endswith(pattern[1:]) if pattern.startswith('*') else name == pattern
            for pattern in ignore_patterns
        )

    lines = []

    def walk(dir_path, prefix='', depth=0):
        if depth >= max_depth:
            return
        try:
            # Ask each entry once whether it is a directory and reuse the answer.
            children = [
                (entry, entry.is_dir())
                for entry in Path(dir_path).iterdir()
                if not should_ignore(entry.name)
            ]
        except OSError:
            # An unreadable directory should not abort the whole tree.
            lines.append(f'{prefix}└── 🚫 [无法读取]')
            return
        children.sort(key=lambda item: (not item[1], item[0].name.lower()))

        last_index = len(children) - 1
        for i, (entry, is_dir) in enumerate(children):
            is_last = i == last_index
            connector = '└── ' if is_last else '├── '
            icon = '📁' if is_dir else '📄'
            lines.append(f'{prefix}{connector}{icon} {entry.name}')

            if is_dir:
                # Below a last entry the guide line is no longer drawn.
                extension = '    ' if is_last else '│   '
                walk(entry, prefix + extension, depth + 1)

    root = Path(root_dir)
    lines.append(f'📂 {root.name}/')
    walk(root)
    return '\n'.join(lines)

# Build the tree and print it
tree = generate_tree('/Users/wang/Projects/myapp')
print(tree)
# Save it to a file
Path('/tmp/project_tree.txt').write_text(tree)

总结

以上10个脚本覆盖了文件处理中最常见的场景:

  • 批量操作:重命名、分类、压缩解压缩
  • 数据处理:CSV清洗、Excel合并
  • 自动化运维:增量备份、定时清理、目录监控
  • 日志分析:快速定位问题流量

建议将这些脚本保存到本地工具库(涉及删除、重命名等操作时,先用 dry_run=True 预览更安全),需要时直接调用,避免重复造轮子。

© 版权声明
THE END
喜欢就支持一下吧
点赞8 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片快捷回复

    暂无评论内容