Open7

Apache Iceberg

kwikwi

pyspark で Iceberg にアクセスする際は、pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 を使うようにドキュメントに案内されている。

ところが手元が proxy 下の環境で、処理が全く進まない。ivyという文字列がちらちら見えて、maven repositoryのように見えるけれども、proxy の設定方法が全く分からなかった。http_proxyhttps_proxy は効かない。

結局グローバル設定(環境変数 ANT_OPTS やHOME既定のファイル名 ~/.ivy2/ivysettings.xml )で設定しようとするのは、筋が悪いということのようで、pyspark のコマンド引数で設定するのが正攻法のようだ。Java ではどこでコードが実行されているか意識しないでよいようにされていることが多く、local か remote か、誰の権限のどの $HOME か、などなどがバラバラにまざる状況で設定値が混乱を招くと良くないのだろう。pyspark は job を裏側で別プロセスに投げる。結局 pyspark の引数を使って、Java の標準ライブラリの設定値が該当箇所に渡るようにすると解決できた。

pyspark \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
  --driver-java-options="-Dhttp.proxyHost=proxy.example.com -Dhttp.proxyPort=8080 -Dhttps.proxyHost=proxy.example.com -Dhttps.proxyPort=8080 -Dhttp.nonProxyHosts=minio|polaris.minio -Dhttps.nonProxyHosts=minio|polaris.minio"

設定するオプションspark.driver.extraJavaOptionsになる。Apache Polarisのgetting-startedでのJupyter notebookの中で指定するには、同様にSparkSessionのオプションを追加する。

SparkSession.builder
  .config(
    "spark.driver.extraJavaOptions",
    ("-Dhttp.proxyHost=proxy.example.com -Dhttp.proxyPort=8080"
    + " -Dhttps.proxyHost=proxy.example.com -Dhttps.proxyPort=8080"
    + " -Dhttp.nonProxyHosts=minio|polaris.minio"
    + " -Dhttps.nonProxyHosts=minio|polaris.minio")
  )

ちなみに、AWS の設定値に関しても spark job として裏側に入った後で利用されるので、ファイルに保存する方法はおすすめできない。これらも Session 変数として入れること。

kwikwi

OpenAPI Generator, quarks/gradle で、摩訶不思議 jakarta DI(CDI) で魔法のほうに docker image がビルドされる。サーバ定義的には quarks で http と management の位置まで用意されている。OpenAPI Generator でテンプレートを使ってコードを wiring してソースコードを jaxrs-resteasy ジェネレータで生成している。コードを読むときにどこがエントリポイントか全く予測不能で分からないので注意する。

OAuth2 で bearer token を使うことになるが、Quarkus の JWT認証に inject している。
https://quarkus.io/guides/security-customization#httpauthenticationmechanism-customization

Two APIs

polaris は 2 つの API を持っている。quarkus で実装されていて、application.propertiesquarkus.http.portquarkus.management.port が開かれている。

  • http=8181
    • catalog API (Iceberg REST spec + polaris native spec)
    • spark からはもっぱら REST spec を使う
  • management=8182
    • management service API
    • polaris cli から操作できる
    • create catalog は spec/polaris-management-service.ymlcreateCatalog を使う。

Apache Polaris Quick Start

root credential がログに書かれるとあるが、実際は docker-compose.yml の環境変数 POLARIS_BOOTSTRAP_CREDENTIALS にある。クライアント側で使うユーザも、この root から生成している。

warehouse は /tmp/polaris になっていて、次のようになっている。

  • polaris 側には metadata.json が書き込まれている。
  • jupyter 側には avro と parquet が書き込まれる。

polaris.authentication.token-broker.max-token-generation=PT1HPT1H の意味は、Period Time 1 Hour ということだそうだ。ISO8601 記法の時間(時刻ではない)。

その他特記事項

Confluent cloud の Tableflow からは AWS Glue と Apache Polaris にカタログ同期できる。

kwikwi

REST Catalog Spec Authz

Iceberg Table Spec と並行して Iceberg REST Catalog Spec がある。

