🌷

PostgreSQLクライアント自作した.ts

2024/09/21に公開

役に立ちません

このチュートリアルをやってもあなたの生涯収入は1円も上がりません。
鼻毛でも抜いてた方が鼻毛一本分ぐらいは儲かるんじゃないでしょうか。

AI に生成させたコードをいじりながら雰囲気を味わってみて、ついでにチュートリアル形式の記事にしてみました。
人に読んでもらうためというより自分の日記感覚で書いている。

前回と違いすかしっぺのような内容なのであまり期待しないでください。まあ前回はデカすぎて引火して大変なことになったが。

twitterやってます。
https://x.com/uncode_jp

準備

nodejs18 以上入れとけばいいんじゃないですか。(適当)
docker で postgres を動かす。
tsx は typescript のコードを直接実行するやつ。ts-node より使いやすいのでよく使っているがなんでもいいです。

package.json
{
  "name": "unko",
  "version": "1.0.0",
  "main": "index.js",
  "type": "module",
  "scripts": {
    "start": "tsx src/main.ts"
  },
  "dependencies": {
    "@types/node": "^22.5.5",
    "readline": "^1.3.0",
    "tsx": "^4.19.1",
    "typescript": "^5.6.2"
  }
}

面倒なので、POSTGRES_HOST_AUTH_METHOD=trust で認証をスキップする。
別で使っている postgresql と競合するのが面倒でポートを変えているので注意。

docker-compose.yml
services:
  postgres:
    image: postgres:16
    ports:
      - 54321:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
      - POSTGRES_HOST_AUTH_METHOD=trust

    volumes:
      - postgres-data:/var/lib/postgresql/data

volumes:
  postgres-data:
tsconfig.json
{
  "compilerOptions": {
    "target": "ESNext",
    "moduleResolution": "Bundler",
    "module": "ESNext",
    "esModuleInterop": true,
    "forceConsistentCasingInFileNames": true,
    "strict": true,
    "types": [
      "node"
    ],
    "skipLibCheck": true
  }
}
main.ts

const main = async () => {
  console.log("Hello World");
};

main();

ここまで書いたら動くことを確認します。

npm i
docker compose up -d
npm run start

動かなかったらおしえて

実装

する

疎通

繋げる。
とりあえず一度繋がれば愛が生まれるらしいです。単純ですね。
Socket を使う。

main.ts
import net from "node:net";

const main = async () => {
  const socket = new net.Socket();
  socket.connect(54321, "localhost", () => {
    console.log("❤️");
  });
};

main();

愛が生まれたら成功 ❤️
生まれたからには責任を持ってゴールインまで頑張ろう。

体裁を整える

一応読みやすく実装していくためにクラスに書いていく。
宗教上の理由でクラスが使えない人は関数でもよい。
メッセージのフォーマットはここにある。まあ別に今真剣に読む必要ないが、メッセージタイプ(1byte)、データ長(4byte)、コンテンツ(nbyte)という形式になっているようだ。
各タイプに応じた処理を実装していけばよい。
https://www.postgresql.org/docs/current/protocol-message-formats.htmlhttps://www.postgresql.org/docs/current/protocol-message-formats.html

JavaScript に慣れていない人のため一応説明すると、コールバックを promise に変換するために new Promise(...)となにやらごちゃごちゃ書いている。

流れとしては、最初にスタートアップメッセージを投げて、タイプ R のメッセージを待ち、認証タイプによってその後の処理をする。
今回は面倒なので認証無しでいく。

client.ts
import net from "node:net";

export class PgClient {
  private socket: net.Socket;
  private host: string;
  private port: number;
  private user: string;
  private database: string;

  constructor({
    host,
    port,
    user,
    database,
  }: {
    host: string;
    port: number;
    user: string;
    database: string;
  }) {
    this.socket = new net.Socket();
    this.host = host;
    this.port = port;
    this.user = user;
    this.database = database;
  }

  async connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      this.socket.connect(this.port, this.host, () => {
        const startupMessage = buildStartupMessage(this.user, this.database);
        this.socket.write(startupMessage);
      });

      const handleAuthMessage = (data: Buffer) => {
        const [message, rest] = parseMessage(data);
        console.log("メッセージタイプ:", message.type);
        if (message.type !== "R") {
          reject(
            new Error(`サポートされていないメッセージタイプ: ${message.type}`),
          );
          return;
        }

        const authType = message.content.readInt32BE(0);
        console.log("認証タイプ:", authType);

        if (authType !== 0) {
          this.socket.end();
          reject(new Error(`サポートされていない認証タイプ: ${authType}`));
          return;
        }

        console.log("認証成功");
        resolve();
      };

