
Using Postgres `COPY FROM STDIN` from Rust

tunamaguro

Looking into how to use COPY FROM with tokio-postgres and sqlx.

tunamaguro

The schema used:

```sql
CREATE TABLE authors (
          id   BIGSERIAL PRIMARY KEY,
          name text      NOT NULL,
          bio  text
);
```
tunamaguro

Writing in TEXT format worked like this:

```rust
#[cfg(test)]
mod tests {
    use super::queries;
    use test_context::{futures::SinkExt, test_context};
    use test_utils::PgTokioTestContext;
    use tokio_postgres::CopyInSink;

    async fn migrate_db(client: &tokio_postgres::Client) {
        client
            .batch_execute(include_str!("../schema.sql"))
            .await
            .unwrap();
    }

    #[test_context(PgTokioTestContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut PgTokioTestContext) {
        let client = &ctx.client;

        migrate_db(client).await;

        let sink: CopyInSink<bytes::Bytes> = client
            .copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT TEXT)")
            .await
            .unwrap();
        let mut sink = Box::pin(sink);
        let data = [
            ["123", "Foo", "bio"].join("\t"),
            ["456", "Bar", "\\N"].join("\t"),
        ]
        .join("\n");

        sink.send(data.into()).await.unwrap();
        sink.close().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
}
```
tunamaguro

CSV format is written the same way, sending through SinkExt:

```rust
    #[test_context(PgTokioTestContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut PgTokioTestContext) {
        let client = &ctx.client;

        migrate_db(client).await;

        let sink: CopyInSink<bytes::Bytes> = client
            .copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT CSV, HEADER)")
            .await
            .unwrap();
        let mut sink = Box::pin(sink);
        let data = [
            ["id", "name", "bio"].join(","),
            ["123", "Foo", "bio"].join(","),
            ["456", "Bar", ""].join(","),
        ]
        .join("\n");

        sink.send(data.into()).await.unwrap();
        sink.close().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```
tunamaguro

The binary version:

```rust
    #[test_context(PgTokioTestContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut PgTokioTestContext) {
        use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
        let client = &ctx.client;

        migrate_db(client).await;

        let sink = client
            .copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
            .await
            .unwrap();
        let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::TEXT, Type::TEXT]);
        tokio::pin!(writer);

        writer
            .as_mut()
            .write(&[&123_i64, &"Foo", &"bio"])
            .await
            .unwrap();
        writer
            .as_mut()
            .write(&[&456_i64, &"Bar", &None::<&str>])
            .await
            .unwrap();

        writer.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```
tunamaguro

According to postgres-types, BIGSERIAL corresponds to Rust's i64, so if the type isn't annotated, an i32 is used and the copy fails.

https://docs.rs/tokio-postgres/latest/tokio_postgres/types/trait.ToSql.html#types
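
A minimal sketch of the distinction (not from the original post; the helper name is made up, and it assumes the `authors` table above): with `Type::INT8` in the writer's type list, the value handed to `write` must be an `i64`, while an unannotated integer literal is inferred as `i32` and fails at encode time.

```rust
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type, Client, Error};

// Hypothetical helper: copies a single row whose id column is INT8.
// Writing `let id = 123;` instead would infer i32 and fail with
// `WrongType { postgres: Int8, rust: "i32" }` when the row is encoded.
async fn copy_one_author(client: &Client) -> Result<(), Error> {
    let sink = client
        .copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
        .await?;
    let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::TEXT, Type::TEXT]);
    tokio::pin!(writer);

    let id: i64 = 123; // the explicit i64 is what satisfies Type::INT8
    writer.as_mut().write(&[&id, &"Foo", &None::<&str>]).await?;
    writer.finish().await?;
    Ok(())
}
```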

The pattern that fails:

```rust
    #[test_context(PgTokioTestContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut PgTokioTestContext) {
        use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
        let client = &ctx.client;

        migrate_db(client).await;

        let sink = client
            .copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
            .await
            .unwrap();
        let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::TEXT, Type::TEXT]);
        tokio::pin!(writer);

        writer
            .as_mut()
            .write(&[&123, &"Foo", &"bio"]) // an explicit i64 annotation is required here
            .await
            .unwrap();
        writer
            .as_mut()
            .write(&[&456, &"Bar", &None::<&str>]) // an explicit i64 annotation is required here
            .await
            .unwrap();

        writer.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```

```
running 1 test
test tests::test_copy_from ... FAILED

failures:

---- tests::test_copy_from stdout ----

thread 'tests::test_copy_from' panicked at examples/authors/src/lib.rs:37:14:
called `Result::unwrap()` on an `Err` value: Error { kind: ToSql(0), cause: Some(WrongType { postgres: Int8, rust: "i32" }) }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    tests::test_copy_from

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.13s
```
tunamaguro

Sending it as Type::INT4 fails too, of course: the field then carries only 4 bytes where the server expects 8 for the int8 column, hence "insufficient data left in message".

```rust
    #[test_context(PgTokioTestContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut PgTokioTestContext) {
        use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
        let client = &ctx.client;

        migrate_db(client).await;

        let sink = client
            .copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
            .await
            .unwrap();
        let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT, Type::TEXT]);
        tokio::pin!(writer);

        writer
            .as_mut()
            .write(&[&123, &"Foo", &"bio"])
            .await
            .unwrap();
        writer
            .as_mut()
            .write(&[&456, &"Bar", &None::<&str>])
            .await
            .unwrap();

        writer.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(client)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```

```
running 1 test
test tests::test_copy_from ... FAILED

failures:

---- tests::test_copy_from stdout ----

thread 'tests::test_copy_from' panicked at examples/authors/src/lib.rs:44:31:
called `Result::unwrap()` on an `Err` value: Error { kind: Db, cause: Some(DbError { severity: "ERROR", parsed_severity: Some(Error), code: SqlState(E08P01), message: "insufficient data left in message", detail: None, hint: None, position: None, where_: Some("COPY authors, line 1, column id"), schema: None, table: None, column: None, datatype: None, constraint: None, file: Some("pqformat.c"), line: Some(531), routine: Some("pq_copymsgbytes") }) }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    tests::test_copy_from

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.13s
```
tunamaguro

The BINARY format option causes all data to be stored/read as binary rather than as text. It is somewhat faster than the text and CSV formats, but a binary-format file is less portable across machine architectures and PostgreSQL versions. Also, the binary format is very data-type specific; for example, it will not work to output binary data from a smallint column and read it into an integer column, even though that would work fine in text format.

The file format consists of a file header, zero or more tuples containing the row data, and a file trailer. Headers and data are in network byte order.

The documentation does indeed say as much.
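
As a rough sketch (not from the original post) of what that layout means for a single `authors` row (id=123, name="Foo", bio=NULL): every integer is big-endian, each field is prefixed with its byte length, and a length of -1 marks NULL.

```rust
/// Builds the COPY BINARY payload for one row, following the layout quoted above.
fn one_row_binary_payload() -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(b"PGCOPY\n\xff\r\n\0"); // 11-byte signature
    buf.extend_from_slice(&0_i32.to_be_bytes()); // flags field
    buf.extend_from_slice(&0_i32.to_be_bytes()); // header extension area length
    buf.extend_from_slice(&3_i16.to_be_bytes()); // tuple: number of fields
    buf.extend_from_slice(&8_i32.to_be_bytes()); // id: an int8 is 8 bytes long
    buf.extend_from_slice(&123_i64.to_be_bytes());
    buf.extend_from_slice(&3_i32.to_be_bytes()); // name: byte length of "Foo"
    buf.extend_from_slice(b"Foo");
    buf.extend_from_slice(&(-1_i32).to_be_bytes()); // bio: length -1 means NULL
    buf.extend_from_slice(&(-1_i16).to_be_bytes()); // trailer: field count of -1
    buf
}
```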

tunamaguro

sqlx supports writing through the PgCopyIn struct:

https://docs.rs/sqlx/latest/sqlx/postgres/struct.PgCopyIn.html

tunamaguro

Trying a TEXT-format write first; it worked:

```rust
    #[test_context(SqlxPgContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut SqlxPgContext) {
        let pool = &mut ctx.pool;

        sqlx::raw_sql(include_str!("../schema.sql"))
            .execute(&*pool)
            .await
            .unwrap();

        let mut copy_in = pool
            .copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT TEXT)")
            .await
            .unwrap();

        let data = [
            ["123", "Foo", "bio"].join("\t"),
            ["456", "Bar", "\\N"].join("\t"),
        ]
        .join("\n");

        copy_in.send(data.as_bytes()).await.unwrap();
        copy_in.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```

tunamaguro

CSV format also goes straight in:

```rust
    #[test_context(SqlxPgContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut SqlxPgContext) {
        let pool = &mut ctx.pool;

        sqlx::raw_sql(include_str!("../schema.sql"))
            .execute(&*pool)
            .await
            .unwrap();

        let mut copy_in = pool
            .copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT CSV)")
            .await
            .unwrap();

        let data = [
            ["123", "Foo", "bio"].join(","),
            ["456", "Bar", ""].join(","),
        ]
        .join("\n");

        copy_in.send(data.as_bytes()).await.unwrap();
        copy_in.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```
tunamaguro

I can't figure out how to send binary-format data with sqlx. I thought Encode was for that, but maybe not?
For now, only the header and trailer are implemented:

```rust
    #[test_context(SqlxPgContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut SqlxPgContext) {
        use sqlx::Encode;
        let pool = &mut ctx.pool;

        sqlx::raw_sql(include_str!("../schema.sql"))
            .execute(&*pool)
            .await
            .unwrap();

        let mut copy_in = pool
            .copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
            .await
            .unwrap();

        let mut buf = Vec::new();
        const COPY_SIGNATURE: &[u8] = &[
            b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
            0xFF,  // \377 (octal) = 0xFF (hex)
            b'\r', b'\n', // "\r\n"
            0x00,  // \0
        ];
        const COPY_TRAILER: &[u8] = &((-1_i16).to_be_bytes());

        assert_eq!(COPY_SIGNATURE.len(), 11);
        buf.extend_from_slice(COPY_SIGNATURE); // signature
        buf.extend_from_slice(&(0_i32.to_be_bytes())); // flags field
        buf.extend_from_slice(&(0_i32.to_be_bytes())); // header extension area length

        let data = [(123_i64, "Foo", Some("bio")), (456_i64, "Bar", None)];
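        // NOTE: `data` above is never written; only the header and trailer are
        // sent, so the queries further down cannot find these rows yet.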

        buf.extend_from_slice(COPY_TRAILER);
        copy_in.send(buf.as_ref()).await.unwrap();
        copy_in.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```
tunamaguro

Assembling the payload by hand and sending it worked:

```rust
    #[test_context(SqlxPgContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut SqlxPgContext) {
        let pool = &mut ctx.pool;

        sqlx::raw_sql(include_str!("../schema.sql"))
            .execute(&*pool)
            .await
            .unwrap();

        let mut copy_in = pool
            .copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
            .await
            .unwrap();

        let mut buf = Vec::new();
        const COPY_SIGNATURE: &[u8] = &[
            b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
            0xFF,  // \377 (octal) = 0xFF (hex)
            b'\r', b'\n', // "\r\n"
            0x00,  // \0
        ];
        const COPY_TRAILER: &[u8] = &(-1_i16).to_be_bytes();

        assert_eq!(COPY_SIGNATURE.len(), 11);
        buf.extend_from_slice(COPY_SIGNATURE); // signature
        buf.extend_from_slice(&0_i32.to_be_bytes()); // flags field
        buf.extend_from_slice(&0_i32.to_be_bytes()); // header extension area length

        let data = [(123_i64, "Foo", Some("bio")), (456_i64, "Bar", None)];

        for (id, name, bio) in data {
            buf.extend_from_slice(&3_i16.to_be_bytes()); // number of fields

            buf.extend_from_slice(&8_i32.to_be_bytes()); // field length
            buf.extend_from_slice(&id.to_be_bytes()); // field data

            let name_bytes = name.as_bytes();
            buf.extend_from_slice(&(name_bytes.len() as i32).to_be_bytes()); // field length
            buf.extend_from_slice(name_bytes); // field data

            match bio {
                Some(bio) => {
                    let bio_bytes = bio.as_bytes();
                    buf.extend_from_slice(&(bio_bytes.len() as i32).to_be_bytes());
                    buf.extend_from_slice(bio_bytes);
                }
                None => {
                    buf.extend_from_slice(&((-1_i32).to_be_bytes()));
                }
            }
        }

        buf.extend_from_slice(COPY_TRAILER);
        copy_in.send(buf.as_ref()).await.unwrap();
        copy_in.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```

tunamaguro

Using PgArgumentBuffer, it worked:

```rust
    #[test_context(SqlxPgContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut SqlxPgContext) {
        use sqlx::postgres::PgArgumentBuffer;
        let pool = &mut ctx.pool;

        sqlx::raw_sql(include_str!("../schema.sql"))
            .execute(&*pool)
            .await
            .unwrap();

        let mut copy_in = pool
            .copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
            .await
            .unwrap();

        let mut buf = Vec::new();
        const COPY_SIGNATURE: &[u8] = &[
            b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
            0xFF,  // \377 (octal) = 0xFF (hex)
            b'\r', b'\n', // "\r\n"
            0x00,  // \0
        ];
        const COPY_TRAILER: &[u8] = &(-1_i16).to_be_bytes();

        assert_eq!(COPY_SIGNATURE.len(), 11);
        buf.extend_from_slice(COPY_SIGNATURE); // signature
        buf.extend_from_slice(&0_i32.to_be_bytes()); // flags field
        buf.extend_from_slice(&0_i32.to_be_bytes()); // header extension area length

        let data = [(123_i64, "Foo", Some("bio")), (456_i64, "Bar", None)];

        let mut data_buf = PgArgumentBuffer::default();
        for (id, name, bio) in data {
            buf.extend(3_i16.to_be_bytes());

            // id
            let is_null =
                sqlx::Encode::<sqlx::Postgres>::encode_by_ref(&id, &mut data_buf).unwrap();

            match is_null {
                sqlx::encode::IsNull::Yes => {
                    buf.extend((-1_i32).to_be_bytes());
                }
                sqlx::encode::IsNull::No => {
                    buf.extend((data_buf.len() as i32).to_be_bytes());
                    buf.extend_from_slice(data_buf.as_slice());
                }
            }

            data_buf.clear();

            // name
            let is_null =
                sqlx::Encode::<sqlx::Postgres>::encode_by_ref(&name, &mut data_buf).unwrap();

            match is_null {
                sqlx::encode::IsNull::Yes => {
                    buf.extend((-1_i32).to_be_bytes());
                }
                sqlx::encode::IsNull::No => {
                    buf.extend((data_buf.len() as i32).to_be_bytes());
                    buf.extend_from_slice(data_buf.as_slice());
                }
            }

            data_buf.clear();

            //  bio
            let is_null =
                sqlx::Encode::<sqlx::Postgres>::encode_by_ref(&bio, &mut data_buf).unwrap();

            match is_null {
                sqlx::encode::IsNull::Yes => {
                    buf.extend((-1_i32).to_be_bytes());
                }
                sqlx::encode::IsNull::No => {
                    buf.extend((data_buf.len() as i32).to_be_bytes());
                    buf.extend_from_slice(data_buf.as_slice());
                }
            }

            data_buf.clear();
        }

        buf.extend_from_slice(COPY_TRAILER);
        copy_in.send(buf.as_ref()).await.unwrap();
        copy_in.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```

tunamaguro

Wrapping it up in a rough helper:

```rust
    use sqlx::{postgres::PgArgumentBuffer, Encode, Postgres, Type};

    struct CopyDataBuffer {
        inner: PgArgumentBuffer,
        data_buf: Vec<u8>,
    }

    type BoxError = Box<dyn std::error::Error + 'static + Send + Sync>;

    impl CopyDataBuffer {
        fn new() -> Self {
            CopyDataBuffer {
                inner: Default::default(),
                data_buf: Default::default(),
            }
        }

        fn add<'q, T>(&mut self, value: T) -> Result<(), BoxError>
        where
            T: Encode<'q, Postgres> + Type<Postgres>,
        {
            let is_null = value.encode_by_ref(&mut self.inner)?;

            match is_null {
                sqlx::encode::IsNull::Yes => {
                    self.data_buf.extend((-1_i32).to_be_bytes());
                }
                sqlx::encode::IsNull::No => {
                    self.data_buf
                        .extend((self.inner.len() as i32).to_be_bytes());
                    self.data_buf.extend_from_slice(self.inner.as_slice());
                }
            }

            self.inner.clear();

            Ok(())
        }

        fn clear(&mut self) {
            self.inner.clear();
            self.data_buf.clear();
        }

        fn data_slice(&self) -> &[u8] {
            &self.data_buf
        }
    }

    #[test_context(SqlxPgContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut SqlxPgContext) {
        let pool = &mut ctx.pool;

        sqlx::raw_sql(include_str!("../schema.sql"))
            .execute(&*pool)
            .await
            .unwrap();

        let mut copy_in = pool
            .copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
            .await
            .unwrap();

        let mut buf = Vec::new();
        const COPY_SIGNATURE: &[u8] = &[
            b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
            0xFF,  // \377 (octal) = 0xFF (hex)
            b'\r', b'\n', // "\r\n"
            0x00,  // \0
        ];
        const COPY_TRAILER: &[u8] = &(-1_i16).to_be_bytes();

        assert_eq!(COPY_SIGNATURE.len(), 11);
        buf.extend_from_slice(COPY_SIGNATURE); // signature
        buf.extend_from_slice(&0_i32.to_be_bytes()); // flags field
        buf.extend_from_slice(&0_i32.to_be_bytes()); // header extension area length

        let data = [(123_i64, "Foo", Some("bio")), (456_i64, "Bar", None)];

        let mut data_buf = CopyDataBuffer::new();
        for (id, name, bio) in data {
            data_buf.clear();
            buf.extend(3_i16.to_be_bytes());

            data_buf.add(id).unwrap();
            data_buf.add(name).unwrap();
            data_buf.add(bio).unwrap();

            buf.extend_from_slice(data_buf.data_slice());
        }

        buf.extend_from_slice(COPY_TRAILER);
        copy_in.send(buf.as_ref()).await.unwrap();
        copy_in.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);
    }
```
tunamaguro

Making it behave more like tokio-postgres's BinaryCopyInWriter:

```rust
    use sqlx::{
        pool::PoolConnection,
        postgres::{PgArgumentBuffer, PgCopyIn},
        Encode, Postgres, Type,
    };

    struct CopyDataSink {
        encode_buf: PgArgumentBuffer,
        data_buf: Vec<u8>,
        copy_in: PgCopyIn<PoolConnection<Postgres>>,
        num_column: i16,
    }

    type BoxError = Box<dyn std::error::Error + 'static + Send + Sync>;

    impl CopyDataSink {
        fn new(copy_in: PgCopyIn<PoolConnection<Postgres>>, num_column: i16) -> Self {
            let mut data_buf = Vec::with_capacity(1024);
            const COPY_SIGNATURE: &[u8] = &[
                b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
                0xFF,  // \377 (octal) = 0xFF (hex)
                b'\r', b'\n', // "\r\n"
                0x00,  // \0
            ];

            assert_eq!(COPY_SIGNATURE.len(), 11);
            data_buf.extend_from_slice(COPY_SIGNATURE); // signature
            data_buf.extend_from_slice(&0_i32.to_be_bytes()); // flags field
            data_buf.extend_from_slice(&0_i32.to_be_bytes()); // header extension area length

            CopyDataSink {
                encode_buf: Default::default(),
                data_buf,
                copy_in,
                num_column,
            }
        }

        async fn send(&mut self) -> Result<(), BoxError> {
            let _copy_in = self.copy_in.send(self.data_buf.as_slice()).await?;

            self.data_buf.clear();
            Ok(())
        }

        async fn finish(mut self) -> Result<u64, BoxError> {
            const COPY_TRAILER: &[u8] = &(-1_i16).to_be_bytes();

            self.data_buf.extend(COPY_TRAILER);
            self.send().await?;
            self.copy_in.finish().await.map_err(|e| e.into())
        }

        fn insert_row(&mut self) {
            self.data_buf.extend(self.num_column.to_be_bytes());
        }

        async fn add<'q, T>(&mut self, value: T) -> Result<(), BoxError>
        where
            T: Encode<'q, Postgres> + Type<Postgres>,
        {
            let is_null = value.encode_by_ref(&mut self.encode_buf)?;

            match is_null {
                sqlx::encode::IsNull::Yes => {
                    self.data_buf.extend((-1_i32).to_be_bytes());
                }
                sqlx::encode::IsNull::No => {
                    self.data_buf
                        .extend((self.encode_buf.len() as i32).to_be_bytes());
                    self.data_buf.extend_from_slice(self.encode_buf.as_slice());
                }
            }

            self.encode_buf.clear();

            if self.data_buf.len() > 4096 {
                self.send().await?;
            }

            Ok(())
        }
    }

    #[test_context(SqlxPgContext)]
    #[tokio::test]
    async fn test_copy_from(ctx: &mut SqlxPgContext) {
        let pool = &mut ctx.pool;

        sqlx::raw_sql(include_str!("../schema.sql"))
            .execute(&*pool)
            .await
            .unwrap();

        let copy_in = pool
            .copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
            .await
            .unwrap();

        let data = [
            (123_i64, "Foo", Some("bio")),
            (456_i64, "Bar", None),
            (789_i64, "Hoge", Some("Fuga")),
        ];

        let mut sink = CopyDataSink::new(copy_in, 3);
        for (id, name, bio) in data {
            sink.insert_row();
            sink.add(id).await.unwrap();
            sink.add(name).await.unwrap();
            sink.add(bio).await.unwrap();
        }
        sink.finish().await.unwrap();

        let author0 = queries::GetAuthor::builder()
            .id(123)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author0.id, 123);
        assert_eq!(author0.name, "Foo");
        assert_eq!(author0.bio, Some("bio".into()));

        let author1 = queries::GetAuthor::builder()
            .id(456)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author1.id, 456);
        assert_eq!(author1.name, "Bar");
        assert_eq!(author1.bio, None);

        let author2 = queries::GetAuthor::builder()
            .id(789)
            .build()
            .query_one(&*pool)
            .await
            .unwrap();
        assert_eq!(author2.id, 789);
        assert_eq!(author2.name, "Hoge");
        assert_eq!(author2.bio, Some("Fuga".into()));
    }
```
tunamaguro

I've confirmed this works with both tokio-postgres and sqlx, so this is done.

This scrap was closed 4 months ago.