Des scripts Python ciblés automatisent le nettoyage, l’extraction, la conversion et l’analyse de métadonnées, en s’appuyant sur Pillow, pydub, mutagen et ffprobe. Des implémentations robustes et documentées existent sur GitHub et dans la documentation officielle des librairies.
Comment nettoyer les fichiers temporaires
Nettoyer les répertoires temporaires consiste à scanner, identifier les fichiers non modifiés depuis un seuil configurable, produire un rapport par répertoire, puis supprimer après confirmation tout en journalisant chaque action.
Objectif et risques :
Avant la liste, gardez en tête les principaux risques et bonnes pratiques.
- Espaces disque et performances : Supprimer libère de l’espace mais peut impacter des services si des fichiers sont encore utilisés.
- Fichiers en cours d’utilisation et permissions : Exécuter avec des droits limités ; prévoir gestion d’erreurs en cas de permission refusée.
- Bonnes pratiques : Activer le mode dry-run, utiliser des seuils conservateurs (30–90 jours), exclure par extension ou motif, et créer une sauvegarde temporaire si nécessaire.
Bibliothèques utiles : pathlib (doc officielle https://docs.python.org/3/library/pathlib.html), logging (doc officielle https://docs.python.org/3/library/logging.html), os, shutil, argparse, stat, datetime, signal. Optionnel : humanize pour tailles lisibles et send2trash pour suppression réversible.
#!/usr/bin/env python3
import argparse, logging, logging.handlers, sys, signal
from pathlib import Path
from datetime import datetime, timedelta
import os
try:
from send2trash import send2trash
TRASH_OK = True
except Exception:
TRASH_OK = False
def setup_logger(path=None):
logger = logging.getLogger("clean_temp")
logger.setLevel(logging.INFO)
handler = logging.handlers.RotatingFileHandler(path or "clean_temp.log", maxBytes=5_000_000, backupCount=3)
fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
handler.setFormatter(fmt); logger.addHandler(handler)
return logger
def iter_old_files(root:Path, cutoff:datetime, excludes):
for p in root.rglob("*"):
try:
if p.is_file():
if any(p.match(pat) for pat in excludes): continue
mtime = datetime.fromtimestamp(p.stat().st_mtime)
if mtime < cutoff:
yield p, mtime, p.stat().st_size
except PermissionError:
continue
def human(n): return f"{n/1024/1024:.2f} MB"
def sigint_handler(signum, frame):
raise KeyboardInterrupt
signal.signal(signal.SIGINT, sigint_handler)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--paths", nargs="+", required=True)
parser.add_argument("--days", type=int, default=30)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--yes", action="store_true")
parser.add_argument("--exclude", nargs="*", default=[])
parser.add_argument("--log-file", default="clean_temp.log")
args = parser.parse_args()
logger = setup_logger(args.log_file)
cutoff = datetime.now() - timedelta(days=args.days)
overall = []
for p in args.paths:
root = Path(p)
files = list(iter_old_files(root, cutoff, args.exclude))
cnt = len(files); size = sum(s for _,_,s in files)
overall.append((root, cnt, size, files))
logger.info("Scanned %s: %d files, %s", root, cnt, human(size))
# Rapport
for root,cnt,size,_ in overall:
print(f"{root}: {cnt} files, {human(size)}")
if not any(cnt for _,cnt,_,_ in overall):
print("Rien à supprimer."); return
if not args.yes:
ans = input("Confirmez suppression ? [y/N]: ").lower()
if ans != "y": print("Annulé"); return
try:
for root,cnt,size,files in overall:
for p,mt,sz in files:
try:
logger.info("Deleting %s (%s)", p, human(sz))
if not args.dry_run:
if TRASH_OK:
send2trash(str(p))
else:
p.unlink()
except PermissionError:
logger.exception("Permission denied: %s", p)
except KeyboardInterrupt:
logger.warning("Interrompu par utilisateur")
print("Interrompu, journalisé.")
if __name__ == "__main__": main()
Snippets : Parcours récursif avec pathlib :
for p in Path(root).rglob("*"):
if p.is_file(): ...
Calcul d'âge :
cutoff = datetime.now() - timedelta(days=days)
if datetime.fromtimestamp(p.stat().st_mtime) < cutoff: ...
Suppression sécurisée et journalisation :
logger.info("Deleting %s", p)
if send2trash_ok: send2trash(str(p))
else: p.unlink()
Tester : Exécuter avec --dry-run pour voir le rapport sans suppression. Consulter le fichier de log rotatif (--log-file) pour suivre chaque action et les erreurs de permission.
| --paths | Liste de répertoires | Obligatoire |
| --days | Seuil en jours | 30–90 recommandé |
| --dry-run | Afficher sans supprimer | Toujours utiliser avant suppression |
| --yes | Passer la confirmation interactive | À utiliser avec précaution |
| --exclude | Patterns à ignorer | Ex : *.sock, *.lock |
Comment extraire des archives ZIP imbriquées
Un extracteur récursif permet d’extraire des archives ZIP imbriquées tout en évitant les boucles, le retraitement et les risques de sécurité classiques (traversal de chemin, fichiers exécutables).
Problématique et bonnes pratiques :
- Gérer des archives imbriquées jusqu’à une profondeur configurable pour éviter explosion de taille.
- Extraire dans un répertoire temporaire isolé et appliquer un filtre pour bloquer .exe/.bat/.sh.
- Vérifier les chemins relatifs pour prévenir le zip-slip (../) et limiter la taille totale extraite.
- Utiliser un cache SHA256 pour ne pas retraiter des archives identiques et détecter les boucles.
Bibliothèques recommandées : zipfile (doc officielle https://docs.python.org/3/library/zipfile.html), pathlib, tempfile, hashlib.
#!/usr/bin/env python3
import sys, argparse, zipfile, hashlib, json
from pathlib import Path
from tempfile import TemporaryDirectory
def is_zip_file(p:Path)->bool:
return p.suffix.lower()=='.zip' and zipfile.is_zipfile(p)
def sha256(path:Path)->str:
h=hashlib.sha256()
with path.open('rb') as f:
for b in iter(lambda: f.read(8192), b''): h.update(b)
return h.hexdigest()
def resolve_name_conflict(dest:Path)->Path:
if not dest.exists(): return dest
stem, i = dest.stem, 1
while True:
candidate = dest.with_name(f"{stem}_{i}{dest.suffix}")
if not candidate.exists(): return candidate
i += 1
def safe_extract(z:zipfile.ZipFile, target:Path, deny_ext={'.exe','.bat','.sh'}):
for member in z.infolist():
member_path = target.joinpath(member.filename)
if '..' in Path(member.filename).parts:
raise RuntimeError('Zip slip detected')
if Path(member.filename).suffix.lower() in deny_ext:
continue
target_parent = member_path.parent
target_parent.mkdir(parents=True, exist_ok=True)
with z.open(member) as src, open(resolve_name_conflict(member_path),'wb') as dst:
dst.write(src.read())
def process(path:Path, out:Path, seen:set, depth:int, max_depth:int, manifest:list):
if depth>max_depth: return
h=sha256(path)
if h in seen: return
seen.add(h)
with TemporaryDirectory() as td:
td_path=Path(td)
try:
with zipfile.ZipFile(path) as z:
safe_extract(z, td_path)
except zipfile.BadZipFile:
manifest.append({'source':str(path),'error':'badzip'})
return
for f in td_path.rglob('*'):
if f.is_file():
if is_zip_file(f):
process(f, out, seen, depth+1, max_depth, manifest)
else:
dest = resolve_name_conflict(out.joinpath(f.name))
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(f.read_bytes())
manifest.append({'source':str(path),'level':depth,'final':str(dest),'size':dest.stat().st_size})
if __name__=='__main__':
parser=argparse.ArgumentParser()
parser.add_argument('--input-dir', required=True)
parser.add_argument('--output-dir', required=True)
parser.add_argument('--max-depth', type=int, default=10)
parser.add_argument('--manifest', default='manifest.json')
parser.add_argument('--dry-run', action='store_true')
args=parser.parse_args()
inp, out = Path(args.input_dir), Path(args.output_dir)
out.mkdir(parents=True, exist_ok=True)
seen=set(); manifest=[]
for z in inp.rglob('*.zip'):
process(z, out, seen, 0, args.max_depth, manifest)
if not args.dry_run:
Path(args.manifest).write_text(json.dumps(manifest, indent=2))
Parallélisation : utiliser multiprocessing.Pool pour lancer process sur plusieurs archives racines si IO bound ; limiter workers à nombre de disques/IO et contrôler mémoire car extraction crée copies temporaires.
| --input-dir | Répertoire source | Risque : fichiers corrompus |
| --output-dir | Répertoire final | Risque : écrasement (résolu par suffixe) |
| --max-depth | Profondeur maximale | Risque : explosion de taille |
| --manifest | Fichier JSON/CSV de sortie | Permet audit |
| --dry-run | Simuler sans écrire | Utile pour validation |
Comment convertir en masse des formats de fichiers
Je fournis ici une approche pragmatique pour convertir en masse des fichiers en combinant bibliothèques spécialisées et logique de traitement par lot, en sautant les fichiers déjà au format cible, en exposant qualité/bitrate et en journalisant les échecs.
Cas d'usage typiques : images PNG→JPEG/WebP (attention à la perte et aux couleurs, préserver EXIF si nécessaire), audio WAV→MP3 (bitrate vs taille), documents .docx→.pdf ou .txt (perte de mise en page possible).
Bibliothèques recommandées : Pillow pour images (Documentation Pillow: https://pillow.readthedocs.io/), pydub pour audio (https://github.com/jiaaro/pydub) avec ffmpeg sous-jacent pour robustesse (https://ffmpeg.org/), python-docx pour lire .docx et pandoc/LibreOffice via subprocess pour produire des PDFs.
#!/usr/bin/env python3
import argparse, logging, mimetypes, subprocess
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import Image, ExifTags
from pydub import AudioSegment
from tqdm import tqdm
import docx
def convert_image(src, dst, quality, preserve_exif):
with Image.open(src) as im:
if im.mode in ("RGBA","P"): im = im.convert("RGB")
info = im.info
exif = info.get("exif") if preserve_exif else None
im.save(dst, quality=quality, exif=exif)
def convert_audio(src, dst, bitrate):
audio = AudioSegment.from_file(src)
audio.export(dst, bitrate=bitrate, format=dst.suffix[1:])
def docx_to_txt(src, dst):
doc = docx.Document(src)
with open(dst, "w", encoding="utf-8") as f:
for p in doc.paragraphs:
f.write(p.text + "\\n")
def convert_file(p, out_dir, args):
try:
mime, _ = mimetypes.guess_type(p)
dst = out_dir / (p.stem + "." + args.target_format)
if args.skip_existing and dst.exists():
logging.info(f"SKIP: {p} exists")
return "skipped"
if mime and mime.startswith("image"):
convert_image(p, dst, args.quality, args.preserve_exif)
elif mime and mime.startswith("audio"):
convert_audio(p, dst, args.bitrate)
elif p.suffix.lower()==".docx" and args.target_format=="txt":
docx_to_txt(p, dst)
else:
# Fallback to external converter for complex cases (PDF)
if args.target_format=="pdf":
subprocess.run(["soffice","--headless","--convert-to","pdf","--outdir",str(out_dir),str(p)], check=True)
else:
logging.error(f"NO_HANDLER: {p}")
return "no_handler"
logging.info(f"OK: {p} -> {dst}")
return "ok"
except Exception as e:
logging.exception(f"ERR: {p} {e}")
return "error"
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--input-dir", required=True)
parser.add_argument("--output-dir", required=True)
parser.add_argument("--target-format", required=True)
parser.add_argument("--quality", type=int, default=85)
parser.add_argument("--bitrate", default="192k")
parser.add_argument("--threads", type=int, default=4)
parser.add_argument("--skip-existing", action="store_true")
parser.add_argument("--log-file", default="batch_convert.log")
parser.add_argument("--preserve-exif", action="store_true")
args = parser.parse_args()
logging.basicConfig(filename=args.log_file, level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
inp = Path(args.input_dir)
out = Path(args.output_dir); out.mkdir(parents=True, exist_ok=True)
files = [p for p in inp.rglob("*") if p.is_file()]
with ThreadPoolExecutor(max_workers=args.threads) as ex:
futures = {ex.submit(convert_file, p, out, args): p for p in files}
for f in tqdm(as_completed(futures), total=len(futures)):
pass
if __name__=="__main__":
main()
Conseils de test : lancer sur un petit échantillon, vérifier EXIF et onde audio, augmenter les logs en DEBUG si nécessaire.
Conseils pour gros volumes : limiter les threads pour éviter d'épuiser l'I/O, surveiller CPU/IO et disque, conserver logs détaillés et réessayer les échecs.
| Source | Outil | Paramètre recommandé | Risques |
| PNG, JPEG, WebP | Pillow | quality=75–95 | Perte, couleurs, EXIF |
| WAV | pydub + ffmpeg | bitrate 128k–320k | Perte auditive |
| DOCX | python-docx / soffice | n/a (texte) / use soffice for PDF | Perte de mise en page |
Comment extraire les métadonnées médias à grande échelle
Extraire les métadonnées médias à grande échelle permet d'inventorier, trier automatiquement et enrichir des catalogues pour 10k+ fichiers sans intervention manuelle. Les champs utiles comprennent EXIF (datetime, GPS), résolution, durée, codec et bitrate.
- Outils recommandés : Pillow + piexif pour les images, Mutagen pour l'audio, ffprobe (FFmpeg) pour la vidéo (docs : https://ffmpeg.org/ffprobe.html, https://mutagen.readthedocs.io).
- Schéma normalisé conseillé : filename, path, type, width, height, duration_s, codec, bitrate, gps_lat, gps_lon, datetime.
Exemple de script minimal (media_metadata_export.py) : lecture d'arguments, détection par extension, EXIF via piexif, tags audio via Mutagen, ffprobe JSON pour vidéo, journalisation des échecs et écriture CSV UTF-8.
#!/usr/bin/env python3
import argparse, csv, json, subprocess, os
from PIL import Image
import piexif
from mutagen import File as MFile
def gps_to_deg(gps):
# gps: ((num,den), ...)
def conv(pair): return pair[0]/pair[1]
lat = conv(gps[0]) + conv(gps[1])/60 + conv(gps[2])/3600
return lat
def ffprobe(path):
cmd = ['ffprobe','-v','quiet','-print_format','json','-show_format','-show_streams', path]
r = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
return json.loads(r.stdout)
# Parcours, extraction et écriture CSV simplifiés
# Voir le dépôt pour version complète avec --recursive, --include-types et logging des échecs
Conseils performance : itérer en lots, éviter de rouvrir les mêmes fichiers, limiter le nombre de processus ffprobe simultanés (IO bound).
Conseils d'enrichissement : effectuer le reverse geocoding en tâche asynchrone séparée pour éviter les latences et conserver la reproductibilité.
| Champ | Images | Audio | Vidéo | Outil |
| datetime | EXIF DateTime | Tag | format/metadata | Pillow/piexif, Mutagen, ffprobe |
| resolution | width/height | - | width/height | Pillow, ffprobe |
| duration_s | - | duration | duration | Mutagen, ffprobe |
| bitrate / codec | - | bitrate/codec | bitrate/codec | Mutagen, ffprobe |
| gps_lat / gps_lon | EXIF GPS | - | - | piexif |
Comment orchestrer et planifier ces scripts en production
Orchestrer vos scripts Python signifie garantir qu’ils tournent au bon moment, dans le bon ordre, et qu’ils remontent des erreurs utilisables.
Besoin d’orchestration : Fiabilité, retries, notifications, gestion d’erreurs et dépendances entre tâches sont essentielles. Extraction doit se terminer avant conversion, puis export des métadonnées. Retries et backoff évitent les échecs transitoires.
- Solutions simples (cron / systemd). Cron pour fréquence simple : exemple crontab pour lancer hourly_script.py chaque heure.
0 * * * * /usr/bin/python3 /opt/scripts/hourly_script.py >> /var/log/hourly_script.log 2>&1 - Systemd timer (plus fiable et observable). Exemple unit + timer pour clean_temp.py hebdomadaire.
[Unit] Description=Clean temp files [Service] Type=oneshot ExecStart=/usr/bin/python3 /opt/scripts/clean_temp.py -- fichier /etc/systemd/system/clean_temp.timer -- [Unit] Description=Weekly clean temp timer [Timer] OnCalendar=weekly Persistent=true [Install] WantedBy=timers.target - Logging & alerting. Logger vers fichiers puis logrotate (/etc/logrotate.d/). Alerting basique via mail (ssmtp/postfix) ou webhook HTTP vers un outil de notification.
Solutions avancées : Airflow (https://airflow.apache.org/docs/) gère DAG complexes, retries fins, SLA, monitoring et UI riche. N8n (https://docs.n8n.io/) est No/Low-Code pour intégrations rapides (APIs, Slack, S3). GitHub Actions (https://docs.github.com/actions) convient pour pipelines CI légers et déclenchements git.
- Quand préférer n8n. Pour intégrations rapides, non-devs, workflows d’API et connecteurs prêts.
- Quand préférer Airflow. Pour dépendances complexes, scalabilité, observabilité et scheduling fin.
Workflow n8n conceptuel :
Trigger (Schedule) -> Extract_Nested_Zip -> Batch_Convert -> Media_Metadata_Export -> Notify (Slack/Email)
Secrets & monitoring : Stocker secrets en variables d’environnement ou HashiCorp Vault. Centraliser logs via ELK/Graylog et exposer métriques basiques (success_count, duration) vers Prometheus. Préparer rollback en conservant artefacts et en versionnant scripts.
Exemple de wrapper Python avec retries et rapport final :
#!/usr/bin/env python3
import subprocess, time, sys, urllib.request, json
commands = [
"/usr/bin/python3 /opt/scripts/extract.py",
"/usr/bin/python3 /opt/scripts/convert.py",
"/usr/bin/python3 /opt/scripts/export_meta.py",
]
report = {"success": True, "details": []}
for cmd in commands:
attempts = 0
while attempts < 3:
attempts += 1
rc = subprocess.call(cmd, shell=True)
report["details"].append({"cmd": cmd, "rc": rc, "attempt": attempts})
if rc == 0:
break
time.sleep(5 * attempts)
if rc != 0:
report["success"] = False
break
# Envoi rapport via webhook
data = json.dumps(report).encode()
req = urllib.request.Request("https://hooks.example.com/notify", data=data, headers={"Content-Type":"application/json"})
try:
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
sys.exit(0 if report["success"] else 1)
| Solution | Complexité | Cas d’usage | Avantages | Inconvénients |
| Cron | Faible | Tâches simples, périodiques | Léger, universel | Peu d’observabilité, pas de dépendances |
| Systemd timers | Moyen | Tâches système, fiabilité | Restart, logging natif, persistent | Configuration système requise |
| n8n | Moyen | Intégrations API, non-devs | No/low-code, connecteurs | Moins adapté aux DAGs complexes |
| Airflow | Élevé | Workflows DAG complexes, grande échelle | Observabilité, retries, SLA | Installation et maintenance lourdes |
Sources officielles : Apache Airflow docs, n8n docs, GitHub Actions docs, systemd man pages.
Prêt à automatiser vos tâches de fichiers avec Python ?
En combinant quatre scripts ciblés — nettoyage, extraction d’archives imbriquées, conversion en masse et extraction de métadonnées — vous réduisez significativement le temps passé sur les tâches répétitives et limitez les erreurs humaines. En ajoutant une orchestration fiable (cron, systemd ou n8n) et des logs structurés, vous obtenez des traitements reproductibles, audités et évolutifs. Le bénéfice immédiat pour vous : gain de temps mesurable, espace disque récupéré et données média exploitables rapidement pour vos workflows ou analyses.
FAQ
A propos de l'auteur
Franck Scandolera — expert & formateur en Tracking avancé server-side, Analytics Engineering, Automatisation No/Low Code (n8n) et intégration de l'IA en entreprise. Responsable de l'agence webAnalyste et de l'organisme de formation Formations Analytics. Références clients : Logis Hôtel, Yelloh Village, BazarChic, Fédération Française de Football, Texdecor. Dispo pour aider les entreprises => contactez-moi.
⭐ Data Analyst, Analytics Engineer et expert dans l’automatisation IA ⭐
Ref clients : Logis Hôtel, Yelloh Village, BazarChic, Fédération Football Français, Texdecor…
Mon terrain de jeu :
Data Analyst & Analytics engineering : tracking propre RGPD, entrepôt de données (GTM server, BigQuery…), modèles (dbt/Dataform), dashboards décisionnels (Looker, SQL, Python).
Automatisation IA des taches Data, Marketing, RH, compta etc : conception de workflows intelligents robustes (n8n, Make, App Script, scraping) connectés aux API de vos outils et LLM (OpenAI, Mistral, Claude…).
Engineering IA pour créer des applications et agent IA sur mesure : intégration de LLM (OpenAI, Mistral…), RAG, assistants métier, génération de documents complexes, APIs, backends Node.js/Python.


