KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(brokers).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("antifraud-ml").build();Люди пишут этот кусок кода, деплоят джобу и идут спокойно спать. Уверенные, что теперь их данные в безопасности. Это иллюзия. Магической кнопки не существует. В Morana Labs мы разворачиваем индустриальный ИИ: real-time streaming ML, антифрод и edge-вычисления прямо на железе заказчика. Железо часто ограничено, данные никогда не покидают периметр, а профиль нагрузки такой, что абстракции начинают трещать по швам. И когда ты выводишь ML-модель в прод, exactly-once в Kafka и Flink: как не потерять и не задвоить событие под нагрузкой — становится вопросом не красивой архитектуры, а буквально выживания бизнеса.
Вы скажете: зачем вообще об этом думать? Включил идемпотентность в Кафке, настроил чекпоинты во Флинке, и всё работает из коробки. Вендоры же написали мануалы. Так говорят те, кто никогда не ловил зависшие транзакции на p99 latency под потоком в сотни тысяч сообщений в секунду. Снимаем розовые очки.
Есть три уровня гарантий доставки. At-most-once — кинул и забыл. Событие может потеряться, задвоения не будет. At-least-once — ретраи спасают от потерь, но рождают дубли. И тот самый святой Грааль — exactly-once. Только вот в распределённых системах его не бывает. То, что мы называем exactly-once, на самом деле effectively-once. Система внутри теряет и дублирует пакеты сколько угодно, но на выходе отрезает лишнее так, чтобы потребитель увидел событие ровно один раз.
Чтобы это чудо произошло на стороне брокера, продюсеру нужна идемпотентность. Под капотом брокер присваивает каждому продюсеру PID и следит за sequence numbers сообщений в рамках партиции. Если прилетает дубль с тем же номером — брокер молча его дропает. Цена? Почти нулевая. Но это спасает только от сетевых ретраев самого продюсера. А что если падает Flink, который вычитывает стрим, скорит моделью и пишет обратно? Здесь включается Transactional API. И вот тут начинаются проблемы.
Транзакции в Kafka стоят времени. Вы не можете просто писать в топик, вы должны открыть транзакцию, записать батч, закрыть транзакцию. Transaction Coordinator назначает эпоху, пишет стейт во внутренний топик. Это дополнительные сетевые прыжки. Оппонент возразит: ну так батчуй больше, в чём проблема? В latency. В антифроде для финтеха мы не можем ждать полсекунды, пока наберётся жирный батч. Модель должна заблокировать транзакцию по карте до того, как процессинг даст добро. Стриминговый конвейер обязан отвечать за миллисекунды. А транзакционный коммит — это смерть для throughput.
Flink решает проблему согласованности через чекпоинты. Алгоритм Чанди-Лампорта прогоняет барьеры через весь граф операторов. State backend сохраняет промежуточное состояние. Обычно это RocksDB. Скептик усмехнётся: чекпоинты асинхронные, они не блокируют обработку потока. Теоретически — да. Практически — скажите это диску, когда RocksDB начинает compaction, а ваш state раздулся до сотен гигабайт. Возникает I/O throttle. Барьеры выстраиваются в очередь. Чекпоинт начинает тайм-аутить. Оппонент не сдаётся: ну включи unaligned checkpoints, барьеры перепрыгнут очередь! Включаем. Latency падает, но state раздувается ещё сильнее, потому что теперь нужно снапшотить мегабайты in-flight данных из буферов сети. Железо стонет. Хвосты улетают в космос.
Связка Flink и Kafka работает через Two-Phase Commit. На этапе пре-коммита Flink записывает данные в Кафку, но транзакция остаётся открытой. Потребители, у которых стоит read_committed, этих данных не видят. Они ждут. Когда чекпоинт завершается, Flink делает коммит. И тут оппонент хлопает в ладоши: ну отлично же, железобетонная консистентность! А теперь представьте, что Flink упал после пре-коммита, но до коммита. Транзакция повисла. Kafka ждёт. Консьюмеры стоят намертво. Вы поднимаете Flink, JobManager вычитывает стейт, находит оборванную транзакцию и пытается её завершить через тот самый transactional.id из конфига. Скептик скажет: автоматика работает! Работает, если вы статически привязали ID к сабтаскам. А если нет — транзакция висит вечно, пока вы руками не прибьёте её через CLI. Всё это время ваш даунстрим парализован.
Это честный трейд-офф. Exactly-once — это всегда обмен пропускной способности и предсказуемости на гарантии консистентности. Сложность инфраструктуры возрастает нелинейно. Главный инженерный вопрос: а вам точно нужен exactly-once на уровне потоковой передачи?
Возьмём тот самый финтех-кейс: скоринг платежей. Нейросеть прожёвывает поток транзакций. Если мы потеряем событие — пропустим фрод, банк понесёт убытки. Если задвоим событие — дважды спишем баланс или ложно заблокируем чистую операцию, клиент устроит скандал. Казалось бы, без строгого EOS никуда.
Оппонент торжествует: вот видишь, я был прав! И жестоко ошибается. Во многих хайлоад-проектах мы намеренно выкидываем EOS из транзитного слоя. Мы настраиваем Kafka и Flink на банальный at-least-once. События гарантированно не теряются, но дублируются. Как мы защищаем логику? Через идемпотентного консьюмера на самом краю пайплайна.
Вместо того чтобы заставлять Flink мучительно держать двухфазные коммиты, мы переносим ответственность в базу данных, которая принимает финальный вердикт модели. Мы пишем ID банковской транзакции вместе с результатом скоринга в рамках одной атомарной операции. Либо используем upsert. Либо полагаемся на constraint уникальности. Если нода падает, перечитывает топик и снова присылает скоринг для той же операции — база просто обновляет ту же строку или игнорирует конфликт. Ложного блока не происходит.
Это кратно дешевле. Мы снимаем с брокера overhead на координацию эпох. Мы избавляем стриминг от необходимости держать открытые транзакции между долгими чекпоинтами. Мы убираем задержку для консьюмеров, потому что read_committed больше не нужен. Пропускная способность взлетает. Задержка становится стабильной как рельс. Архитектура становится дубовой и отказоустойчивой, потому что ломаться нечему.
Идемпотентный консьюмер решает проблему в девяноста процентах случаев. Транзакционный sink реально нужен только тогда, когда целевая система физически не умеет в идемпотентность. Например, вы дёргаете внешний legacy API, где каждый вызов тарифицируется или сразу отправляет физическое письмо. Во всех остальных сценариях — не платите налог на сложность там, где можно выехать на математике уникального ключа. Проектирование надёжного пайплайна — это не искусство собрать все модные галочки из документации. Это способность выбрать самый дешёвый и топорный инструмент, который железно закроет задачу на вашем профиле нагрузки. И отвечать за это решение головой.