1.2 миллиона транзакций в секунду на входе. Через десять минут lag по партициям в Kafka пробивает отметку в сорок минут. Память пухнет, чекпоинты начинают падать по таймауту, а SLA на fraud-detection летит в пропасть. Бэкпрешер в потоковой аналитике: как Flink-пайплайн перестаёт захлёбываться — это вопрос, который бизнес задаёт ровно в тот момент, когда транзакции начинают одобряться вслепую, потому что realtime-streaming-ml модель не успевает вынести вердикт. И решать эту проблему нужно не перезапусками.
«Просто добавьте железа. Накиньте TaskManager-ов, выкрутите параллелизм х4, и Flink всё переварит».
Звучит логично, пока не посмотришь на сетевую топологию и диски. Бездумное наращивание параллелизма упирается в sink и сеть. Вы не ускоряете обработку, вы просто создаете больше потоков, которые будут синхронно ждать ответа от мертвой базы данных на записи. Бэкпрешер (backpressure) — это не баг. Это механизм самосохранения распределенной системы. Если ваш приемник данных (sink) может писать 50 тысяч строк в секунду, а Kafka отдает 200 тысяч, Flink обязан притормозить источник. Иначе у вас просто кончится RAM.
Начиная с версии 1.5, Flink использует credit-based flow control. Забудьте про старый добрый TCP-бэкпрешер. Теперь TaskManager-ы не отправляют данные по сети, пока принимающая сторона явно не выделит «кредиты» — свободные буферы под эти данные. Нет кредитов — данные стоят на месте, оператор помечается как заблокированный.
Как это выглядит на самом деле? Откройте Flink UI. Не смотрите на абстрактные графики потребления CPU на машинах, смотрите на вкладку BackPressure. Красные квадраты там означают одно: оператор не может протолкнуть данные дальше по графу. Здесь критически важно читать метрики правильно: status busy и idle. Если у вас 100% busy на операторе записи в базу, а предыдущий оператор трансформации показывает 0% idle — бутылочное горлышко найдено. Трансформация простаивает не потому, что ей нечего делать, а потому, что ее выходные буферы забиты под завязку ожиданием.
«Хорошо, база тормозит. Значит, надо просто буферизировать данные на стороне Flink, пока sink не оживет, увеличить таймауты и размер network buffers».
Агрессивные буферы просто маскируют проблему и взрывают память. Вы оттягиваете неизбежное. Как только увеличенные буферы переполнятся, бэкпрешер ударит с двойной силой, а заодно вы получите жесткие GC-паузы. Garbage Collector в Java не прощает хранения сотен гигабайт короткоживущих объектов. В realtime-сценариях телекома такие паузы по 5-10 секунд означают, что мы пропускаем окно принятия решения. ZGC здесь тоже не спасет: он дает субмиллисекундные паузы, но требует огромного запаса свободной памяти. Под бэкпрешером память забита, ZGC начинает голодать, и приложение падает с OutOfMemoryError.
Типовых причин, почему пайплайн захлебывается, ровно четыре: медленный sink, перекос ключей (data skew), паузы на сборку мусора и недостаток параллелизма на тяжелых вычислительных узлах (а не везде подряд). Перекос ключей — абсолютная классика.
«Ну так используйте хэширование ключей перед партиционированием, и всё распределится равномерно».
В теории. На практике в финтехе у вас всегда есть условный макро-регион или корпоративный шлюз, который генерирует 40% всего трафика. Хэш от его ID всегда падает в одну и ту же партицию. В итоге 99 тасок простаивают (idle 90%), а одна захлебывается (busy 100%), тянет за собой весь джоб, и лаг по этой конкретной партиции Kafka растет по экспоненте.
«Тогда раскидываем через rebalance() или rescale(). Пусть данные летят куда попало, лишь бы балансировщик справился».
Можно. Но давайте посмотрим на цену. Вызов rebalance() перетасовывает данные между всеми тасками по кругу (round-robin), создавая All-to-All сетевые соединения. На кластере из сотни узлов вы мгновенно укладываете сеть на лопатки служебным трафиком. Вызов rescale() работает умнее, распределяя нагрузку только внутри локальной группы тасок, экономя сеть. Но фундаментальная проблема в другом: потеря ключа означает, что вы больше не можете использовать keyBy(). А значит, прощай стейтфул-обработка, точные оконные функции и дедупликация.
Чтобы выровнять нагрузку и сбить бэкпрешер при skew, сохраняя стейт, применяют технику salt & pepper. К горячему ключу добавляется случайная соль, данные агрегируются в локальных микро-окнах, а затем соль отбрасывается для финальной глобальной агрегации. Это требует переписывания топологии, зато реально спасает SLA, не убивая логику.
«А что с чекпоинтами? Под бэкпрешером они падают. Включим Unaligned Checkpoints, и проблема исчезнет».
Механика Unaligned Checkpoints позволяет барьерам перепрыгивать очередь из данных. Да, чекпоинт пройдет быстро. Но размер вашего стейта улетит в стратосферу, потому что Flink начнет сохранять в чекпоинт все сырые данные, которые висят in-flight в сетевых буферах. Восстановление из такого раздутого чекпоинта превратится в кошмар дискового I/O.
Когда пайплайн начинает тупить, часто забывают про RocksDB. Это де-факто стандарт для хранения состояния в тяжелых стриминговых задачах, но из коробки он настроен под тепличные условия. Удачи удержать терабайт стейта за месяц в heap-памяти JVM — RocksDB на диске неизбежен, и его нужно тюнить. Вот что реально работает под нагрузкой:
- Перевод Managed Memory в офф-хип: отдаем RocksDB жестко лимитированный кусок RAM, чтобы он не конфликтовал с JVM и не провоцировал GC.
- Тюнинг Block Cache: радикальное увеличение размера кэша блоков спасает от лишних чтений с диска при частых лукапах стейта в окнах.
- Включение Bloom Filters: резко снижает I/O при проверке существования ключа перед записью.
- Использование выделенных локальных NVMe SSD под директории RocksDB. Использование сетевых дисков (EBS, Ceph) для горячего стейта — это архитектурное самоубийство.
Не забываем про watermark-стратегии. Слишком жесткие вотермарки заставляют окна закрываться рано, генерируя шквал late data, который тоже нужно обрабатывать отдельным потоком. Слишком ленивые вотермарки заставляют систему держать в стейте колоссальный объем данных в ожидании отстающих событий, раздувая RocksDB до предела.
Немного цифр. Пайплайн агрегации биллинга имел лаг в 350 миллионов сообщений после четырехчасовой аварии на апстриме. Попытка инженеров влить железо и поднять параллелизм до 512 потоков дала скорость восстановления всего 15 тысяч событий в секунду — сеть легла от All-to-All коммуникаций, а sink начал захлебываться и рвать коннекты. После отката параллелизма до 64 потоков, настройки асинхронного I/O для базы данных и тюнинга Block Cache в RocksDB, пропускная способность выросла до 140 тысяч событий в секунду на том же железе. Лаг в 350 миллионов был разобран за 40 минут. P99 latency на инференсе в стабильном режиме упал с 2000 мс до 45 мс.
Магия не в покупке новых серверов. Магия в понимании того, что каждый узел в распределенной системе имеет жесткий физический предел пропускной способности. И когда этот предел достигнут, система должна уметь элегантно притормозить источник, а вы должны точно знать, по какой метрике найти узел, которому не хватает воздуха.