Публикация была переведена автоматически. Исходный язык: Русский
Есть задача, которая появляется внезапно и всегда "прямо сейчас":
- партнер присылает вебхуки (платежи, статусы, KYC, доставка, что угодно)
- иногда присылает один и тот же event 2-5 раз (ретраи - это норма)
- иногда присылает события не по порядку
- иногда сеть моргает, ваш сервис перезапускается, база на миг тормозит
- а бизнес хочет одно: "чтобы не было дублей и чтобы ничего не терялось"
Наивный подход “просто обработаем в контроллере” ломается быстро:
- обработка заняла 2-3 секунды, партнер ждал 1 секунду и ретраит
- вы получили дубли
- часть запросов упала, часть успела списать деньги, часть нет
- и потом вы неделю объясняете, почему у клиента два списания
Ниже - разбор по фактам и рабочий шаблон на Kotlin back-end: идемпотентный прием вебхуков и обработка через PostgreSQL (inbox pattern). Без магии, без "у меня где-то было", только то, что реально работает в проде у многих команд.
- “Прием” вебхука должен быть быстрым и стабильным
- вернуть 2xx как можно раньше
- не делать тяжелую бизнес-логику в HTTP-обработчике
- Идемпотентность по event_id
- один и тот же event можно получить много раз
- итоговый бизнес-эффект должен быть ровно один раз
- Очередь должна переживать рестарты и падения
- поэтому "очередь в памяти" не подходит
- используем БД как надежный журнал
- Обработка параллельная, но без гонок
- несколько воркеров/инстансов сервиса
- каждое событие должен обрабатывать только один воркер
- Наблюдаемость и управляемость
- видим backlog
- видим ретраи, ошибки, DLQ
- можем "переиграть" проблемные события
Разделяем на две части:
- HTTP endpoint принимает вебхук и кладет запись в таблицу webhook_inbox
- Фоновый воркер читает webhook_inbox батчами через FOR UPDATE SKIP LOCKED и выполняет бизнес-обработку
Ключевой момент: дедуп делается в момент вставки в inbox через уникальный индекс.
create table webhook_inbox (
id bigserial primary key,
provider text not null,
event_id text not null,
received_at timestamptz not null default now(),
headers jsonb not null,
payload jsonb not null,
status smallint not null default 0, -- 0-ready, 1-processing, 2-done, 3-dead
attempt int not null default 0,
next_attempt_at timestamptz not null default now(),
last_error text null,
processed_at timestamptz null
);
-- Идемпотентность: один event_id на provider только один раз
create unique index ux_webhook_inbox_provider_event
on webhook_inbox(provider, event_id);
-- Для выборки очереди воркером
create index ix_webhook_inbox_ready
on webhook_inbox(status, next_attempt_at, received_at);Факт, на котором держится вся идемпотентность:
- уникальный индекс (provider, event_id) + INSERT ... ON CONFLICT DO NOTHING
Это гарантирует, что при повторной доставке того же события вы не создадите вторую запись.
Ниже пример на Ktor. Он делает только то, что должен делать HTTP слой:
- читает тело как bytes (это важно для подписи)
- валидирует HMAC (пример)
- парсит JSON как JsonElement (минимальная валидация “это JSON”)
- вставляет в webhook_inbox идемпотентно
- возвращает 200
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
fun hmacSha256Hex(secret: ByteArray, message: ByteArray): String {
val mac = Mac.getInstance("HmacSHA256")
mac.init(SecretKeySpec(secret, "HmacSHA256"))
val digest = mac.doFinal(message)
return digest.joinToString("") { "%02x".format(it) }
}import java.sql.Connection
import javax.sql.DataSource
data class InsertResult(val inserted: Boolean)
fun insertInbox(
ds: DataSource,
provider: String,
eventId: String,
headersJson: String,
payloadJson: String
): InsertResult {
ds.connection.use { c ->
c.autoCommit = false
try {
val sql = """
insert into webhook_inbox(provider, event_id, headers, payload)
values (?, ?, ?::jsonb, ?::jsonb)
on conflict (provider, event_id) do nothing
""".trimIndent()
val inserted = c.prepareStatement(sql).use { ps ->
ps.setString(1, provider)
ps.setString(2, eventId)
ps.setString(3, headersJson)
ps.setString(4, payloadJson)
ps.executeUpdate() == 1
}
c.commit()
return InsertResult(inserted)
} catch (e: Exception) {
c.rollback()
throw e
}
}
}import io.ktor.server.application.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.serialization.json.*
fun Application.webhookRoutes(
ds: javax.sql.DataSource,
secrets: Map<String, ByteArray> // provider -> secret
) {
routing {
post("/webhooks/{provider}") {
val provider = call.parameters["provider"] ?: return@post call.respondText(
"provider required",
status = io.ktor.http.HttpStatusCode.BadRequest
)
val secret = secrets[provider] ?: return@post call.respondText(
"unknown provider",
status = io.ktor.http.HttpStatusCode.NotFound
)
val rawBody = call.receiveChannel().toByteArray()
// Пример: заголовок X-Signature = hex(hmacSha256(body))
val sig = call.request.headers["X-Signature"]
?: return@post call.respondText("missing signature", status = io.ktor.http.HttpStatusCode.Unauthorized)
val expected = hmacSha256Hex(secret, rawBody)
if (!sig.equals(expected, ignoreCase = true)) {
return@post call.respondText("bad signature", status = io.ktor.http.HttpStatusCode.Unauthorized)
}
// Минимальная проверка: тело - валидный JSON
val jsonText = rawBody.toString(Charsets.UTF_8)
val payload = try {
Json.parseToJsonElement(jsonText)
} catch (e: Exception) {
return@post call.respondText("invalid json", status = io.ktor.http.HttpStatusCode.BadRequest)
}
// event_id берем из payload (частая практика), но источник выбираете по контракту провайдера
val eventId = payload.jsonObject["id"]?.jsonPrimitive?.content
?: return@post call.respondText("missing payload.id", status = io.ktor.http.HttpStatusCode.BadRequest)
// headers тоже складываем как JSON (для аудита и разбора инцидентов)
val headersMap = call.request.headers.entries().associate { it.key to it.value.joinToString(",") }
val headersJson = Json.encodeToString(JsonObject.serializer(), JsonObject(headersMap.mapValues { JsonPrimitive(it.value) }))
val result = insertInbox(
ds = ds,
provider = provider,
eventId = eventId,
headersJson = headersJson,
payloadJson = jsonText
)
// Факт: для вебхуков почти всегда корректно отвечать 2xx и на дубли тоже
// (провайдеру важно "получили", а не "обработали бизнес-логику")
call.respondText(if (result.inserted) "ok" else "dup", status = io.ktor.http.HttpStatusCode.OK)
}
}
}
suspend fun io.ktor.utils.io.ByteReadChannel.toByteArray(): ByteArray {
val packet = readRemaining()
return packet.readBytes()
}Почему "dup" все равно 200:
- вебхук протоколы обычно устроены так, что неудачный код ответа вызывает повторную доставку
- если вы на дубль ответите 409/500, вы сами себе создадите шторм ретраев
Факт про PostgreSQL:
- SELECT ... FOR UPDATE SKIP LOCKED позволяет нескольким воркерам безопасно забирать задачи, не блокируя друг друга
data class InboxRow(
val id: Long,
val provider: String,
val eventId: String,
val payloadJson: String,
val attempt: Int
)
fun fetchBatchForProcessing(ds: DataSource, limit: Int): List<InboxRow> {
val sql = """
select id, provider, event_id, payload::text, attempt
from webhook_inbox
where status = 0
and next_attempt_at <= now()
order by received_at
limit ?
for update skip locked
""".trimIndent()
ds.connection.use { c ->
c.autoCommit = false
try {
val rows = mutableListOf<InboxRow>()
c.prepareStatement(sql).use { ps ->
ps.setInt(1, limit)
val rs = ps.executeQuery()
while (rs.next()) {
rows += InboxRow(
id = rs.getLong(1),
provider = rs.getString(2),
eventId = rs.getString(3),
payloadJson = rs.getString(4),
attempt = rs.getInt(5),
)
}
}
// Помечаем как processing, чтобы при долгой обработке это было видно
if (rows.isNotEmpty()) {
val upd = "update webhook_inbox set status = 1 where id = any(?)"
c.prepareStatement(upd).use { ps ->
val arr = c.createArrayOf("bigint", rows.map { it.id }.toTypedArray())
ps.setArray(1, arr)
ps.executeUpdate()
}
}
c.commit()
return rows
} catch (e: Exception) {
c.rollback()
throw e
}
}
}import kotlin.math.min
import java.time.Duration
fun markDone(ds: DataSource, id: Long) {
val sql = """
update webhook_inbox
set status = 2, processed_at = now(), last_error = null
where id = ?
""".trimIndent()
ds.connection.use { c ->
c.prepareStatement(sql).use { ps ->
ps.setLong(1, id)
ps.executeUpdate()
}
}
}
fun markFail(ds: DataSource, id: Long, attempt: Int, error: String, maxAttempts: Int = 12) {
// Факт: backoff можно хранить как next_attempt_at, чтобы воркер просто выбирал ready задачи
val nextDelaySeconds = min(3600, 2.0.pow(attempt.coerceAtLeast(1)).toInt()) // ограничим 1 час
val toDead = attempt + 1 >= maxAttempts
val sql = if (!toDead) """
update webhook_inbox
set status = 0,
attempt = attempt + 1,
next_attempt_at = now() + make_interval(secs => ?),
last_error = ?
where id = ?
""".trimIndent() else """
update webhook_inbox
set status = 3,
attempt = attempt + 1,
last_error = ?
where id = ?
""".trimIndent()
ds.connection.use { c ->
c.prepareStatement(sql).use { ps ->
var i = 1
if (!toDead) ps.setInt(i++, nextDelaySeconds)
ps.setString(i++, error.take(4000))
ps.setLong(i, id)
ps.executeUpdate()
}
}
}
private fun Double.pow(n: Int): Double = Math.pow(this, n.toDouble())Здесь важная мысль: inbox дает дедуп по событию, но бизнес-эффект тоже должен быть идемпотентным.
Пример по фактам: если событие “payment_succeeded” должно создать запись в payments, то:
- либо payments имеет уникальный ключ по внешнему event_id (или provider_payment_id)
- либо делаете отдельную таблицу “processed_events” и вставляете туда event_id с unique constraint
- либо используете “upsert” в целевую таблицу
База - ваш лучший арбитр идемпотентности.
Пример: фиксируем обработанный event в отдельной таблице, и это гарантирует “ровно один раз” на уровне бизнес-эффекта:
create table processed_events (
provider text not null,
event_id text not null,
processed_at timestamptz not null default now(),
primary key (provider, event_id)
);В обработчике:
- в транзакции сначала вставляем (provider, event_id) в processed_events
- если конфликт - значит это дубль, выходим “как успех”
- если вставка прошла - применяем бизнес-изменения
Это железно, потому что primary key не даст сделать это дважды.
import kotlinx.coroutines.*
import javax.sql.DataSource
class InboxWorker(
private val ds: DataSource,
private val handler: suspend (InboxRow) -> Unit,
private val batchSize: Int = 50
) {
suspend fun run() = coroutineScope {
while (isActive) {
val batch = fetchBatchForProcessing(ds, batchSize)
if (batch.isEmpty()) {
delay(250)
continue
}
// Можно параллелить внутри батча, но аккуратно (и лучше ограничить семафором)
for (row in batch) {
try {
handler(row)
markDone(ds, row.id)
} catch (e: Exception) {
markFail(ds, row.id, row.attempt, e.message ?: e.toString())
}
}
}
}
}Что здесь важно по фактам:
- SKIP LOCKED дает масштабирование на несколько инстансов
- status/attempt/next_attempt_at дает управляемые ретраи
- DLQ статус 3 дает место для ручного разбора
Вот запросы, которые дают правду о системе:
Сколько в очереди готовых задач:
select count(*)
from webhook_inbox
where status = 0 and next_attempt_at <= now();Сколько зависло “processing” (обычно сигнал, что воркер упал между fetch и markDone):
select count(*)
from webhook_inbox
where status = 1;Топ ошибок:
select left(last_error, 200) as err, count(*)
from webhook_inbox
where status in (0,3) and last_error is not null
group by 1
order by 2 desc
limit 20;Возраст самого старого необработанного события (backlog latency):
select now() - min(received_at) as oldest_age
from webhook_inbox
where status in (0,1);- Event id не всегда уникален “глобально”Решение: уникальность делайте по (provider, event_id).
- Вы не можете доверять порядку доставкиРешение: бизнес-логика должна уметь принимать “поздние” события (например, сначала пришел refund, потом succeeded).
- Нельзя делать тяжелую обработку в HTTPРешение: inbox + воркер. Иначе вы сами создадите ретрай-шторм у провайдера.
- Нельзя отвечать ошибкой на дубльРешение: 2xx на дубль, дедуп внутри.
- Воркер может упасть в серединеРешение: status + retry. Иногда добавляют “lease” (processing_started_at) и requeue по таймауту, если status=1 висит слишком долго.
Если коротко, но по делу:
- идемпотентность по вебхукам почти всегда проще всего делать через уникальные ограничения в PostgreSQL
- прием отделяем от обработки, иначе получаем шторм ретраев и дубли
- FOR UPDATE SKIP LOCKED - нормальный, практичный способ делать очередь в БД на нескольких воркерах
- "ровно один раз" достигается не словами, а транзакцией + уникальными ключами в месте, где происходит бизнес-эффект
Есть задача, которая появляется внезапно и всегда "прямо сейчас":
- партнер присылает вебхуки (платежи, статусы, KYC, доставка, что угодно)
- иногда присылает один и тот же event 2-5 раз (ретраи - это норма)
- иногда присылает события не по порядку
- иногда сеть моргает, ваш сервис перезапускается, база на миг тормозит
- а бизнес хочет одно: "чтобы не было дублей и чтобы ничего не терялось"
Наивный подход “просто обработаем в контроллере” ломается быстро:
- обработка заняла 2-3 секунды, партнер ждал 1 секунду и ретраит
- вы получили дубли
- часть запросов упала, часть успела списать деньги, часть нет
- и потом вы неделю объясняете, почему у клиента два списания
Ниже - разбор по фактам и рабочий шаблон на Kotlin back-end: идемпотентный прием вебхуков и обработка через PostgreSQL (inbox pattern). Без магии, без "у меня где-то было", только то, что реально работает в проде у многих команд.
- “Прием” вебхука должен быть быстрым и стабильным
- вернуть 2xx как можно раньше
- не делать тяжелую бизнес-логику в HTTP-обработчике
- Идемпотентность по event_id
- один и тот же event можно получить много раз
- итоговый бизнес-эффект должен быть ровно один раз
- Очередь должна переживать рестарты и падения
- поэтому "очередь в памяти" не подходит
- используем БД как надежный журнал
- Обработка параллельная, но без гонок
- несколько воркеров/инстансов сервиса
- каждое событие должен обрабатывать только один воркер
- Наблюдаемость и управляемость
- видим backlog
- видим ретраи, ошибки, DLQ
- можем "переиграть" проблемные события
Разделяем на две части:
- HTTP endpoint принимает вебхук и кладет запись в таблицу webhook_inbox
- Фоновый воркер читает webhook_inbox батчами через FOR UPDATE SKIP LOCKED и выполняет бизнес-обработку
Ключевой момент: дедуп делается в момент вставки в inbox через уникальный индекс.
create table webhook_inbox (
id bigserial primary key,
provider text not null,
event_id text not null,
received_at timestamptz not null default now(),
headers jsonb not null,
payload jsonb not null,
status smallint not null default 0, -- 0-ready, 1-processing, 2-done, 3-dead
attempt int not null default 0,
next_attempt_at timestamptz not null default now(),
last_error text null,
processed_at timestamptz null
);
-- Идемпотентность: один event_id на provider только один раз
create unique index ux_webhook_inbox_provider_event
on webhook_inbox(provider, event_id);
-- Для выборки очереди воркером
create index ix_webhook_inbox_ready
on webhook_inbox(status, next_attempt_at, received_at);Факт, на котором держится вся идемпотентность:
- уникальный индекс (provider, event_id) + INSERT ... ON CONFLICT DO NOTHING
Это гарантирует, что при повторной доставке того же события вы не создадите вторую запись.
Ниже пример на Ktor. Он делает только то, что должен делать HTTP слой:
- читает тело как bytes (это важно для подписи)
- валидирует HMAC (пример)
- парсит JSON как JsonElement (минимальная валидация “это JSON”)
- вставляет в webhook_inbox идемпотентно
- возвращает 200
import java.util.Base64
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
fun hmacSha256Hex(secret: ByteArray, message: ByteArray): String {
val mac = Mac.getInstance("HmacSHA256")
mac.init(SecretKeySpec(secret, "HmacSHA256"))
val digest = mac.doFinal(message)
return digest.joinToString("") { "%02x".format(it) }
}import java.sql.Connection
import javax.sql.DataSource
data class InsertResult(val inserted: Boolean)
fun insertInbox(
ds: DataSource,
provider: String,
eventId: String,
headersJson: String,
payloadJson: String
): InsertResult {
ds.connection.use { c ->
c.autoCommit = false
try {
val sql = """
insert into webhook_inbox(provider, event_id, headers, payload)
values (?, ?, ?::jsonb, ?::jsonb)
on conflict (provider, event_id) do nothing
""".trimIndent()
val inserted = c.prepareStatement(sql).use { ps ->
ps.setString(1, provider)
ps.setString(2, eventId)
ps.setString(3, headersJson)
ps.setString(4, payloadJson)
ps.executeUpdate() == 1
}
c.commit()
return InsertResult(inserted)
} catch (e: Exception) {
c.rollback()
throw e
}
}
}import io.ktor.server.application.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.serialization.json.*
fun Application.webhookRoutes(
ds: javax.sql.DataSource,
secrets: Map<String, ByteArray> // provider -> secret
) {
routing {
post("/webhooks/{provider}") {
val provider = call.parameters["provider"] ?: return@post call.respondText(
"provider required",
status = io.ktor.http.HttpStatusCode.BadRequest
)
val secret = secrets[provider] ?: return@post call.respondText(
"unknown provider",
status = io.ktor.http.HttpStatusCode.NotFound
)
val rawBody = call.receiveChannel().toByteArray()
// Пример: заголовок X-Signature = hex(hmacSha256(body))
val sig = call.request.headers["X-Signature"]
?: return@post call.respondText("missing signature", status = io.ktor.http.HttpStatusCode.Unauthorized)
val expected = hmacSha256Hex(secret, rawBody)
if (!sig.equals(expected, ignoreCase = true)) {
return@post call.respondText("bad signature", status = io.ktor.http.HttpStatusCode.Unauthorized)
}
// Минимальная проверка: тело - валидный JSON
val jsonText = rawBody.toString(Charsets.UTF_8)
val payload = try {
Json.parseToJsonElement(jsonText)
} catch (e: Exception) {
return@post call.respondText("invalid json", status = io.ktor.http.HttpStatusCode.BadRequest)
}
// event_id берем из payload (частая практика), но источник выбираете по контракту провайдера
val eventId = payload.jsonObject["id"]?.jsonPrimitive?.content
?: return@post call.respondText("missing payload.id", status = io.ktor.http.HttpStatusCode.BadRequest)
// headers тоже складываем как JSON (для аудита и разбора инцидентов)
val headersMap = call.request.headers.entries().associate { it.key to it.value.joinToString(",") }
val headersJson = Json.encodeToString(JsonObject.serializer(), JsonObject(headersMap.mapValues { JsonPrimitive(it.value) }))
val result = insertInbox(
ds = ds,
provider = provider,
eventId = eventId,
headersJson = headersJson,
payloadJson = jsonText
)
// Факт: для вебхуков почти всегда корректно отвечать 2xx и на дубли тоже
// (провайдеру важно "получили", а не "обработали бизнес-логику")
call.respondText(if (result.inserted) "ok" else "dup", status = io.ktor.http.HttpStatusCode.OK)
}
}
}
suspend fun io.ktor.utils.io.ByteReadChannel.toByteArray(): ByteArray {
val packet = readRemaining()
return packet.readBytes()
}Почему "dup" все равно 200:
- вебхук протоколы обычно устроены так, что неудачный код ответа вызывает повторную доставку
- если вы на дубль ответите 409/500, вы сами себе создадите шторм ретраев
Факт про PostgreSQL:
- SELECT ... FOR UPDATE SKIP LOCKED позволяет нескольким воркерам безопасно забирать задачи, не блокируя друг друга
data class InboxRow(
val id: Long,
val provider: String,
val eventId: String,
val payloadJson: String,
val attempt: Int
)
fun fetchBatchForProcessing(ds: DataSource, limit: Int): List<InboxRow> {
val sql = """
select id, provider, event_id, payload::text, attempt
from webhook_inbox
where status = 0
and next_attempt_at <= now()
order by received_at
limit ?
for update skip locked
""".trimIndent()
ds.connection.use { c ->
c.autoCommit = false
try {
val rows = mutableListOf<InboxRow>()
c.prepareStatement(sql).use { ps ->
ps.setInt(1, limit)
val rs = ps.executeQuery()
while (rs.next()) {
rows += InboxRow(
id = rs.getLong(1),
provider = rs.getString(2),
eventId = rs.getString(3),
payloadJson = rs.getString(4),
attempt = rs.getInt(5),
)
}
}
// Помечаем как processing, чтобы при долгой обработке это было видно
if (rows.isNotEmpty()) {
val upd = "update webhook_inbox set status = 1 where id = any(?)"
c.prepareStatement(upd).use { ps ->
val arr = c.createArrayOf("bigint", rows.map { it.id }.toTypedArray())
ps.setArray(1, arr)
ps.executeUpdate()
}
}
c.commit()
return rows
} catch (e: Exception) {
c.rollback()
throw e
}
}
}import kotlin.math.min
import java.time.Duration
fun markDone(ds: DataSource, id: Long) {
val sql = """
update webhook_inbox
set status = 2, processed_at = now(), last_error = null
where id = ?
""".trimIndent()
ds.connection.use { c ->
c.prepareStatement(sql).use { ps ->
ps.setLong(1, id)
ps.executeUpdate()
}
}
}
fun markFail(ds: DataSource, id: Long, attempt: Int, error: String, maxAttempts: Int = 12) {
// Факт: backoff можно хранить как next_attempt_at, чтобы воркер просто выбирал ready задачи
val nextDelaySeconds = min(3600, 2.0.pow(attempt.coerceAtLeast(1)).toInt()) // ограничим 1 час
val toDead = attempt + 1 >= maxAttempts
val sql = if (!toDead) """
update webhook_inbox
set status = 0,
attempt = attempt + 1,
next_attempt_at = now() + make_interval(secs => ?),
last_error = ?
where id = ?
""".trimIndent() else """
update webhook_inbox
set status = 3,
attempt = attempt + 1,
last_error = ?
where id = ?
""".trimIndent()
ds.connection.use { c ->
c.prepareStatement(sql).use { ps ->
var i = 1
if (!toDead) ps.setInt(i++, nextDelaySeconds)
ps.setString(i++, error.take(4000))
ps.setLong(i, id)
ps.executeUpdate()
}
}
}
private fun Double.pow(n: Int): Double = Math.pow(this, n.toDouble())Здесь важная мысль: inbox дает дедуп по событию, но бизнес-эффект тоже должен быть идемпотентным.
Пример по фактам: если событие “payment_succeeded” должно создать запись в payments, то:
- либо payments имеет уникальный ключ по внешнему event_id (или provider_payment_id)
- либо делаете отдельную таблицу “processed_events” и вставляете туда event_id с unique constraint
- либо используете “upsert” в целевую таблицу
База - ваш лучший арбитр идемпотентности.
Пример: фиксируем обработанный event в отдельной таблице, и это гарантирует “ровно один раз” на уровне бизнес-эффекта:
create table processed_events (
provider text not null,
event_id text not null,
processed_at timestamptz not null default now(),
primary key (provider, event_id)
);В обработчике:
- в транзакции сначала вставляем (provider, event_id) в processed_events
- если конфликт - значит это дубль, выходим “как успех”
- если вставка прошла - применяем бизнес-изменения
Это железно, потому что primary key не даст сделать это дважды.
import kotlinx.coroutines.*
import javax.sql.DataSource
class InboxWorker(
private val ds: DataSource,
private val handler: suspend (InboxRow) -> Unit,
private val batchSize: Int = 50
) {
suspend fun run() = coroutineScope {
while (isActive) {
val batch = fetchBatchForProcessing(ds, batchSize)
if (batch.isEmpty()) {
delay(250)
continue
}
// Можно параллелить внутри батча, но аккуратно (и лучше ограничить семафором)
for (row in batch) {
try {
handler(row)
markDone(ds, row.id)
} catch (e: Exception) {
markFail(ds, row.id, row.attempt, e.message ?: e.toString())
}
}
}
}
}Что здесь важно по фактам:
- SKIP LOCKED дает масштабирование на несколько инстансов
- status/attempt/next_attempt_at дает управляемые ретраи
- DLQ статус 3 дает место для ручного разбора
Вот запросы, которые дают правду о системе:
Сколько в очереди готовых задач:
select count(*)
from webhook_inbox
where status = 0 and next_attempt_at <= now();Сколько зависло “processing” (обычно сигнал, что воркер упал между fetch и markDone):
select count(*)
from webhook_inbox
where status = 1;Топ ошибок:
select left(last_error, 200) as err, count(*)
from webhook_inbox
where status in (0,3) and last_error is not null
group by 1
order by 2 desc
limit 20;Возраст самого старого необработанного события (backlog latency):
select now() - min(received_at) as oldest_age
from webhook_inbox
where status in (0,1);- Event id не всегда уникален “глобально”Решение: уникальность делайте по (provider, event_id).
- Вы не можете доверять порядку доставкиРешение: бизнес-логика должна уметь принимать “поздние” события (например, сначала пришел refund, потом succeeded).
- Нельзя делать тяжелую обработку в HTTPРешение: inbox + воркер. Иначе вы сами создадите ретрай-шторм у провайдера.
- Нельзя отвечать ошибкой на дубльРешение: 2xx на дубль, дедуп внутри.
- Воркер может упасть в серединеРешение: status + retry. Иногда добавляют “lease” (processing_started_at) и requeue по таймауту, если status=1 висит слишком долго.
Если коротко, но по делу:
- идемпотентность по вебхукам почти всегда проще всего делать через уникальные ограничения в PostgreSQL
- прием отделяем от обработки, иначе получаем шторм ретраев и дубли
- FOR UPDATE SKIP LOCKED - нормальный, практичный способ делать очередь в БД на нескольких воркерах
- "ровно один раз" достигается не словами, а транзакцией + уникальными ключами в месте, где происходит бизнес-эффект