Doppelte-Bilder-bzw-Dateien-finden-und-aufräumen
#!/usr/bin/env python3
# Finds duplicate images by size and, if EXIF data is present, by camera
# model / capture date.
#
# Assume this test:
#
# mkdir -p find_groups/{a,b,c}
# echo 1 > find_groups/a/A
# echo 1 > find_groups/b/A
# echo 1 > find_groups/c/A
# echo 1 > find_groups/a/B
# echo 1 > find_groups/b/B
# echo 1 > find_groups/c/B
#
# echo 2 > find_groups/a/Z
# echo 2 > find_groups/b/Z
#
#
# Then 2 groups will be found (note: the script sorts groups by file count
# before printing, so runtime indices may differ from this sketch):
#
# GROUP INDEX=0
# directory 0: find_groups/a
# directory 1: find_groups/b
# directory 2: find_groups/c
# files A,B
#
# GROUP INDEX=1
# directory 0: find_groups/a
# directory 1: find_groups/b
# files Z
#
# Then you can tell the script to delete group index 0 from directory 1.
#
# EXIF data and file sizes are cached in cache.json, so even terabytes of
# data can be compared quickly and repeatedly.
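#
# The invocation below is an assumed example; the script file name is
# whatever you saved it as:
#
#   python3 find_duplicates.py find_groups/a find_groups/b find_groups/c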
import signal
import sys
import exifread
from datetime import datetime
import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import os
import traceback
from pathlib import Path
def files(path):
    """Recursively collects all regular files below path."""
    matches = []
    for root, dirnames, filenames in os.walk(path):
        for p in filenames:
            x = os.path.join(root, p)
            if os.path.isfile(x):
                matches.append(x)
    return matches
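# For the test tree above, files("find_groups") returns paths like
# ['find_groups/a/A', 'find_groups/a/B', 'find_groups/b/A', ...]
# (directory walk order may vary).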
def get_exif_date_model(image_path):
    """Returns [path, "date:model"] from EXIF data, or [path, "NO-EXIF"]."""
    try:
        with open(image_path, 'rb') as image_file:
            exif_tags = exifread.process_file(image_file, details=False)
            if 'EXIF DateTimeOriginal' in exif_tags:
                exif_date = exif_tags['EXIF DateTimeOriginal']
                exif_date = datetime.strptime(str(exif_date), '%Y:%m:%d %H:%M:%S')
                # a missing model tag should not discard the capture date
                model = exif_tags.get('Image Model', 'UNKNOWN-MODEL')
                return [image_path, f"{exif_date}:{model}"]
    except Exception as e:
        print(f"Error reading EXIF data: {e}")
        traceback.print_exc()
    return [image_path, "NO-EXIF"]
def get_size(path):
return [path, os.path.getsize(path)]
def fill_cache(cache, paths, f, prefix):
    """Runs f over all paths in a thread pool; stores each result under prefix+path."""
    with tqdm(total=len(paths)) as progress:
        with ThreadPoolExecutor() as executor:
            # both worker functions always return a [path, value] pair;
            # the with-block shuts the executor down on its own
            for file, r in executor.map(f, paths):
                progress.update()
                cache[f"{prefix}{file}"] = r
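# Illustrative cache.json content after a run (the photo path and values are
# made-up examples; "NO-EXIF" marks files without usable EXIF tags):
# {
#     "exif:find_groups/a/A": "NO-EXIF",
#     "size:find_groups/a/A": 2,
#     "exif:photos/img_001.jpg": "2020-05-01 12:00:00:NIKON D90",
#     "size:photos/img_001.jpg": 3145728
# }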
class UJsonStorage:
def __init__(self, file_path):
self.file_path = file_path
self.data = {}
try:
with open(file_path, 'r') as file:
self.data = json.load(file)
except FileNotFoundError:
pass
    def __enter__(self):
        # even a ctrl-c at the wrong moment can leave the file half-written,
        # so install a handler that lets __exit__ finish writing
        self.set_sigterm_handler()
        return self
def __exit__(self, exc_type, exc_value, traceback):
with open(self.file_path, 'w') as file:
json.dump(self.data, file, indent=4)
def get_or(self, k, f):
if not k in self.data:
self.data[k] = f()
return self.data[k]
def __getitem__(self, item):
return self.data[item]
def __setitem__(self, key, value):
self.data[key] = value
def __delitem__(self, key):
del self.data[key]
    def set_sigterm_handler(self):
        """Assigns a SIGTERM handler for a graceful shutdown."""
        def sigterm_handler(*args, **kwargs):
            # SystemExit unwinds through the with-block, so __exit__
            # still persists the data before the process dies
            sys.exit(0)
        signal.signal(signal.SIGTERM, sigterm_handler)
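# Minimal usage sketch of UJsonStorage on its own ("demo.json" is just an
# example file name):
#
#   with UJsonStorage("demo.json") as store:
#       store["answer"] = 42
#   # leaving the with-block writes demo.json to disk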
def main(directories):
with UJsonStorage("cache.json") as cache:
files_ = [y for x in directories for y in files(x)]
print("files_")
print(files_)
print("e")
exif_missing = []
size_missing = []
def exif_key(file):
return f"exif:{file}"
def size_key(file):
return f"size:{file}"
        # only files without a cache entry need to be (re)scanned
        for x in files_:
            if exif_key(x) not in cache.data:
                exif_missing.append(x)
            if size_key(x) not in cache.data:
                size_missing.append(x)
        print('reading missing exif data')
        fill_cache(cache, exif_missing, get_exif_date_model, "exif:")
        print('reading missing file sizes')
        fill_cache(cache, size_missing, get_size, "size:")
def key(path):
size = cache[f"size:{path}"]
exif = cache[f"exif:{path}"]
return f"{os.path.basename(path)}:{size}:{exif}"
bydirs = {}
for f in files_:
p = Path(f)
k = key(f)
            if k not in bydirs:
                bydirs[k] = {"basename": p.name, "directories": []}
            bydirs[k]["directories"].append(str(p.parent))
        # group identical basename sets by the exact sorted list of directories
        groups = {}
        for k, v in bydirs.items():
            v["directories"].sort()
            d_str = "::".join(v["directories"])
            if d_str not in groups:
                groups[d_str] = []
            groups[d_str].append(v["basename"])
group_list = []
for k, v in groups.items():
group_list.append({
"directory_list": k.split("::"),
"files": v
})
        # keep only real duplicates (present in more than one directory), smallest groups first
        group_list = [x for x in group_list if len(x["directory_list"]) > 1]
        group_list.sort(key=lambda x: len(x["files"]))
def print_dirs(g):
for i, d in enumerate(g["directory_list"]):
print("%s: %s " % (i, d))
for i, g in enumerate(group_list):
print("")
print("")
print("GROUP [INDEX=%s] === count: %s" % (i, len(g["files"])))
print("directories:")
print_dirs(g)
print("files:")
print(g["files"])
        while True:
            a = input("delete group [index|Q=quit]: ")
            if a == "Q":
                return
            group = group_list[int(a)]
            print_dirs(group)
            b = input("delete files from directory [index]: ")
            d = group["directory_list"][int(b)]
            for f in group["files"]:
                x = os.path.join(d, f)
                print("del %s" % x)
                os.unlink(x)
if __name__ == "__main__":
    directories = sys.argv[1:]
    print("scanning directories: %s" % directories)
main(directories)
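# An example interactive session against the test tree above might look like
# this (abridged; groups are sorted by file count, so the Z group comes first):
#
#   GROUP [INDEX=0] === count: 1
#   directories:
#   0: find_groups/a
#   1: find_groups/b
#   files:
#   ['Z']
#
#   GROUP [INDEX=1] === count: 2
#   directories:
#   0: find_groups/a
#   1: find_groups/b
#   2: find_groups/c
#   files:
#   ['A', 'B']
#
#   delete group [index|Q=quit]: 1
#   0: find_groups/a
#   1: find_groups/b
#   2: find_groups/c
#   delete files from directory [index]: 1
#   del find_groups/b/A
#   del find_groups/b/B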