      this.socket.once("data", handleAuthMessage);
    });
  }
}

type Message = {
  type: string;
  length: number;
  content: Buffer;
};

function parseMessage(data: Buffer): [Message, Buffer] {
  const type = data.slice(0, 1).toString("utf8");
  const length = data.readInt32BE(1); // メッセージの長さ(自身を含む)
  const contentLength = length - 4; // メッセージ本体の長さ
  const content = data.slice(5, 5 + contentLength); // メッセージ本体
  return [{ type, length, content }, data.slice(5 + contentLength)];
}


// AIが勝手に生成したのでしらない。
function buildStartupMessage(user: string, database: string): Buffer {
  const params = ["user", user, "database", database, "", ""];
  const paramsBuffer = Buffer.from(params.join("\0"), "utf8");
  const length = 4 + 4 + paramsBuffer.length;
  const buffer = Buffer.alloc(length);

  buffer.writeInt32BE(length, 0);
  buffer.writeInt32BE(196608, 4);
  paramsBuffer.copy(buffer, 8);

  return buffer;
}


main.ts
import { PgClient } from "./client";

const main = async () => {
  const pgClient = new PgClient({
    host: "localhost",
    port: 54321,
    user: "postgres",
    database: "postgres",
  });
  await pgClient.connect();
};

main();

npm run start で認証成功と出れば勝ち

クエリを投げる

ここからが本番。
新たに query メソッドを作る。
Query の場合もメッセージのときとルールは一緒
メッセージをハンドリングするが、一旦3つのタイプのみ実装する。

client.ts

export class PgClient {
  /** 省略 **/

  public async query(query: string): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      const queryBuffer = Buffer.from(`${query}\0`, "utf8");
      const length = 4 + queryBuffer.length;
      const buffer = Buffer.alloc(1 + 4 + queryBuffer.length);

      buffer.write("Q", 0);
      buffer.writeInt32BE(length, 1);
      queryBuffer.copy(buffer, 5);

      this.socket.once("data", (data) => {
        this.handleQueryMessage(data, resolve, reject);
      });
      this.socket.write(buffer);
    });
  }

  private handleQueryMessage(
    data: Buffer,
    resolve: (value: unknown) => void,
    reject: (reason?: Error) => void,
  ) {
    let currentData = data;

    while (currentData.length > 0) {
      const [message, rest] = parseMessage(currentData);
      currentData = rest;
      console.log("メッセージタイプ:", message.type);
      switch (message.type) {
        case "C":
        case "N": {
          const m = message.content.toString("utf8");
          console.log("message:", m);
          break;
        }
        case "Z":
          resolve(undefined);
          break;
      }
    }
  }
}

/** 省略 **/
main.ts
import { PgClient } from "./client";

const CREATE_HOGE_TABLE_QUERY = `
CREATE TABLE IF NOT EXISTS hoge (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255) NOT NULL
);
`;

const main = async () => {
  const pgClient = new PgClient({
    host: "localhost",
    port: 54321,
    user: "postgres",
    database: "postgres",
  });
  await pgClient.connect();

  await pgClient.query(CREATE_HOGE_TABLE_QUERY);
  console.log("完了");
};

main();

ここでは hoge というテーブルを作成している。
実行してみると

メッセージタイプ: R
認証タイプ: 0
認証成功
メッセージタイプ: C
message: CREATE TABLE
メッセージタイプ: Z

と表示された。C はクエリ完了。Z は次のクエリの受付けが可能になったことを表すらしい。
もっかい実行してみると。

メッセージタイプ: R
認証タイプ: 0
認証成功
メッセージタイプ: N
message: SNOTICEVNOTICEC42P07Mrelation "hoge" already exists, skippingFparse_utilcmd.cL207RtransformCreateStmt
メッセージタイプ: C
message: CREATE TABLE
メッセージタイプ: Z

今度は N(Notification)が返ってきた。
hoge がすでにあるのでスキップされたようだ。

## 対話形式にする

今のままではハードコーディングで不便なので、対話形式で実行できるようにする。
対話不能なのは postgresql クライアントでも人間でも面倒なものである。
ここでは readline というライブラリを使用する。

repl.ts
import * as readline from "readline";

export class Repl {
  private rl: readline.Interface;
  private onInput: (input: string) => Promise<void>;

