日常开发中,重复性的文件操作浪费了大量时间。本文整理了10个Python文件处理与自动化的实战脚本,覆盖批量重命名、文件分类、增量备份、日志分析等常见场景,拿来即用。
1. 批量重命名文件(支持预览/确认)
怕误操作?先预览再执行,这是最稳妥的方式。
import os
import shutil
import re
def batch_rename(directory, pattern, replacement, dry_run=True):
    """Rename entries of ``directory`` whose name changes under a regex substitution.

    Args:
        directory: Directory whose direct children are considered.
        pattern: Regular expression (string or compiled pattern) applied to
            each entry name.
        replacement: Replacement passed to the pattern's ``sub``.
        dry_run: When True (the default) nothing is renamed on disk; the
            planned changes are only returned.

    Returns:
        A list of ``(old_name, new_name)`` tuples, ordered by old name.
        Entries whose new name is empty or already taken (by an existing
        entry or by an earlier rename in the same run) are skipped and not
        listed, so one file is never silently overwritten by another.
    """
    regex = re.compile(pattern)
    # Sort for a deterministic order; os.listdir() order is arbitrary.
    names = sorted(os.listdir(directory))
    taken = set(names)
    renamed = []
    for filename in names:
        new_name = regex.sub(replacement, filename)
        if new_name == filename:
            continue
        if not new_name or new_name in taken:
            # os.rename() replaces an existing file on POSIX, so refuse.
            print(f'⚠️ 跳过 {filename}:目标 {new_name!r} 无效或已存在')
            continue
        if not dry_run:
            src = os.path.join(directory, filename)
            dst = os.path.join(directory, new_name)
            os.rename(src, dst)
        # Track names in dry-run mode too, so the preview matches a real run.
        taken.discard(filename)
        taken.add(new_name)
        renamed.append((filename, new_name))
    return renamed
# Example: replace the leading "test_" of every entry name with "demo_".
if __name__ == '__main__':
    directory = './files'
    # Preview first: with dry_run=True nothing on disk is changed.
    changes = batch_rename(directory, r'^test_', 'demo_', dry_run=True)
    print(f'将重命名 {len(changes)} 个文件:')
    for old, new in changes:
        print(f' {old} → {new}')
    # Once the preview looks right, apply it for real:
    # batch_rename(directory, r'^test_', 'demo_', dry_run=False)
2. 按扩展名自动分类文件
下载文件夹总是乱糟糟?写一个脚本按类型自动归类。
import os
import shutil
from pathlib import Path
def classify_files(source_dir, target_base=None):
    """Move each regular file in ``source_dir`` into a per-category subdirectory.

    The category is chosen from the lower-cased file extension; files whose
    extension is not listed go to ``others``. Sub-directories of
    ``source_dir`` are left untouched.

    Args:
        source_dir: Directory whose direct child files are classified.
        target_base: Directory under which the category folders are created.
            Defaults to ``source_dir`` itself.

    Returns:
        A dict mapping every category name (including ``'others'``) to the
        number of files moved into it.
    """
    source = Path(source_dir)
    target_base = source if target_base is None else Path(target_base)
    category_map = {
        'images': ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.svg', '.bmp'],
        'documents': ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt'],
        'videos': ['.mp4', '.avi', '.mov', '.mkv', '.flv'],
        'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'],
        'code': ['.py', '.js', '.java', '.cpp', '.html', '.css', '.go', '.rs'],
        'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'],
    }
    # Invert once so each file needs a single dict lookup.
    ext_to_category = {
        ext: cat for cat, exts in category_map.items() for ext in exts
    }
    # 'others' must be pre-seeded, otherwise the first uncategorised file
    # raises KeyError when it is counted.
    stats = dict.fromkeys([*category_map, 'others'], 0)

    # Snapshot the listing first: files are moved while we loop, and the
    # target folders may live inside source_dir.
    files = sorted(p for p in source.iterdir() if p.is_file())
    for filepath in files:
        category = ext_to_category.get(filepath.suffix.lower(), 'others')
        target_dir = target_base / category
        target_dir.mkdir(parents=True, exist_ok=True)
        # Never overwrite an existing file: pick "name_1.ext", "name_2.ext", ...
        destination = target_dir / filepath.name
        counter = 1
        while destination.exists():
            destination = target_dir / f'{filepath.stem}_{counter}{filepath.suffix}'
            counter += 1
        shutil.move(str(filepath), str(destination))
        stats[category] += 1

    moved = sum(stats.values())
    print(f'共处理 {moved} 个文件:')
    for cat, count in stats.items():
        if count:
            print(f' {cat}: {count} 个')
    return stats
