- Регистрация
- 20.01.2011
- Сообщения
- 7,665
- Розыгрыши
- 0
- Реакции
- 135
Всем привет, не так давно у меня появилась идея реализовать на питоне скрипт который делает быструю сортировку и очистку дублей строк в огромном текстовом файле без колоссального потребления ресурсов ОЗУ в операционной системе.
Минимальные системные требования: Python 3.12, 4 ядра CPU, 4 гигабайта ОЗУ, свободное место на накопителе размером в импортируемый файл+10%.
Скрипт работает на встроенных библиотеках питона, кроме одной библиотеки используемой для разметки выдачи цвета лога (в общем все подробно объяснил в комментариях к коду, может быть потом исправлю тут на полноценную статью, если настроение будет):
by: rand
Минимальные системные требования: Python 3.12, 4 ядра CPU, 4 гигабайта ОЗУ, свободное место на накопителе размером в импортируемый файл+10%.
Скрипт работает на встроенных библиотеках питона, кроме одной библиотеки используемой для разметки выдачи цвета лога (в общем все подробно объяснил в комментариях к коду, может быть потом исправлю тут на полноценную статью, если настроение будет):
Bash:
pip install colorlog==6.8.2
Python:
import os
import tempfile
import heapq
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from colorlog import ColoredFormatter # Для цветного логирования
# Настраиваем цветное логирование
formatter = ColoredFormatter(
"%(log_color)s%(asctime)s - %(levelname)s - %(message)s", # Формат вывода логов
datefmt=None, # Формат даты, по умолчанию
reset=True, # Сброс цветов после каждой строки
log_colors={ # Задаем цвета для различных уровней логов
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red',
}
)
# Настройка обработчика логов с UTF-8
handler = logging.StreamHandler() # Логи выводятся в консоль
handler.setFormatter(formatter) # Применяем цветной формат
logger = logging.getLogger() # Получаем объект логгера
logger.addHandler(handler) # Добавляем к нему наш обработчик
logger.setLevel(logging.INFO) # Устанавливаем уровень логирования INFO (можно изменять на DEBUG для более детальных логов)
# Создаем путь к папке TEMP для временных файлов
temp_dir = os.path.join(os.getcwd(), 'TEMP') # Директория для хранения временных файлов
if not os.path.exists(temp_dir): # Если папки нет, создаем её в корне скрипта
os.makedirs(temp_dir)
# Функция для обработки чанков: удаление дублей и сортировка
def process_chunk(chunk, temp_dir):
try:
logger.info(f"Начало обработки чанка размером {len(chunk)} строк") # Логируем количество строк в чанке
unique_items = set(chunk) # Превращаем список в множество для удаления дублей
logger.info(f"Удалено {len(chunk) - len(unique_items)} дублей в чанке") # Логируем, сколько дублей удалено
sorted_chunk = sorted(unique_items) # Сортируем уникальные строки
# Создаем временный файл, который не будет удален автоматически (delete=False)
with tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8', errors='ignore', dir=temp_dir) as temp_file:
temp_file.write('\n'.join(sorted_chunk) + '\n') # Записываем отсортированный чанк во временный файл
logger.info(f"Чанк записан во временный файл {temp_file.name}") # Логируем путь к временному файлу
return temp_file.name # Возвращаем имя временного файла
except Exception as e:
logger.error(f"Ошибка при обработке чанка: {e}") # Логируем ошибку, если что-то пошло не так
return None # Возвращаем None, если произошла ошибка
# Функция для пакетного слияния временных файлов в один общий файл
def merge_files(temp_files, output_file):
try:
logger.info(f"Окончательное слияние {len(temp_files)} временных файлов") # Логируем количество файлов для слияния
unique_count = 0 # Счетчик уникальных строк
duplicate_count = 0 # Счетчик дублей
# Открываем выходной файл для записи
with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
# Открываем все временные файлы и создаем итераторы
file_iters = [open(f, 'r', encoding='utf-8', errors='replace') for f in temp_files]
merged_iter = heapq.merge(*file_iters) # Сливаем отсортированные временные файлы в один поток
prev_line = None # Переменная для отслеживания предыдущей строки
# Проходим по слитым строкам
for line in merged_iter:
if line != prev_line: # Если строка не совпадает с предыдущей (уникальная)
outfile.write(line) # Записываем строку в выходной файл
prev_line = line # Обновляем предыдущую строку
unique_count += 1 # Увеличиваем счетчик уникальных строк
else:
duplicate_count += 1 # Если строка дублируется, увеличиваем счетчик дублей
# Закрываем временные файлы
for f in file_iters:
f.close()
logger.info(f"Слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}") # Логируем результаты
return unique_count, duplicate_count # Возвращаем количество уникальных строк и дублей
except Exception as e:
logger.error(f"Ошибка при слиянии файлов: {e}") # Логируем ошибку, если слияние не удалось
return 0, 0 # Возвращаем нули в случае ошибки
# Функция для пакетного слияния временных файлов
def batch_merge(temp_files, batch_size, temp_dir):
try:
logger.info(f"Начало пакетного слияния с размером пакета {batch_size}") # Логируем начало пакетного слияния
merged_files = [] # Список для хранения результатов пакетного слияния
total_unique_count = 0 # Общий счетчик уникальных строк
total_duplicate_count = 0 # Общий счетчик дублей
# Обрабатываем файлы по частям (батчами)
for i in range(0, len(temp_files), batch_size):
batch = temp_files[i:i + batch_size] # Берем очередной пакет файлов
logger.info(f"Слияние пакета с файлов {i+1} по {min(i + batch_size, len(temp_files))}") # Логируем диапазон файлов
# Создаем временный файл для результата слияния пакета
with tempfile.NamedTemporaryFile(delete=False, mode='w', encoding='utf-8', dir=temp_dir) as temp_merged_file:
unique_count, duplicate_count = merge_files(batch, temp_merged_file.name) # Сливаем пакет файлов
merged_files.append(temp_merged_file.name) # Добавляем результат слияния в список
total_unique_count += unique_count # Обновляем общий счетчик уникальных строк
total_duplicate_count += duplicate_count # Обновляем общий счетчик дублей
# Удаляем временные файлы после слияния
logger.info(f"Удаление временных файлов пакета с {i+1} по {min(i + batch_size, len(temp_files))}")
for temp_file in batch:
if os.path.exists(temp_file):
os.remove(temp_file) # Удаляем временный файл
return merged_files, total_unique_count, total_duplicate_count # Возвращаем список объединенных файлов и итоговые счетчики
except Exception as e:
logger.error(f"Ошибка при пакетном слиянии: {e}") # Логируем ошибку при пакетном слиянии
return temp_files, 0, 0 # Возвращаем исходные файлы и нули в случае ошибки
# Основная функция сортировки и удаления дубликатов с параллельной обработкой чанков
def sort_and_uniq_streaming(input_file, output_file, chunk_size=2000000, batch_size=10):
temp_files = [] # Список для хранения временных файлов
chunk = [] # Текущий чанк строк
original_count = 0 # Счетчик всех строк в исходном файле
logger.info(f"Чтение файла {input_file}...") # Логируем начало чтения файла
try:
with open(input_file, 'r', encoding='utf-8', errors='ignore') as infile: # Открываем файл для чтения
with ThreadPoolExecutor() as executor: # Создаем пул потоков для параллельной обработки
futures = [] # Список задач для параллельной обработки
for line in infile:
chunk.append(line.strip()) # Добавляем строку в чанк
original_count += 1 # Увеличиваем счетчик строк
if len(chunk) >= chunk_size: # Если размер чанка достиг предела
futures.append(executor.submit(process_chunk, chunk, temp_dir)) # Запускаем обработку чанка в отдельном потоке
chunk = [] # Очищаем чанк для следующего набора строк
if chunk: # Если остались необработанные строки после завершения чтения файла
futures.append(executor.submit(process_chunk, chunk, temp_dir)) # Обрабатываем последний чанк
for future in as_completed(futures): # Ждем завершения всех задач
temp_file = future.result() # Получаем результат обработки (имя временного файла)
if temp_file:
temp_files.append(temp_file) # Добавляем временный файл в список
logger.info(f"Все чанки обработаны. Начинается пакетное слияние временных файлов...") # Логируем завершение обработки всех чанков
total_unique_count = 0 # Общий счетчик уникальных строк
total_duplicate_count = 0 # Общий счетчик дублей
# Пока временных файлов больше, чем размер батча, продолжаем пакетное слияние
while len(temp_files) > batch_size:
temp_files, unique_count, duplicate_count = batch_merge(temp_files, batch_size, temp_dir) # Выполняем пакетное слияние
total_unique_count += unique_count # Увеличиваем общий счетчик уникальных строк
total_duplicate_count += duplicate_count # Увеличиваем общий счетчик дублей
# Если остался больше одного временного файла, выполняем финальное слияние
if len(temp_files) > 1:
logger.info("Завершающий этап слияния крупных оставшихся файлов") # Логируем финальный этап
unique_count, duplicate_count = merge_files(temp_files, output_file) # Финальное слияние временных файлов
total_unique_count += unique_count # Добавляем количество уникальных строк
total_duplicate_count += duplicate_count # Добавляем количество дублей
else:
# Если остался только один временный файл, переименовываем его в выходной файл
os.rename(temp_files[0], output_file) # Переименование файла
# Удаляем все оставшиеся временные файлы
for temp_file in temp_files:
if os.path.exists(temp_file): # Проверяем существование файла перед удалением
os.remove(temp_file) # Удаляем временный файл
logger.info("Все временные файлы удалены") # Логируем успешное удаление всех временных файлов
unique_count = original_count - total_duplicate_count # Рассчитываем количество уникальных строк
logger.info(f"Итоговые результаты: Уникальных строк: {unique_count}, Дублей удалено: {total_duplicate_count}") # Выводим результаты
return original_count # Возвращаем общее количество строк в исходном файле
except Exception as e:
logger.error(f"Ошибка при обработке файла: {e}") # Логируем ошибку при обработке файла
return 0 # Возвращаем 0 в случае ошибки
# Точка входа
if __name__ == '__main__':
tic = time.perf_counter() # Замеряем время начала выполнения программы
input_file = "large_random_emails.txt" # Исходный файл с данными
output_file = "output-sorted-unique.txt" # Выходной файл для сохранения результатов
original_count = sort_and_uniq_streaming(input_file, output_file) # Запускаем процесс сортировки и удаления дублей
tac = time.perf_counter() # Замеряем время завершения выполнения программы
logging.info(f"Всего обработано строк: {original_count}") # Логируем общее количество обработанных строк
logging.info(f"Удаление дублей и сортировка заняли {tac - tic:0.4f} секунд") # Логируем время выполнения программы
by: rand