Iceberg REST Catalog では、Catalog に対する操作が出てくるが、現時点では認証が決まっていない。実装側では思い思いに実装されている。Server, Client 両方を合わせないといけないので、調べるのが結構大変。
https://github.com/apache/iceberg/issues/10537

Gravitino

  • namespace, table, view に対して操作は内部で IcebergCatalogWrapper で抽象化されている
  • catalog に対しては simple(=default), OAuth2, kerberos の3つの認証方式をサポートしている
  • simple と呼んでいるものの内容は HTTP Basic 認証で、パスワード "dummy" 固定の credential となっている。
    • OAuth2 を使う際は外部認証(token発行)サーバを立ち上げて、対応する鍵を gravitino 側にセットしておく。
    • 追加の認可制御は存在せず、シングルテナントである。
  • gravitino での s3-access-key-id などの設定値は、Iceberg 仕様の s3.access-key-id にマップされる。

Polaris

  • REST API の操作については RBAC が設定されている。QuickStart の解説がこれに費やされている。
  • Principal や Role は内部に抱え込んでいて、外部連携は存在しない。
    • deprecated とされてしまったが、現時点では tokens endpoint を使った OAuth2 client_credentials 認証をして Bearer token を得る。
  • S3 接続部分は AWS SDK java client が素直に使われている。

Iceberg kafka-connect

  • Apache Iceberg repo に同梱されている kafka-connect
  • 認証方式としては次のものがある
    • oauth2
      • credential : client credential から token を取得する
      • token : Authorization bearer token としてヘッダにセットされる。もし credential もあれば それは token refresh 時に使用される。
    • none : 何もしない
    • sigv4 : AWS V4 Signer
    • basic : rest.auth.basic.usernamerest.auth.basic.password を使う(BasicAuthManager.java)
kwikwi

Polaris backed by Postgres

Polaris ではデフォルトで postgresql がバンドルされるようになった(以前はオンメモリの軽量版イメージがビルドされていた)。gradlewで PostgreSQL が有効になった image をビルドする。
https://github.com/apache/polaris/pull/1411

postgres backend を使う時には、最初に bootstrap が必要で、apache/polaris-admin-tool:postgres-latest イメージを使う。https://github.com/apache/polaris/blob/0983911cdc886604ac99c558b602644f01c15459/getting-started/jdbc/docker-compose-bootstrap-db.yml

Polaris が起動して、namespace や table が作成されたら直接 SQL で調べることができる。polaris_schema に格納されているので、search_path を設定しないと見えないことに注意。

polaris=# \dn
    List of schemas
      Name      | Owner
----------------+-------
 polaris_schema | post2
 public         | post2
(2 rows)

polaris=# set search_path to polaris_schema;
SET
polaris=# \dt
                       List of relations
     Schema     |             Name              | Type  | Owner
----------------+-------------------------------+-------+-------
 polaris_schema | entities                      | table | post2
 polaris_schema | grant_records                 | table | post2
 polaris_schema | policy_mapping_record         | table | post2
 polaris_schema | principal_authentication_data | table | post2
(4 rows)
kwikwi

Polaris では Principal (User) を生成してアクセスする。client_id, client_secret は生成時の応答にしか入っていないので、メモしておく(DB には sha256 hash が入っている)。

この client_id, client_secret を使って OAuth2 client_credentials 認証すると access_token が手に入る。この access_token を Bearer token として利用する。

(下の実行例では httpie コマンドを使っている)

$ T=$(http -f localhost:8181/api/catalog/v1/oauth/tokens \
  client_id=${CLIENT_ID} \
  client_secret=${CLIENT_SECRET} \
  grant_type=client_credentials \
  scope=PRINCIPAL_ROLE:ALL | jq -r .access_token)

$ http localhost:8181/api/catalog/v1/polaris_demo/namespaces Authorization:"Bea
rer $T"
HTTP/1.1 200 OK
Content-Type: application/json;charset=UTF-8
content-encoding: gzip
content-length: 82