# Usage. Guarded so that importing this module never moves files around.
if __name__ == '__main__':
    classify_files('/Users/wang/Downloads')
3. 增量备份脚本(带去重)
完整备份太占空间?用增量备份+文件哈希去重,只备份变化的内容。
import os
import shutil
import hashlib
import json
from pathlib import Path
from datetime import datetime
class IncrementalBackup:
    """Copy only new or changed files from a source tree into timestamped folders.

    A JSON manifest maps each file's POSIX-style path (relative to the
    source) to the SHA-256 of the content that was last backed up. Files
    whose current hash equals the manifest entry are skipped.
    """

    def __init__(self, source, backup_root, manifest_path=None):
        """Set up paths and load the manifest if one already exists.

        Args:
            source: Root of the tree to back up (str or Path).
            backup_root: Directory that receives the ``backup_<timestamp>``
                folders (str or Path).
            manifest_path: Location of the manifest file. Defaults to
                ``<backup_root>/.manifest.json``.
        """
        self.source = Path(source)
        self.backup_root = Path(backup_root)
        if not manifest_path:
            # Build from the Path attribute: ``backup_root`` may be a plain
            # str, and ``str / str`` raises TypeError.
            manifest_path = self.backup_root / '.manifest.json'
        self.manifest_path = Path(manifest_path)
        self.manifest = self._load_manifest()

    def _load_manifest(self):
        """Return the stored manifest dict, or an empty dict if none exists."""
        if self.manifest_path.exists():
            return json.loads(self.manifest_path.read_text(encoding='utf-8'))
        return {}

    def _save_manifest(self):
        """Write the manifest as UTF-8 JSON, creating its directory if needed."""
        self.manifest_path.parent.mkdir(parents=True, exist_ok=True)
        self.manifest_path.write_text(
            json.dumps(self.manifest, indent=2, ensure_ascii=False),
            encoding='utf-8',
        )

    def _file_hash(self, filepath):
        """Return the SHA-256 hex digest of ``filepath``, read in 8 KiB chunks."""
        h = hashlib.sha256()
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                h.update(chunk)
        return h.hexdigest()

    def backup(self):
        """Back up every new or changed file and update the manifest.

        Returns:
            A ``(new_files, skipped)`` tuple: the number of files copied and
            the number skipped because their hash was unchanged.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_dir = self.backup_root / f'backup_{timestamp}'
        backup_dir.mkdir(parents=True, exist_ok=True)
        # If the backup root or manifest lives inside the source tree they
        # must be excluded, or every run would back up the previous backups.
        excluded_root = self.backup_root.resolve()
        manifest_file = self.manifest_path.resolve()
        new_files = 0
        skipped = 0
        try:
            for filepath in self.source.rglob('*'):
                if not filepath.is_file():
                    continue
                resolved = filepath.resolve()
                if resolved == manifest_file or excluded_root in resolved.parents:
                    continue
                # JSON object keys must be str, and keys loaded back from the
                # manifest are str, so a Path key would never match.
                key = filepath.relative_to(self.source).as_posix()
                file_hash = self._file_hash(filepath)
                if self.manifest.get(key) == file_hash:
                    skipped += 1
                    continue
                dest = backup_dir / key
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(filepath, dest)
                self.manifest[key] = file_hash
                new_files += 1
        finally:
            # Persist what was copied even if a later file fails.
            self._save_manifest()
        print(f'✅ 备份完成:新增 {new_files} 个文件,跳过重复 {skipped} 个')
        print(f'📁 备份位置:{backup_dir}')
        return new_files, skipped
# Usage. Guarded so that importing this module does not start a backup.
if __name__ == '__main__':
    backup = IncrementalBackup('/Users/wang/Projects/myapp', '/Users/wang/backups')
    backup.backup()
4. 日志文件分析脚本
分析access日志,统计IP访问量、热门路径、错误状态码。
import re
import json
from collections import Counter
from pathlib import Path
def analyze_access_log(log_path, top_n=20):
    """Count client IPs, request paths and status codes in an access log.

    Each line is searched for an IPv4 address, a bracketed timestamp, a
    quoted ``METHOD path ...`` request and a three-digit status code. Lines
    that do not match are ignored. A report is printed to stdout.

    Args:
        log_path: Path of the log file to read.
        top_n: How many of the most frequent IPs and paths to print.

    Returns:
        A dict with keys ``'ips'``, ``'paths'`` and ``'status'``, each
        mapping the observed value (str) to its hit count.
    """
    ip_counter = Counter()
    path_counter = Counter()
    status_counter = Counter()
    # NOTE(review): the group names were lost in the original text
    # ("(?P\d+" is not valid regex syntax). "ip", "path" and "status" are
    # fixed by the m.group() calls below; "time" and "method" are
    # reconstructed from position and are not read anywhere.
    log_pattern = re.compile(
        r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?'
        r'\[(?P<time>[^\]]+)\].*?'
        r'"(?P<method>\w+) (?P<path>[^\s"]+)[^"]*?" '
        r'(?P<status>\d{3})'
    )
    # Undecodable bytes are dropped rather than aborting the whole analysis.
    with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            m = log_pattern.search(line)
            if m:
                ip_counter[m.group('ip')] += 1
                path_counter[m.group('path')] += 1
                status_counter[m.group('status')] += 1

    print(f'{"="*50}')
    print(f'📊 日志分析报告:{log_path}')
    print(f'{"="*50}')
    print(f'\n🚨 Top {top_n} 访问IP:')
    for ip, count in ip_counter.most_common(top_n):
        print(f' {ip}: {count} 次')
    print(f'\n🔥 Top {top_n} 热门路径:')
    for path, count in path_counter.most_common(top_n):
        print(f' {path}: {count} 次')
    print(f'\n📈 HTTP状态码分布:')
    for status, count in sorted(status_counter.items()):
        # One bar segment per 100 hits.
        bar = '█' * (count // 100)
        print(f' {status}: {count:>6} {bar}')

    # Plain dicts so the caller can serialise the result (e.g. to JSON).
    return {
        'ips': dict(ip_counter),
        'paths': dict(path_counter),
        'status': dict(status_counter),
    }
# Usage. Guarded so that importing this module does not read system logs.
if __name__ == '__main__':
    result = analyze_access_log('/var/log/nginx/access.log')
    # Save the report as JSON. The encoding is explicit because
    # ensure_ascii=False may emit characters the platform default cannot encode.
    Path('/tmp/log_report.json').write_text(
        json.dumps(result, indent=2, ensure_ascii=False),
        encoding='utf-8',
    )
5. CSV数据清洗与转换
处理脏数据:去重、空值填充、类型转换、导出。
import pandas as pd
import json
from datetime import datetime
def clean_csv(input_path, output_path=None):
    """Run a small cleaning pipeline over a CSV file.

    Steps, in order: drop exact duplicate rows; drop rows in which every
    column is empty; fill missing numeric values with the column median;
    fill missing values of object-dtype columns with the column mode; parse
    non-numeric columns whose name contains "date" or "time" as datetimes
    (unparseable values become NaT).

    Args:
        input_path: CSV file to read.
        output_path: If given, the cleaned frame is written there as CSV
            (UTF-8 with BOM, no index column).

    Returns:
        The cleaned ``pandas.DataFrame``.
    """
    df = pd.read_csv(input_path)
    print(f'原始数据:{len(df)} 行 × {len(df.columns)} 列')

    # 1. Drop rows that are exact duplicates.
    before = len(df)
    df = df.drop_duplicates()
    print(f'去重:删除了 {before - len(df)} 条重复记录')

    # 2. Drop rows where every column is empty. This has to happen BEFORE
    #    filling: afterwards such rows hold medians/modes and never qualify.
    df = df.dropna(how='all')

    # 3. Fill missing values in numeric columns with the median.
    for col in df.select_dtypes(include='number').columns:
        if df[col].isna().any():
            median_val = df[col].median()
            df[col] = df[col].fillna(median_val)
            print(f' 数值列 [{col}] 空值已用中位数 {median_val} 填充')

    # 4. Fill missing values in string (object) columns with the mode.
    for col in df.select_dtypes(include='object').columns:
        if df[col].isna().any():
            modes = df[col].mode()
            mode_val = modes.iloc[0] if not modes.empty else '未知'
            df[col] = df[col].fillna(mode_val)
            print(f' 字符串列 [{col}] 空值已用众数 "{mode_val}" 填充')

    # 5. Parse date-like columns, chosen by name. Numeric columns are left
    #    alone: a name such as "lifetime" also contains "time", and
    #    to_datetime would read its numbers as epoch offsets.
    for col in df.columns:
        name = str(col).lower()
        if ('date' in name or 'time' in name) and not pd.api.types.is_numeric_dtype(df[col]):
            df[col] = pd.to_datetime(df[col], errors='coerce')

    print(f'\n✅ 清洗后数据:{len(df)} 行 × {len(df.columns)} 列')
    if output_path:
        df.to_csv(output_path, index=False, encoding='utf-8-sig')
        print(f'📁 已保存到:{output_path}')
    return df
# Usage. Guarded so that importing this module does not touch /tmp.
if __name__ == '__main__':
    clean_df = clean_csv('/tmp/dirty_data.csv', '/tmp/clean_data.csv')
    print(clean_df.head())
6. 定时文件清理(防止磁盘爆满)
根据文件修改时间(可按扩展名过滤)自动清理过期文件,默认仅预览,并跳过隐藏文件和隐藏目录。
import os
import time
from pathlib import Path
def cleanup_by_age(directory, days=30, extensions=None, dry_run=True):
    """Delete files under ``directory`` that were last modified over ``days`` days ago.

    Hidden files and hidden directories (names starting with ``.``) are
    never visited.

    Args:
        directory: Root directory, walked recursively.
        days: Age threshold in days, compared against the modification time.
        extensions: Only consider these extensions (e.g. ``['.log', '.tmp']``,
            matched case-insensitively); None or empty means every file.
        dry_run: True only reports what would be removed; False deletes.

    Returns:
        A ``(file_count, freed_bytes)`` tuple for the files deleted (or, in
        dry-run mode, that would be deleted).
    """
    cutoff = time.time() - days * 86400
    wanted = {ext.lower() for ext in extensions} if extensions else None
    deleted_count = 0
    freed_bytes = 0
    failed = 0
    for root, dirs, files in os.walk(directory):
        # Pruning in place stops os.walk from descending into hidden dirs.
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        for filename in files:
            if filename.startswith('.'):
                continue
            if wanted is not None and Path(filename).suffix.lower() not in wanted:
                continue
            filepath = os.path.join(root, filename)
            try:
                info = os.stat(filepath)
                if info.st_mtime >= cutoff:
                    continue
                if not dry_run:
                    os.remove(filepath)
            except OSError as exc:
                # Best effort: report the file and keep going rather than
                # silently hiding files that could not be inspected/removed.
                failed += 1
                print(f'⚠️ 跳过 {filepath}:{exc}')
                continue
            deleted_count += 1
            freed_bytes += info.st_size

    freed_mb = freed_bytes / 1024 / 1024
    action = '将删除' if dry_run else '已删除'
    print(f'{action} {deleted_count} 个文件,释放 {freed_mb:.2f} MB')
    if failed:
        print(f'⚠️ {failed} 个文件处理失败')
    if dry_run:
        # This function has no CLI; the switch is the dry_run argument.
        print('(这是预览模式,传入 dry_run=False 正式执行)')
    return deleted_count, freed_bytes
# Usage. Guarded so that importing this module does not scan /var/log.
if __name__ == '__main__':
    # Preview: .log files older than 30 days (dry_run defaults to True).
    cleanup_by_age('/var/log', days=30, extensions=['.log'])
    # Once the preview looks right, delete for real:
    # cleanup_by_age('/var/log', days=30, extensions=['.log'], dry_run=False)
7. 批量压缩与解压缩
import os
import zipfile
import tarfile
from pathlib import Path
def batch_compress(source_dir, output_path=None, fmt='zip', level=6):
    """Compress a directory into a ``.zip`` or ``.tar.gz`` archive.

    ``__pycache__``, ``.git`` and ``node_modules`` directories and ``*.pyc``
    files are left out of the archive, in both formats. Archive member names
    start with the source directory's own name.

    Args:
        source_dir: Directory to compress.
        output_path: Archive path (str or Path). Defaults to
            ``<source_dir>.<fmt>`` next to the source directory.
        fmt: ``'zip'`` or ``'tar.gz'``.
        level: Compression level 0-9, applied to both formats.

    Returns:
        The ``Path`` of the archive that was written.

    Raises:
        ValueError: If ``fmt`` is not one of the supported formats.
    """
    if fmt not in ('zip', 'tar.gz'):
        # Fail early; otherwise nothing is written and the size report
        # below dies with an unrelated FileNotFoundError.
        raise ValueError(f"unsupported format {fmt!r}; expected 'zip' or 'tar.gz'")

    excluded_dirs = ('__pycache__', '.git', 'node_modules')
    source = Path(source_dir)
    if output_path is None:
        output_path = source.parent / f'{source.name}.{fmt}'
    else:
        # Callers may pass a plain str; .stat() below needs a Path.
        output_path = Path(output_path)

    if fmt == 'zip':
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED,
                             compresslevel=level) as zf:
            for root, dirs, files in os.walk(source):
                # Pruning in place stops os.walk from entering these dirs.
                dirs[:] = [d for d in dirs if d not in excluded_dirs]
                for file in files:
                    if file.endswith('.pyc'):
                        continue
                    filepath = Path(root) / file
                    zf.write(filepath, filepath.relative_to(source.parent))
    else:
        def keep(member):
            # Returning None drops the member (and, for a directory,
            # everything below it) from the archive.
            base = os.path.basename(member.name)
            if member.isdir() and base in excluded_dirs:
                return None
            if base.endswith('.pyc'):
                return None
            return member

        with tarfile.open(output_path, 'w:gz', compresslevel=level) as tf:
            tf.add(source, arcname=source.name, filter=keep)

    size_mb = output_path.stat().st_size / 1024 / 1024
    print(f'✅ 压缩完成:{output_path} ({size_mb:.2f} MB)')
    return output_path
def batch_unzip(archive_paths, dest_dir=None):
    """Extract several ``.zip`` / ``.tar.gz`` / ``.tgz`` archives.

    Args:
        archive_paths: Iterable of archive paths (str or Path).
        dest_dir: Directory to extract into; created if missing. Defaults to
            each archive's own parent directory.

    Files with any other extension are reported and skipped.
    """
    for ap in archive_paths:
        archive = Path(ap)
        dest = Path(dest_dir) if dest_dir else archive.parent
        dest.mkdir(parents=True, exist_ok=True)
        # Match on the lower-cased name so "A.ZIP" or "b.TAR.GZ" also work.
        lowered = archive.name.lower()
        if lowered.endswith('.zip'):
            with zipfile.ZipFile(archive, 'r') as zf:
                zf.extractall(dest)
        elif lowered.endswith(('.tar.gz', '.tgz')):
            with tarfile.open(archive, 'r:gz') as tf:
                if hasattr(tarfile, 'data_filter'):
                    # Where the interpreter supports extraction filters,
                    # reject members that would escape ``dest``
                    # (absolute paths, "..", unsafe links).
                    tf.extractall(dest, filter='data')
                else:
                    tf.extractall(dest)
        else:
            # Do not claim success for a file that was never extracted.
            print(f'⚠️ 跳过不支持的格式:{archive.name}')
            continue
        print(f'✅ 解压:{archive.name} → {dest}')
# Example. Guarded so that importing this module does not write archives.
if __name__ == '__main__':
    batch_compress('/Users/wang/Projects/myapp', fmt='zip')
    batch_unzip(['/tmp/app.zip', '/tmp/data.tar.gz'])
8. 监听目录变化(文件监控)
用watchdog监控目录变化,文件新增/修改/删除时自动触发动作。
import time
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class WatcherHandler(FileSystemEventHandler):
    """Print and record file events for files with selected extensions.

    Modification and creation events are filtered by extension; deletion
    events are recorded for every file. Directory events are ignored.
    Each message is printed and appended to ``self.log``.
    """

    def __init__(self, extensions=None):
        """Store the extension filter.

        Args:
            extensions: Extensions to watch, e.g. ``['.py', '.vue']``
                (matched case-insensitively). Defaults to common web and
                Python source extensions.
        """
        super().__init__()
        default = ['.py', '.js', '.html', '.css', '.json']
        self.extensions = [ext.lower() for ext in (extensions or default)]
        self.log = []

    def _is_watched(self, path):
        """Return True if ``path`` has one of the watched extensions."""
        return Path(path).suffix.lower() in self.extensions

    def on_modified(self, event):
        """Record a modification of a watched file."""
        if event.is_directory:
            return
        if self._is_watched(event.src_path):
            self._log('📝 修改', event.src_path)

    def on_created(self, event):
        """Record the creation of a watched file."""
        if event.is_directory:
            return
        if self._is_watched(event.src_path):
            self._log('🆕 新建', event.src_path)
            # Hook for follow-up work, e.g. linting the new file:
            # subprocess.run(['pylint', event.src_path])

    def on_deleted(self, event):
        """Record the deletion of any file; no extension filter is applied here."""
        if event.is_directory:
            return
        self._log('🗑️ 删除', event.src_path)

    def _log(self, action, path):
        """Print ``action: path`` and keep it in ``self.log``."""
        msg = f'{action}: {path}'
        print(msg)
        self.log.append(msg)
def watch_directory(path, extensions=None):
    """Watch ``path`` recursively and report file events until Ctrl+C.

    Args:
        path: Directory to watch.
        extensions: Extension filter handed to ``WatcherHandler``.
    """
    handler = WatcherHandler(extensions)
    observer = Observer()
    observer.schedule(handler, path, recursive=True)
    observer.start()
    print(f'👀 监控中:{path}(按 Ctrl+C 停止)')
    try:
        # The observer was started above; this loop only keeps the calling
        # thread alive until it is interrupted.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop and join on every exit path, not just on Ctrl+C.
        observer.stop()
        observer.join()
# Usage. Guarded so that importing this module does not block in a watch loop.
if __name__ == '__main__':
    watch_directory('/Users/wang/Projects/myapp/src', extensions=['.py', '.vue'])
9. Excel多表合并工具
import pandas as pd
from pathlib import Path
from openpyxl import load_workbook
def _unique_sheet_name(raw_name, used_names):
    """Return an Excel-safe sheet name based on ``raw_name`` that is not in ``used_names``.

    Characters Excel rejects in sheet titles are replaced with ``_``, the
    name is cut to 31 characters, and a ``_1``, ``_2``, ... suffix is added
    on a clash. The lower-cased result is added to ``used_names``.
    """
    cleaned = raw_name.translate(str.maketrans('[]:*?/\\', '_' * 7))
    candidate = cleaned[:31]
    counter = 1
    while candidate.lower() in used_names:
        suffix = f'_{counter}'
        candidate = cleaned[:31 - len(suffix)] + suffix
        counter += 1
    used_names.add(candidate.lower())
    return candidate


def merge_excel_sheets(excel_paths, output_path):
    """Copy every sheet of several Excel files into one new workbook.

    Each sheet of each input file becomes its own sheet in the output,
    named ``<file stem>_<sheet name>`` (sanitised, at most 31 characters,
    made unique when truncation would collide).

    Args:
        excel_paths: Iterable of input workbook paths.
        output_path: Path of the workbook to create.

    Raises:
        ValueError: If ``excel_paths`` is empty.
    """
    excel_paths = list(excel_paths)
    if not excel_paths:
        # A workbook without sheets cannot be saved; fail before creating it.
        raise ValueError('excel_paths is empty: nothing to merge')

    used_names = set()
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        for file_path in excel_paths:
            # sheet_name=None loads all sheets in one pass as {name: DataFrame}.
            sheets = pd.read_excel(file_path, sheet_name=None)
            for sheet_name, df in sheets.items():
                safe_name = _unique_sheet_name(
                    f'{Path(file_path).stem}_{sheet_name}', used_names
                )
                df.to_excel(writer, sheet_name=safe_name, index=False)
                print(f' + {safe_name} ({len(df)} 行)')
    print(f'\n✅ 合并完成:{output_path}')
    print(f' 共 {len(used_names)} 个工作表')
# Usage: merge every .xlsx file in a directory.
if __name__ == '__main__':
    # Skip "~$name.xlsx" lock files that Excel creates next to open
    # workbooks; sort so the sheet order is reproducible.
    excel_files = sorted(
        p for p in Path('/Users/wang/Documents').glob('*.xlsx')
        if not p.name.startswith('~$')
    )
    merge_excel_sheets(excel_files, '/tmp/merged.xlsx')
10. 自动生成目录树结构文档
import os
from pathlib import Path
def generate_tree(root_dir, ignore_patterns=None, max_depth=5):
    """Render a directory as a text tree, e.g. for project documentation.

    Directories are listed before files, each group sorted by name
    case-insensitively.

    Args:
        root_dir: Directory to render.
        ignore_patterns: Iterable of entry names or shell-style patterns
            (``'*.pyc'``) to leave out. Defaults to common VCS, cache,
            build and editor artefacts.
        max_depth: Number of levels below the root that are listed.

    Returns:
        The tree as one newline-joined string.
    """
    # Local import so this block does not depend on the file's import lines.
    from fnmatch import fnmatchcase

    if ignore_patterns is None:
        ignore_patterns = {
            '__pycache__', '.git', '.svn', 'node_modules',
            '.venv', 'venv', '.idea', '.vscode', 'dist', 'build',
            '*.pyc', '*.log', '.DS_Store'
        }

    def should_ignore(name):
        return any(fnmatchcase(name, pattern) for pattern in ignore_patterns)

    lines = []

    def walk(dir_path, prefix='', depth=0):
        if depth >= max_depth:
            return
        try:
            entries = [e for e in Path(dir_path).iterdir() if not should_ignore(e.name)]
        except OSError:
            # An unreadable directory is marked instead of aborting the tree.
            lines.append(f'{prefix}└── ⚠️ [无法访问]')
            return
        entries.sort(key=lambda x: (not x.is_dir(), x.name.lower()))
        for i, entry in enumerate(entries):
            is_last = i == len(entries) - 1
            connector = '└── ' if is_last else '├── '
            icon = '📁' if entry.is_dir() else '📄'
            lines.append(f'{prefix}{connector}{icon} {entry.name}')
            if entry.is_dir():
                # Same width as the 4-character connectors so that nested
                # levels line up under their parent.
                extension = '    ' if is_last else '│   '
                walk(entry, prefix + extension, depth + 1)

    root = Path(root_dir)
    # Path('.').name is empty; fall back to the resolved directory name.
    lines.append(f'📂 {root.name or root.resolve().name}/')
    walk(root)
    return '\n'.join(lines)
# Generate and print the tree. Guarded so that importing has no side effects.
if __name__ == '__main__':
    tree = generate_tree('/Users/wang/Projects/myapp')
    print(tree)
    # Save to a file. The tree contains emoji and box-drawing characters,
    # so the encoding must not be left to the platform default.
    Path('/tmp/project_tree.txt').write_text(tree, encoding='utf-8')
总结
以上10个脚本覆盖了文件处理中最常见的场景:
- ✅ 批量操作:重命名、分类、压缩解压缩
- ✅ 数据处理:CSV清洗、Excel合并
- ✅ 自动化运维:增量备份、定时清理、目录监控
- ✅ 日志分析:快速定位问题流量
建议将这些脚本保存到本地工具库(加 --dry-run 或 dry_run=True 预览参数更安全),需要时直接调用,避免重复造轮子。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END















暂无评论内容