  constructor({
    onInput,
  }: {
    onInput: (input: string) => Promise<void>;
  }) {
    this.rl = readline.createInterface({
      input: process.stdin,
      output: process.stdout,
    });
    this.onInput = onInput;
  }

  public async start() {
    let exit = false;
    while (!exit) {
      const input = await new Promise<string>((resolve) => {
        this.rl.question("コマンドを入力してください: ", (input) => {
          resolve(input);
        });
      });
      await this.onInput(input);
      if (input === "exit" || input === "quit") {
        exit = true;
        console.log("Bye!");
      }
    }

    this.rl.close();
  }
}

main.ts
import { PgClient } from "./client";
import { Repl } from "./repl";

const CREATE_HOGE_TABLE_QUERY = `
CREATE TABLE IF NOT EXISTS hoge (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255) NOT NULL
);
`;

const main = async () => {
  const pgClient = new PgClient({
    host: "localhost",
    port: 54321,
    user: "postgres",
    database: "postgres",
  });
  await pgClient.connect();

  await pgClient.query(CREATE_HOGE_TABLE_QUERY);

  const repl = new Repl({
    onInput: async (input) => {
      await pgClient.query(input);
    },
  });
  await repl.start();
};

main();


これを実行してコマンドを入力してくださいと表示されれば勝ち。

データの取得

試しに seelct * from hoge;を実行すると、メッセージタイプ T が返ってくるはず。
(ただ私の場合 Z が返ってこない場合があったので、その場合は何度か実行してみてほしい。)
T はカラム情報、それと後で登場する D は行の実データなので、これらに対する処理を書く。

client.ts
export class PgClient {
  private socket: net.Socket;
  private host: string;
  private port: number;
  private user: string;
  private database: string;

  constructor({
    host,
    port,
    user,
    database,
  }: {
    host: string;
    port: number;
    user: string;
    database: string;
  }) {
    this.socket = new net.Socket();
    this.host = host;
    this.port = port;
    this.user = user;
    this.database = database;
  }

  async connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      this.socket.connect(this.port, this.host, () => {
        const startupMessage = buildStartupMessage(this.user, this.database);
        this.socket.write(startupMessage);
      });

      const handleAuthMessage = (data: Buffer) => {
        const [message, rest] = parseMessage(data);
        console.log("メッセージタイプ:", message.type);
        if (message.type !== "R") {
          reject(
            new Error(`サポートされていないメッセージタイプ: ${message.type}`),
          );
          return;
        }

        const authType = message.content.readInt32BE(0);
        console.log("認証タイプ:", authType);

        if (authType !== 0) {
          this.socket.end();
          reject(new Error(`サポートされていない認証タイプ: ${authType}`));
          return;
        }

        console.log("認証成功");
        resolve();
      };

