postgresの`COPY FROM STDIN`をRustで使う
tokio-postgresとsqlxで、COPY FROMの使い方を調べる
tokio-postgresはCopyInSinkという構造体を経由して書き込むらしい
使うスキーマ
CREATE TABLE authors (
id BIGSERIAL PRIMARY KEY,
name text NOT NULL,
bio text
);
TEXT形式での書き込みは、こんな感じでできた
#[cfg(test)]
mod tests {
use super::queries;
use test_context::{futures::SinkExt, test_context};
use test_utils::PgTokioTestContext;
use tokio_postgres::CopyInSink;
async fn migrate_db(client: &tokio_postgres::Client) {
client
.batch_execute(include_str!("../schema.sql"))
.await
.unwrap();
}
#[test_context(PgTokioTestContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut PgTokioTestContext) {
let client = &ctx.client;
migrate_db(client).await;
let sink: CopyInSink<bytes::Bytes> = client
.copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT TEXT)")
.await
.unwrap();
let mut sink = Box::pin(sink);
let data = [
["123", "Foo", "bio"].join("\t"),
["456", "Bar", "\\N"].join("\t"),
]
.join("\n");
sink.send(data.into()).await.unwrap();
sink.close().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
}
CopyInSinkに対して、futures::SinkExtを経由してデータを書き込める
SinkExtはSinkが実装されているものに対して自動実装
csv形式も同様にSinkExtから書き込む
#[test_context(PgTokioTestContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut PgTokioTestContext) {
let client = &ctx.client;
migrate_db(client).await;
let sink: CopyInSink<bytes::Bytes> = client
.copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT CSV, HEADER)")
.await
.unwrap();
let mut sink = Box::pin(sink);
let data = [
["id", "name", "bio"].join(","),
["123", "Foo", "bio"].join(","),
["456", "Bar", ""].join(","),
]
.join("\n");
sink.send(data.into()).await.unwrap();
sink.close().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
```Binary版
#[test_context(PgTokioTestContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut PgTokioTestContext) {
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
let client = &ctx.client;
migrate_db(client).await;
let sink = client
.copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::TEXT, Type::TEXT]);
tokio::pin!(writer);
writer
.as_mut()
.write(&[&123_i64, &"Foo", &"bio"])
.await
.unwrap();
writer
.as_mut()
.write(&[&456_i64, &"Bar", &None::<&str>])
.await
.unwrap();
writer.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
```postgres-typesによるとBIGSERIALはRustのi64と対応しているため、型指定をしないとi32が使われてエラーになる
エラーになるパターン
#[test_context(PgTokioTestContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut PgTokioTestContext) {
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
let client = &ctx.client;
migrate_db(client).await;
let sink = client
.copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::TEXT, Type::TEXT]);
tokio::pin!(writer);
writer
.as_mut()
.write(&[&123, &"Foo", &"bio"]) // i64のアノテーションが必須
.await
.unwrap();
writer
.as_mut()
.write(&[&456, &"Bar", &None::<&str>]) // i64のアノテーションが必須
.await
.unwrap();
writer.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
running 1 test
test tests::test_copy_from ... FAILED
failures:
---- tests::test_copy_from stdout ----
thread 'tests::test_copy_from' panicked at examples/authors/src/lib.rs:37:14:
called `Result::unwrap()` on an `Err` value: Error { kind: ToSql(0), cause: Some(WrongType { postgres: Int8, rust: "i32" }) }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
failures:
tests::test_copy_from
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.13s
Types::INT4で送ろうとしても当然エラーになる
#[test_context(PgTokioTestContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut PgTokioTestContext) {
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
let client = &ctx.client;
migrate_db(client).await;
let sink = client
.copy_in("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
.await
.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::INT4, Type::TEXT, Type::TEXT]);
tokio::pin!(writer);
writer
.as_mut()
.write(&[&123, &"Foo", &"bio"])
.await
.unwrap();
writer
.as_mut()
.write(&[&456, &"Bar", &None::<&str>])
.await
.unwrap();
writer.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(client)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
running 1 test
test tests::test_copy_from ... FAILED
failures:
---- tests::test_copy_from stdout ----
thread 'tests::test_copy_from' panicked at examples/authors/src/lib.rs:44:31:
called `Result::unwrap()` on an `Err` value: Error { kind: Db, cause: Some(DbError { severity: "ERROR", parsed_severity: Some(Error), code: SqlState(E08P01), message: "insufficient data left in message", detail: None, hint: None, position: None, where_: Some("COPY authors, line 1, column id"), schema: None, table: None, column: None, datatype: None, constraint: None, file: Some("pqformat.c"), line: Some(531), routine: Some("pq_copymsgbytes") }) }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
failures:
tests::test_copy_from
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.13s
バイナリ形式で送られるデータに関して
binary形式オプションにより、すべてのデータはテキストではなくバイナリ形式で書き込み/読み取りされるようになります。 テキストやCSV形式よりも多少高速になりますが、バイナリ形式のファイルはマシンアーキテクチャやPostgreSQLのバージョンをまたがる移植性が落ちます。 またバイナリ形式はデータ型に非常に依存します。 たとえば、smallint列からバイナリデータを出力し、それをinteger列として読み込むことはできません。同じことをテキスト形式で実行すれば動作するのですが。
binaryファイルの形式は、ファイルヘッダ、行データを含む0以上のタプル、ファイルトレーラから構成されます。 ヘッダとデータはネットワークバイトオーダです。
確かにドキュメントにはそんな感じのことが書いてある
sqlxにはPgCopyIn構造体経由で書き込みができる
PgCopyInはPgPoolCopyInExt経由で取得できる。Pool<Postgres>にだけ実装されてるっぽい?
tokio-postgresはClientとTransactionの両方に実装されていた
とりあえずText形式の書き込みを試したらできた
#[test_context(SqlxPgContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut SqlxPgContext) {
let pool = &mut ctx.pool;
sqlx::raw_sql(include_str!("../schema.sql"))
.execute(&*pool)
.await
.unwrap();
let mut copy_in = pool
.copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT TEXT)")
.await
.unwrap();
let data = [
["123", "Foo", "bio"].join("\t"),
["456", "Bar", "\\N"].join("\t"),
]
.join("\n");
copy_in.send(data.as_bytes()).await.unwrap();
copy_in.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
CSV形式もそのまま入る
#[test_context(SqlxPgContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut SqlxPgContext) {
let pool = &mut ctx.pool;
sqlx::raw_sql(include_str!("../schema.sql"))
.execute(&*pool)
.await
.unwrap();
let mut copy_in = pool
.copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT CSV)")
.await
.unwrap();
let data = [
["123", "Foo", "bio"].join(","),
["456", "Bar", ""].join(","),
]
.join("\n");
copy_in.send(data.as_bytes()).await.unwrap();
copy_in.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
```sqlxでバイナリ形式のデータを送る方法が分からない。Encodeがそれ用だと思ったけど違う?
とりあえず、ヘッダとトレイラだけ実装
#[test_context(SqlxPgContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut SqlxPgContext) {
use sqlx::Encode;
let pool = &mut ctx.pool;
sqlx::raw_sql(include_str!("../schema.sql"))
.execute(&*pool)
.await
.unwrap();
let mut copy_in = pool
.copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
.await
.unwrap();
let mut buf = Vec::new();
const COPY_SIGNATURE: &[u8] = &[
b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
0xFF, // \377 (8進数) = 0xFF (16進数)
b'\r', b'\n', // "\r\n"
0x00, // \0
];
const COPY_TRAILER: &[u8] = &((-1_i16).to_be_bytes());
assert_eq!(COPY_SIGNATURE.len(), 11);
buf.extend_from_slice(COPY_SIGNATURE); // 署名
buf.extend_from_slice(&(0_i32.to_be_bytes())); // フラグフィールド
buf.extend_from_slice(&(0_i32.to_be_bytes())); // ヘッダ拡張領域長
let data = [(123_i64, "Foo", Some("bio")), (456_i64, "Bar", None)];
buf.extend_from_slice(COPY_TRAILER);
copy_in.send(buf.as_ref()).await.unwrap();
copy_in.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
手動で組み立てして送れた
#[test_context(SqlxPgContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut SqlxPgContext) {
let pool = &mut ctx.pool;
sqlx::raw_sql(include_str!("../schema.sql"))
.execute(&*pool)
.await
.unwrap();
let mut copy_in = pool
.copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
.await
.unwrap();
let mut buf = Vec::new();
const COPY_SIGNATURE: &[u8] = &[
b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
0xFF, // \377 (8進数) = 0xFF (16進数)
b'\r', b'\n', // "\r\n"
0x00, // \0
];
const COPY_TRAILER: &[u8] = &(-1_i16).to_be_bytes();
assert_eq!(COPY_SIGNATURE.len(), 11);
buf.extend_from_slice(COPY_SIGNATURE); // 署名
buf.extend_from_slice(&0_i32.to_be_bytes()); // フラグフィールド
buf.extend_from_slice(&0_i32.to_be_bytes()); // ヘッダ拡張領域長
let data = [(123_i64, "Foo", Some("bio")), (456_i64, "Bar", None)];
for (id, name, bio) in data {
buf.extend_from_slice(&3_i16.to_be_bytes()); // フィールド数
buf.extend_from_slice(&8_i32.to_be_bytes()); // データ長
buf.extend_from_slice(&id.to_be_bytes()); // データ
let name_bytes = name.as_bytes();
buf.extend_from_slice(&(name_bytes.len() as i32).to_be_bytes()); // データ長
buf.extend_from_slice(name_bytes); // データ
match bio {
Some(bio) => {
let bio_bytes = bio.as_bytes();
buf.extend_from_slice(&(bio_bytes.len() as i32).to_be_bytes());
buf.extend_from_slice(bio_bytes);
}
None => {
buf.extend_from_slice(&((-1_i32).to_be_bytes()));
}
}
}
buf.extend_from_slice(COPY_TRAILER);
copy_in.send(buf.as_ref()).await.unwrap();
copy_in.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
テストケースにEncodeを使った例がないかと思ったが、ないっぽい?
PgArgumentsBufferを使えばできた
#[test_context(SqlxPgContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut SqlxPgContext) {
let pool = &mut ctx.pool;
sqlx::raw_sql(include_str!("../schema.sql"))
.execute(&*pool)
.await
.unwrap();
let mut copy_in = pool
.copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
.await
.unwrap();
let mut buf = Vec::new();
const COPY_SIGNATURE: &[u8] = &[
b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
0xFF, // \377 (8進数) = 0xFF (16進数)
b'\r', b'\n', // "\r\n"
0x00, // \0
];
const COPY_TRAILER: &[u8] = &(-1_i16).to_be_bytes();
assert_eq!(COPY_SIGNATURE.len(), 11);
buf.extend_from_slice(COPY_SIGNATURE); // 署名
buf.extend_from_slice(&0_i32.to_be_bytes()); // フラグフィールド
buf.extend_from_slice(&0_i32.to_be_bytes()); // ヘッダ拡張領域長
let data = [(123_i64, "Foo", Some("bio")), (456_i64, "Bar", None)];
let mut data_buf = PgArgumentBuffer::default();
for (id, name, bio) in data {
buf.extend(3_i16.to_be_bytes());
// id
let is_null =
sqlx::Encode::<sqlx::Postgres>::encode_by_ref(&id, &mut data_buf).unwrap();
match is_null {
sqlx::encode::IsNull::Yes => {
buf.extend((-1_i32).to_be_bytes());
}
sqlx::encode::IsNull::No => {
buf.extend((data_buf.len() as i32).to_be_bytes());
buf.extend_from_slice(data_buf.as_slice());
}
}
data_buf.clear();
// name
let is_null =
sqlx::Encode::<sqlx::Postgres>::encode_by_ref(&name, &mut data_buf).unwrap();
match is_null {
sqlx::encode::IsNull::Yes => {
buf.extend((-1_i32).to_be_bytes());
}
sqlx::encode::IsNull::No => {
buf.extend((data_buf.len() as i32).to_be_bytes());
buf.extend_from_slice(data_buf.as_slice());
}
}
data_buf.clear();
// bio
let is_null =
sqlx::Encode::<sqlx::Postgres>::encode_by_ref(&bio, &mut data_buf).unwrap();
match is_null {
sqlx::encode::IsNull::Yes => {
buf.extend((-1_i32).to_be_bytes());
}
sqlx::encode::IsNull::No => {
buf.extend((data_buf.len() as i32).to_be_bytes());
buf.extend_from_slice(data_buf.as_slice());
}
}
data_buf.clear();
}
buf.extend_from_slice(COPY_TRAILER);
copy_in.send(buf.as_ref()).await.unwrap();
copy_in.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
適当にラップしてみる。
struct CopyDataBuffer {
inner: PgArgumentBuffer,
data_buf: Vec<u8>,
}
type BoxError = Box<dyn std::error::Error + 'static + Send + Sync>;
impl CopyDataBuffer {
fn new() -> Self {
CopyDataBuffer {
inner: Default::default(),
data_buf: Default::default(),
}
}
fn add<'q, T>(&mut self, value: T) -> Result<(), BoxError>
where
T: Encode<'q, Postgres> + Type<Postgres>,
{
let is_null = value.encode_by_ref(&mut self.inner)?;
match is_null {
sqlx::encode::IsNull::Yes => {
self.data_buf.extend((-1_i32).to_be_bytes());
}
sqlx::encode::IsNull::No => {
self.data_buf
.extend((self.inner.len() as i32).to_be_bytes());
self.data_buf.extend_from_slice(self.inner.as_slice());
}
}
self.inner.clear();
Ok(())
}
fn clear(&mut self) {
self.inner.clear();
self.data_buf.clear();
}
fn data_slice(&self) -> &[u8] {
&self.data_buf
}
}
#[test_context(SqlxPgContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut SqlxPgContext) {
let pool = &mut ctx.pool;
sqlx::raw_sql(include_str!("../schema.sql"))
.execute(&*pool)
.await
.unwrap();
let mut copy_in = pool
.copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
.await
.unwrap();
let mut buf = Vec::new();
const COPY_SIGNATURE: &[u8] = &[
b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
0xFF, // \377 (8進数) = 0xFF (16進数)
b'\r', b'\n', // "\r\n"
0x00, // \0
];
const COPY_TRAILER: &[u8] = &(-1_i16).to_be_bytes();
assert_eq!(COPY_SIGNATURE.len(), 11);
buf.extend_from_slice(COPY_SIGNATURE); // 署名
buf.extend_from_slice(&0_i32.to_be_bytes()); // フラグフィールド
buf.extend_from_slice(&0_i32.to_be_bytes()); // ヘッダ拡張領域長
let data = [(123_i64, "Foo", Some("bio")), (456_i64, "Bar", None)];
let mut data_buf = CopyDataBuffer::new();
for (id, name, bio) in data {
data_buf.clear();
buf.extend(3_i16.to_be_bytes());
data_buf.add(id).unwrap();
data_buf.add(name).unwrap();
data_buf.add(bio).unwrap();
buf.extend_from_slice(data_buf.data_slice());
}
buf.extend_from_slice(COPY_TRAILER);
copy_in.send(buf.as_ref()).await.unwrap();
copy_in.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
}
```実装を見て、buffer以外の要素もどんどん大きくなるかと思ったが、Encode::encode_by_refはextendを呼び出してるので、そういうわけではなさそう
tokio-postgresのBinaryCopyWriter風
struct CopyDataSink {
encode_buf: PgArgumentBuffer,
data_buf: Vec<u8>,
copy_in: PgCopyIn<PoolConnection<Postgres>>,
num_column: i16,
}
type BoxError = Box<dyn std::error::Error + 'static + Send + Sync>;
impl CopyDataSink {
fn new(copy_in: PgCopyIn<PoolConnection<Postgres>>, num_column: i16) -> Self {
let mut data_buf = Vec::with_capacity(1024);
const COPY_SIGNATURE: &[u8] = &[
b'P', b'G', b'C', b'O', b'P', b'Y', b'\n', // "PGCOPY\n"
0xFF, // \377 (8進数) = 0xFF (16進数)
b'\r', b'\n', // "\r\n"
0x00, // \0
];
assert_eq!(COPY_SIGNATURE.len(), 11);
data_buf.extend_from_slice(COPY_SIGNATURE); // 署名
data_buf.extend_from_slice(&0_i32.to_be_bytes()); // フラグフィールド
data_buf.extend_from_slice(&0_i32.to_be_bytes()); // ヘッダ拡張領域長
CopyDataSink {
encode_buf: Default::default(),
data_buf,
copy_in,
num_column,
}
}
async fn send(&mut self) -> Result<(), BoxError> {
let _copy_in = self.copy_in.send(self.data_buf.as_slice()).await?;
self.data_buf.clear();
Ok(())
}
async fn finish(mut self) -> Result<u64, BoxError> {
const COPY_TRAILER: &[u8] = &(-1_i16).to_be_bytes();
self.data_buf.extend(COPY_TRAILER);
self.send().await?;
self.copy_in.finish().await.map_err(|e| e.into())
}
fn insert_row(&mut self) {
self.data_buf.extend(self.num_column.to_be_bytes());
}
async fn add<'q, T>(&mut self, value: T) -> Result<(), BoxError>
where
T: Encode<'q, Postgres> + Type<Postgres>,
{
let is_null = value.encode_by_ref(&mut self.encode_buf)?;
match is_null {
sqlx::encode::IsNull::Yes => {
self.data_buf.extend((-1_i32).to_be_bytes());
}
sqlx::encode::IsNull::No => {
self.data_buf
.extend((self.encode_buf.len() as i32).to_be_bytes());
self.data_buf.extend_from_slice(self.encode_buf.as_slice());
}
}
self.encode_buf.clear();
if self.data_buf.len() > 4096 {
self.send().await?;
}
Ok(())
}
}
#[test_context(SqlxPgContext)]
#[tokio::test]
async fn test_copy_from(ctx: &mut SqlxPgContext) {
let pool = &mut ctx.pool;
sqlx::raw_sql(include_str!("../schema.sql"))
.execute(&*pool)
.await
.unwrap();
let copy_in = pool
.copy_in_raw("COPY authors (id,name,bio) FROM STDIN (FORMAT BINARY)")
.await
.unwrap();
let data = [
(123_i64, "Foo", Some("bio")),
(456_i64, "Bar", None),
(789_i64, "Hoge", Some("Fuga")),
];
let mut sink = CopyDataSink::new(copy_in, 3);
for (id, name, bio) in data {
sink.insert_row();
sink.add(id).await.unwrap();
sink.add(name).await.unwrap();
sink.add(bio).await.unwrap();
}
sink.finish().await.unwrap();
let author0 = queries::GetAuthor::builder()
.id(123)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author0.id, 123);
assert_eq!(author0.name, "Foo");
assert_eq!(author0.bio, Some("bio".into()));
let author1 = queries::GetAuthor::builder()
.id(456)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author1.id, 456);
assert_eq!(author1.name, "Bar");
assert_eq!(author1.bio, None);
let author2 = queries::GetAuthor::builder()
.id(789)
.build()
.query_one(&*pool)
.await
.unwrap();
assert_eq!(author2.id, 789);
assert_eq!(author2.name, "Hoge");
assert_eq!(author2.bio, Some("Fuga".into()));
}
```tokio-postgresとsqlxでの動作確認ができたので、これで終わり