🍢

INSERT INTO ON CONFLICT DO UPDATEでDeadlockした話

2024/01/27に公開

背景

INSERT INTO ON CONFLICT DO UPDATEを使ったSQLでDeadlockが発生しました。最初このSQLがINSERTとUPDATEが絡まってておかしな挙動をしているので、使わない方が良いのかと悩みましたが完全に勘違いしていました。

紐解いてみると以下の2つのトランザクションが同時に実行されていただけでした。

BEGIN;
UPDATE t_tests SET val = val + 1 WHERE id = 1;
UPDATE t_tests SET val = val + 1 WHERE id = 2;
COMMIT;
BEGIN;
UPDATE t_tests SET val = val + 1 WHERE id = 2;
UPDATE t_tests SET val = val + 1 WHERE id = 1;
COMMIT;

INSERT INTO ON CONFLICT DO UPDATEは本家のドキュメントにあるように原子的な結果を保証しますので安心して使えます。

起こった状況

問題を起こしたストアードプロシージャーは以下のようなものです。

CREATE TABLE t_tests (
    id BIGINT PRIMARY KEY,
    val BIGINT
);

CREATE OR REPLACE FUNCTION sample_test(
  p_id_array BIGINT[] DEFAULT NULL
) RETURNS VOID AS $FUNCTION$
DECLARE
BEGIN
  FOR i IN 1..COALESCE(array_length(p_id_array, 1), 0)
  LOOP
    INSERT INTO t_tests AS t1 (id, val) VALUES (p_id_array[i], 1)
    ON CONFLICT (id) DO UPDATE SET val = t1.val + 1;
  END LOOP;
END;
$FUNCTION$ LANGUAGE plpgsql;

ここでアプリケーションからはp_id_arrayが順不同で呼び出されます。単純化してidは1か2とすると、同時に(1, 2), (2, 1)として呼び出されることがあります。
既にINSERT済の場合どちらも通常のUPDATE文として作用します。

よって先程記述した状況が発生します。

BEGIN;
UPDATE t_tests SET val = val + 1 WHERE id = 1;
UPDATE t_tests SET val = val + 1 WHERE id = 2;
COMMIT;
BEGIN;
UPDATE t_tests SET val = val + 1 WHERE id = 2;
UPDATE t_tests SET val = val + 1 WHERE id = 1;
COMMIT;

絵に書いたようなDeadlockの状況が発生しました。

まとめ

INSERT INTO ON CONFLICT DO UPDATE自体は普通のUPDATE文と同じように動作します。複数行のUPDATE文が行われる場合はDeadlockの可能性があるので注意して使う必要があります。一見UPDATE文がなさそうでも油断しないのが良さそうです。

おまけ

動かしたコード

docker-compose.yaml
services:
  db:
    image: postgres:12
    environment:
      - POSTGRES_DB=web
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - TZ=Asia/Tokyo
      - PGTZ=Asia/Tokyo
    ports:
      - 5432:5432
    volumes:
      - insert_postgresql_data:/var/lib/postgresql/data
      - ./sql/max_conns.sql:/docker-entrypoint-initdb.d/max_conns.sql

  service:
    build: 
      context: .
    environment:
      CARGO_TARGET_DIR: /tmp/target
      CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
      PG_URL: postgres://user:pass@db:5432/web
    volumes:
      - .:/app
      - insert_cargo_cache:/usr/local/cargo/registry
      - insert_target_cache:/tmp/target
    ports:
      - 3000:3000
    depends_on:
      - db
    tty: true
    working_dir: /app
    security_opt:
      - seccomp:unconfined
  

volumes:
  insert_cargo_cache:
  insert_target_cache:
  insert_postgresql_data:
sql/max_conns.sql
ALTER SYSTEM SET max_connections = 1000;
Dockerfile
FROM rust:1.75

RUN apt -y update && apt -y install musl-tools libssl-dev pkg-config build-essential

RUN rustup update && \
  cargo install cargo-watch && \
  rustup component add rustfmt clippy
table.sql
CREATE TABLE t_tests (
    id BIGINT PRIMARY KEY,
    val BIGINT
)
sp.sql
DROP TYPE IF EXISTS type_sample_test CASCADE;
CREATE TYPE type_sample_test AS (
);

CREATE OR REPLACE FUNCTION sample_test(
  p_id_array BIGINT[] DEFAULT NULL
) RETURNS SETOF type_sample_test AS $FUNCTION$
DECLARE
BEGIN
  FOR i IN 1..COALESCE(array_length(p_id_array, 1), 0)
  LOOP
    INSERT INTO t_tests AS t1 (id, val) VALUES (p_id_array[i], 1)
    ON CONFLICT (id) DO UPDATE SET val = t1.val + 1;
  END LOOP;
END;
$FUNCTION$ LANGUAGE plpgsql;
Cargo.toml
[package]
name = "insert"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1"
deadpool-postgres = "0.12.1"
postgres-types = {version="0.2", features=[
    "derive",
    "with-serde_json-1",
]}
serde = {version="1", features=["derive"]}
serde_json = "1"
tokio = {version="1", features=["macros", "rt-multi-thread"]}
tokio-postgres = "0.7"
url = "2.5.0"
rand = "0.8.5"
main.rs
use rand::{Rng, SeedableRng};

async fn execute(
    client: &tokio_postgres::Client,
) -> anyhow::Result<()> {
    let sql = r#"
        SELECT sample_test(p_id_array := $1)
    "#;
    let mut rng = rand::rngs::StdRng::from_entropy();
    let id_array: Vec<i64> = if (rng.gen::<u8>() % 2) == 0 {
        vec![1,2]
    } else {
        vec![2,1]
    };
    let _ = client
        .execute(sql, &[&id_array])
        .await?;
    Ok(())
}

pub fn get_postgres_pool(url: &str) -> anyhow::Result<deadpool_postgres::Pool> {
    let pg_url = url::Url::parse(url)?;
    let dbname = match pg_url.path_segments() {
        Some(mut res) => res.next(),
        None => Some("web"),
    };
    let cfg = deadpool_postgres::Config {
        user: Some(pg_url.username().to_string()),
        password: pg_url.password().map(|password| password.to_string()),
        dbname: dbname.map(|dbname| dbname.to_string()),
        host: pg_url.host_str().map(|host| host.to_string()),
        ..Default::default()
    };
    let res = cfg.create_pool(Some(deadpool_postgres::Runtime::Tokio1), tokio_postgres::NoTls)?;
    Ok(res)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut handlers = vec![];
    for _ in 0..100 {
        let handler = tokio::spawn(async move {
            let pg_url = std::env::var("PG_URL").unwrap();
            let pool = get_postgres_pool(pg_url.as_str()).unwrap();
            if let Ok(client) = pool.get().await {
                match execute(&client).await {
                    Ok(_) => {}
                    Err(e) => println!("error: {}", e),
                }
            }
        });
        handlers.push(handler);
    }
    for handler in handlers {
        handler.await?;
    }
    Ok(())
}

Discussion