      this.socket.once("data", handleAuthMessage);
    });
  }

  public async query(query: string): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      const queryBuffer = Buffer.from(`${query}\0`, "utf8");
      const length = 4 + queryBuffer.length;
      const buffer = Buffer.alloc(1 + 4 + queryBuffer.length);

      buffer.write("Q", 0);
      buffer.writeInt32BE(length, 1);
      queryBuffer.copy(buffer, 5);

      this.socket.once("data", (data) => {
        this.handleQueryMessage(data, resolve, reject);
      });
      this.socket.write(buffer);
    });
  }

  private handleQueryMessage(
    data: Buffer,
    resolve: (value: unknown) => void,
    reject: (reason?: Error) => void,
  ) {
    let currentData = data;
    let columnMetaData: Array<Record<string, number | string>> = [];
    const rows: Array<Record<string, string>> = [];

    while (currentData.length > 0) {
      const [message, rest] = parseMessage(currentData);
      currentData = rest;
      switch (message.type) {
        case "T": {
          columnMetaData = this.handleRowDescription(message.content);
          console.log("columnMetaData:", columnMetaData);
          break;
        }
        case "D": {
          const row = this.handleDataRow(message.content, columnMetaData);
          rows.push(row);
          break;
        }
        case "E":
          reject(new Error(message.content.toString("utf8")));
          break;
        case "C":
        case "N": {
          const m = message.content.toString("utf8");
          console.log("message:", m);
          break;
        }
        case "Z":
          resolve(rows);
          break;
        default:
          throw new Error("なんかよくわからんメッセージ")
      }
    }
  }

  private handleDataRow(
    data: Buffer,
    currentFields: Array<Record<string, number | string>>,
  ): Record<string, string> {
    const numberOfColumns = data.readInt16BE(0);
    let offset = 2;

    const row: Record<string, string> = {};

    for (let i = 0; i < numberOfColumns; i++) {
      const columnLength = data.readInt32BE(offset);
      offset += 4;

      if (columnLength === -1) {
        // @ts-expect-error
        row[currentFields[i].fieldName] = null;
      } else {
        const value = data.toString("utf8", offset, offset + columnLength);
        row[currentFields[i].fieldName] = value;
        offset += columnLength;
      }
    }

    return row;
  }

  private handleRowDescription(data: Buffer) {
    let offset = 0;
    const fieldCount = data.readInt16BE(offset);
    offset += 2;

    const fields: Array<Record<string, number | string>> = [];

    for (let i = 0; i < fieldCount; i++) {
      const nullTerminator = data.indexOf(0, offset);
      const fieldName = data.toString("utf8", offset, nullTerminator);
      offset = nullTerminator + 1;

      const tableOID = data.readInt32BE(offset);
      offset += 4;

      const columnIndex = data.readInt16BE(offset);
      offset += 2;

      const dataTypeOID = data.readInt32BE(offset);
      offset += 4;

      const dataTypeSize = data.readInt16BE(offset);
      offset += 2;

      const typeModifier = data.readInt32BE(offset);
      offset += 4;

      const formatCode = data.readInt16BE(offset);
      offset += 2;

      const fieldInfo = {
        fieldName,
        tableOID,
        columnIndex,
        dataTypeOID,
        dataTypeSize,
        typeModifier,
        formatCode: formatCode === 0 ? "text" : "binary",
      };

      fields.push(fieldInfo);
    }

    return fields;
  }
}

再び select * from hoge;を実行。データがないので T のみが返ってくるが、カラムの情報が含まれている

[
  {
    "fieldName": "id",
    "tableOID": 16385,
    "columnIndex": 1,
    "dataTypeOID": 23,
    "dataTypeSize": 4,
    "typeModifier": -1,
    "formatCode": "text"
  },
  {
    "fieldName": "name",
    "tableOID": 16385,
    "columnIndex": 2,
    "dataTypeOID": 1043,
    "dataTypeSize": -1,
    "typeModifier": 259,
    "formatCode": "text"
  }
]

雑に ChatGPT に投げたら以下の説明が返ってきた。

各フィールドの説明

  • fieldName:
    カラム名を表します。
    例: "id", "name"

  • tableOID:
    カラムが属するテーブルの**OID(Object Identifier、オブジェクト識別子)**を表します。PostgreSQL では各テーブルに一意の OID が割り当てられています。
    例: 16385(テーブル ID)

  • columnIndex:
    テーブル内でのカラムの順番を表します。カラムがテーブルの何番目に定義されているかを示しています。
    例: 1(id は 1 番目のカラム), 2(name は 2 番目のカラム)

  • dataTypeOID:
    カラムのデータ型の OID を表します。各データ型にも固有の OID が割り当てられており、PostgreSQL のシステムカタログからデータ型の情報を取得できます。
    例: 23(int4 型、整数), 1043(varchar 型、可変長文字列)

  • dataTypeSize:
    カラムのデータ型のサイズを表します。通常、バイト数で表されます。サイズが -1 の場合、可変長データ型であることを意味します。
    例: 4(int4 型のサイズは 4 バイト), -1(varchar 型のサイズは可変長)

  • typeModifier:
    データ型に対する修飾子を表します。例えば、varchar(255) のように長さが指定されている場合、259 などの値を持つことがあります。この値は、システム内部でデータ型に対して適用される修飾情報を保持しています。
    例: -1(修飾子なし), 259(varchar 型の制約)

  • formatCode:
    データがテキスト形式またはバイナリ形式で送られているかを示します。0 はテキスト形式、1 はバイナリ形式を示します。
    例: "text"(テキスト形式)

そうですか。(興味無し)

最後に適当なデータを入れる。

INSERT INTO hoge (name) VALUES ('Alice');
INSERT INTO hoge (name) VALUES ('Bob');
INSERT INTO hoge (name) VALUES ('Charlie');

SELECT * FROM hoge;

3番目お前誰?

[
  { "id": "1", "name": "Alice" },
  { "id": "2", "name": "Bob" },
  { "id": "3", "name": "Charlie" }
]

ちゃんと返ってきたのでこれで眠れる。
おやすみなさい。

Discussion