{
    "namespaces": [
        [
            "COLLADO_TEST"
        ]
    ],
    "next-page-token": null
}

# child namespace は parent で指定して取得する。
$ http localhost:8181/api/catalog/v1/polaris_demo/namespaces parent==COLLADO_TEST Authorization:"Bearer $T"
HTTP/1.1 200 OK
Content-Type: application/json;charset=UTF-8
content-encoding: gzip
content-length: 91

{
    "namespaces": [
        [
            "COLLADO_TEST",
            "PUBLIC"
        ]
    ],
    "next-page-token": null
}

# 区切り文字は %1F になることに注意
$ http localhost:8181/api/catalog/v1/polaris_demo/namespaces/COLLADO_TEST%1FPUBLIC Authorization:"Bearer $T"
HTTP/1.1 200 OK
Content-Type: application/json;charset=UTF-8
content-encoding: gzip
content-length: 126

{
    "namespace": [
        "COLLADO_TEST",
        "PUBLIC"
    ],
    "properties": {
        "location": "s3://polaris/COLLADO_TEST/PUBLIC",
        "owner": "jovyan"
    }
}

$ http localhost:8181/api/catalog/v1/polaris_demo/namespaces/COLLADO_TEST%1FPUBLIC/tables Authorization:"Bearer $T"
HTTP/1.1 200 OK
Content-Type: application/json;charset=UTF-8
content-encoding: gzip
content-length: 118

{
    "identifiers": [
        {
            "name": "TEST_TABLE",
            "namespace": [
                "COLLADO_TEST",
                "PUBLIC"
            ]
        }
    ],
    "next-page-token": null
}

# credential vendoring で warehouse(S3) の認証情報が手に入る
$ http localhost:8181/api/catalog/v1/polaris_demo/namespaces/COLLADO_TEST%1FPUBLIC/tables/TEST_TABLE/credentials Authorization:"Bearer $T"
HTTP/1.1 200 OK
Content-Type: application/json;charset=UTF-8
content-encoding: gzip
content-length: 1070

{
    "storage-credentials": [
        {
            "config": {
                "expiration-time": "1747720220000",
                "s3.access-key-id": "....................",
                "s3.secret-access-key": "........................................",
                "s3.session-token": "ey.....",
                "s3.session-token-expires-at-ms": "1747720220000"
            },
            "prefix": "s3://polaris/COLLADO_TEST/PUBLIC/TEST_TABLE"
        }
    ]
}

/v1/config で返却される defaultoverride は、それぞれ個別の設定値を設定する「前」に適用すべき値と、「後」に適用すべき値という意味になっている。つまり override で指定された値は「必ずその値を使え」という指示になっている。それに対して default は「特に指定がなければこの値を使え」という指示になっている。

kwikwi

Catalog 作成時は、Polaris management API を使う。POST /catalogs で送信する中身は、 Catalog(storage_config_info=StorageInfo()) という入れ子の形になっている。

S3 プロトコル使う場合は storage_config_info では AwsStorageConfigInfo を使うが、実はリファレンスは不完全である。完全なものは次の通り:

  • storageType : "S3" 固定
  • allowedLocations : 古いデータが前の bucket の場所に残っている時に使う。基本的には default-base-location 側の値を使う。
  • roleArn
  • externalId
  • userArn
  • region

Catalog レベルでは既定の属性値以外には、次のプロパティがある。

  • default-base-location : "s3://bucketname/path" を指定する。

SparkSQL

SparkSQL の CREATE TABLE 構文では LOCATION パラメータがあり、bucket の位置をデフォルト位置から変更できる。

kwikwi

iceberg-kaka-connect のプロパティはわかりにくい。

credential プロパティの説明は次のようになっていて、説明がないとよくわからない。

"iceberg.catalog.credential": "<credential>"

"<credential>" の部分は "<oauth2_client_id>:<oauth2_client_secret>" か "<oauth2_client_secret>" かである。後者の場合は client_id を送信しない。

"iceberg.catalog.rest.auth.type" を設定しなければ、oauth2 となる。