Open32

CosmosDB Specialty

しろくましろくま

スループット要件(RU)

https://learn.microsoft.com/ja-jp/training/modules/plan-resource-requirements/3-evaluate-throughput-requirements

Azure Cosmos DB でデータベースまたはコンテナーを作成するときに、要求ユニットを 1 秒あたりの要求ユニット数 (略して RU/s) ずつプロビジョニングすることができます。 400 RU/s 未満をプロビジョニングすることはできません。また、プロビジョニングは 100 単位です。

概念実証アプリケーションを実行し、SDK の request charge プロパティを使い、Azure Cosmos DB に対して行う予定の操作を実行した場合の実際の RU 料金を測定することもできます。

https://learn.microsoft.com/ja-jp/azure/cosmos-db/nosql/estimate-ru-with-capacity-planner

しろくましろくま

CosmosDBのプラン

https://learn.microsoft.com/ja-jp/training/modules/configure-azure-cosmos-db-sql-api/3-compare-serverless-provisioned-throughput

サーバーレス

  • 予測できない、あるいはバースト性トラフィックのあるアプリケーションに適している
    • ユーザ負荷の予測が難しい
    • プロトタイプのアプリ
    • サーバレス(Azure Functions)などのサービスが統合されている
    • 検証用
    • 大量データの送受信を行わない低トラフィックアプリ
  • 制限
    • 複数リージョンの配布はできない
    • コンテナに格納できるデータは50GBまで

プロビジョニング

  • 継続的で予測可能なパフォーマンスを必要とするワークロード
  • 各コンテナに対して1秒ごとの最大要求ユニットを制限する
  • 無制限なリージョンにデータ配布する
  • コンテナに無制限量のデータを格納できる
しろくましろくま

スケーリング

https://learn.microsoft.com/ja-jp/training/modules/configure-azure-cosmos-db-sql-api/5-compare-autoscale-standard-throughput

標準スループット

  • トラフィックが安定しているワークロード
  • 設定
    • 静的な要求ユニット数を設定する
    • 設定した要求ユニット数を超える要求が来た場合は、待機することを促すレスポンスが返される

自動スケーリングスループット

  • 向いているケース
    • 予測できないトラフィック
  • 設定
    • 最大値のみを設定する
しろくましろくま

Data Factory を使ったデータ移動

https://learn.microsoft.com/ja-jp/training/modules/move-data-azure-cosmos-db-sql-api/2-move-data-by-using-azure-data-factory

Data FactoryでCosmosDBのデータを読み取り変換し、別のCosmosDBアカウントにデータ登録できる。

CosmosDBとの接続

以下は接続文字列を使った接続。
マネージドIDやサービスプリンシパルを使った接続も可能。

{
    "name": "<example-name-of-linked-service>",
    "properties": {
        "type": "CosmosDb",
        "typeProperties": {
            "connectionString": "AccountEndpoint=<cosmos-endpoint>;AccountKey=<cosmos-key>;Database=<cosmos-database>"
        }
    }
}

CosmosDBからデータ取得

クエリを記述する。

{
    "source": {
        "type": "CosmosDbSqlApiSource",
        "query": "SELECT id, categoryId, price, quantity, name FROM products WHERE price > 500",
        "preferredRegions": [
            "East US",
            "West US"
        ]        
    }
}

CosmosDBへのデータ書き込み

sinkとしてCosmosDBを構成する.
以下は、常にデータを挿入する場合や、データをアップサートして、一致する一意識別子 (id フィールド) を持つ可能性がある項目を上書きする場合。

"sink": {
    "type": "CosmosDbSqlApiSink",
    "writeBehavior": "upsert"
}
しろくましろくま

Kafkaコネクタを使ったデータ移動

Apache Kafka は、分散された方法でイベントをストリーミングするために使用されるオープンソース プラットフォーム。大規模なハイフォーマンスのデータ統合シナリオに利用することが多い。

設定

  • connect.cosmos.connection.endpoint
    • アカウント エンドポイント URI
  • connect.cosmos.master.key
    • アカウント キー
  • connect.cosmos.databasename
    • データベース リソースの名前
  • connect.cosmos.containers.topicmap
    • CSV 形式を使用した、コンテナーへの Kafka トピックのマッピング
    • 例)productsコンテナをprodlisnterトピックにマップし、customersコンテナーをcustlistnerトピックにマップする場合、CSVマッピング文字列は以下。
prodlistener#products,custlistener#customers

CosmosDBへの書き込み

トピックを作成してCosmosDBにデータを書き込む

  1. トピックを作成(kafka-topicsコマンド)
kafka-topics --create \
    --zookeeper localhost:2181 \
    --topic prodlistener \
    --replication-factor 1 \
    --partitions 1
  1. プロデューサーを開始し、トピックにレコードを書き込む
kafka-console-producer \
    --broker-list localhost:9092 \
    --topic prodlistener
  1. CosmosDBのコンテナにコミットされる

CosmosDBからの読み込み

Azure Cosmos DB 変更フィードのデータが Kafka トピックに発行される

{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.task.poll.interval": "100",
    "connect.cosmos.connection.endpoint": "https://dp420.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "cosmicworks",
    "connect.cosmos.containers.topicmap": "prodlistener#products",
    "connect.cosmos.offset.useLatest": false,
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}
しろくましろくま

Stream Analytics を使用したデータ移動

Azure Stream Analyticsは、複数のソースから高速ストリーミングデータを同時に処理するように設計されたリアルタイムイベント処理エンジン。
サポート対象はNoSQLのみ

設定

CosmosDBのアカウントURLやキー、データベース名とコンテナ名で接続

CosmosDBへの書き込み

  • Azure Stream Analyticsからのクエリ結果は、JsonでCosmosDBNoSQLに書き込まれる。
  • 同じIDを持つ場合、項目がupsertされる。
しろくましろくま

Azure CosmosDB Sparkコネクタを使用したデータ移動

Azure Synapse Analytics と Azure Synapse Link for Azure Cosmos DB を使用して、Azure Cosmos DB for NoSQL のデータを分析できる。
この接続により、データ環境の両端 (Azure Cosmos DB と Azure Synapse Analytics) をデータ パイプラインによって統合できる。

設定

  • CosmosDBのアカウントレベルでSynapse Linkを有効にする
az cosmosdb create --name <name> --resource-group <resource-group> --enable-analytical-storage true
  • コンテナレベルで分析ストレージを有効にする
az cosmosdb sql container create --resource-group <resource-group> --account <account> --database <database> --name <name> --partition-key-path <partition-key-path> --throughput <throughput> --analytical-storage-ttl -1

CosmosDBからSparkDataFrameへの書き込み

方法①
メタデータがキャッシュされている Spark DataFrame から読み込む

productsDataFrame = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "cosmicworks_serv")\
    .option("spark.cosmos.container", "products")\
    .load()

方法②
CosmosDBを直接ポイントするSparkテーブルを作成し、そのSparkテーブルに対してSparkSQLクエリで読み込み

create table products_qry using cosmos.olap options (
    spark.synapse.linkedService 'cosmicworks_serv',
    spark.cosmos.container 'products'
)

SparkDataFrameからCosmosDBへの書き込み

DataFrame内のデータを既存のコンテナに追加

productsDataFrame.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "cosmicworks_serv")\
    .option("spark.cosmos.container", "products")\
    .mode('append')\
    .save()

DataFrameからデータをストリーミングして、ストリーミングデータを既存のコンテナに追加可能。

query = productsDataFrame\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "cosmicworks_serv")\
    .option("spark.cosmos.container", "products")\
    .option("checkpointLocation", "/tmp/runIdentifier/")\
    .outputMode("append")\
    .start()

query.awaitTermination()
しろくましろくま

クライアントシングルトン

CosmosClientには、以下の機能がある。

  • スレッドセーフのインスタンス
  • インスタンスでは接続を効率よく管理してくれる
  • 直接モードで動作しているときはインスタンスによってアドレスがキャッシュされる

そのため、インスタンスを破棄して再作成するたびに、キャッシュと接続管理のメリットが失われてしまうため、
アプリケーションが実行している間は1つのインスタンスのみを使用するのが推奨。

しろくましろくま

CosmosClientの接続オプション

CosmosClientOptionsクラスには、以下の設定ができる

  • アカウントへの接続モード
  • 整合性レベル
  • 優先するアカウントリージョン

上記の設定をしたクライアントを構成する場合は、CosmosClientOptionsクラスのインスタンスを作成し、そのクラスをCosmosClientコンストラクタのパラメータに引き渡す。

CosmosClientOptions options = new ();

CosmosClient client = new (endpoint, key, options);

接続モードの種類

  • ダイレクト
    • デフォルト
    • データノードに直接接続するので、初期化の際とアドレスキャッシュでのみ使用される。
CosmosClientOptions options = new ()
{
    ConnectionMode = ConnectionMode.Direct
};
  • ゲートウェイ
    • すべてのリクエストはCosmosDBゲートウェイ経由でルーティングされる
CosmosClientOptions options = new ()
{
    ConnectionMode = ConnectionMode.Gateway
};

整合性レベルの変更

CosmosDBのアカウントにはデフォルトの整合性レベルが構成されている。個々のクライアントでは、クライアントで行われた読み取りリクエストに対して異なる整合性レベルを構成できる。
以下は最終的な整合性レベルを使用するように構成されたクライアント設定

CosmosClientOptions options = new ()
{
    ConsistencyLevel = ConsistencyLevel.Eventual
};

書き込みリージョンの設定

ApplicationRegionプロパティで、書き込みリージョンを変更できる

CosmosClientOptions options = new ()
{
    ApplicationRegion = "westus"
};
しろくましろくま

CosmosDBエミュレータ

  • Windows, Linux, Dockerイメージとしても実行可能
しろくましろくま

接続エラー

400 (無効な要求)、401 (未承認)、403 (許可されていません)、404 (見つかりません) などの HTTP エラーコードはアプリ側で対応する必要がある。

  • 429: 要求が多すぎる
  • 449: 同時実行エラー
  • 500: 予期しないサービスエラー
  • 503: サービスが利用できない
しろくましろくま

タイムアウトの回避

  • 要求タイムアウトは、クライアント側でのCPUやポートの使用率が高いことで発生することが多い。

非同期実行

Database database = await client.CreateDatabaseIfNotExistsAsync("cosmicworks");

組み込みのIteratorを使う

ToList などの LINQ メソッドは、他のすべての呼び出しをブロックして大量データが取得される可能性がある。
例えば以下。

container.GetItemLinqQueryable<T>()
    .Where(i => i.categoryId == 2)
    .ToList<T>();

SDK には、他の呼び出しをブロックせずにクエリの結果を非同期に取得するToFeedIterator<T> などのメソッドが用意されている。

container.GetItemLinqQueryable<T>()
    .Where(i => i.categoryId == 2)
    .ToFeedIterator<T>();

最大項目数

Azure Cosmos DB for NoSQL のすべてのクエリ結果は、結果の "ページ" として返される。
規定値は100。-1にすると動的ページサイズを設定できる。

QueryRequestOptions options = new ()
{
    MaxItemCount = 500
};

最大コンカレンシー

並列クエリの実行中にクライアント側で実行される同時実行数。物理パーティションの数に設定するのが理想。

  • 1にすると実質並列処理無効
  • -1にするとSDKが設定を決める。
QueryRequestOptions options = new ()
{
    MaxConcurrency = 5
};

最大バッファー項目数

MaxBufferedItemCountプロパティで、並列クエリの実行中にクライアント側でバッファされるItem最大数を設定する。

  • -1に設定するとSDKが管理する。
QueryRequestOptions options = new ()
{
    MaxBufferedItemCount = 5000
};
しろくましろくま

CosmosDB のHTTP Status Code

コード タイトル 説明
400 不正な要求 要求の文法の誤りが原因で処理を行えません。
403 Forbidden コンテンツへの権限がないため閲覧を拒否します。
409 Conflict コンテンツへの権限がないため閲覧を拒否します。
413 RequestEntityTooLarge 要求エンティティが大きすぎるため処理を行えません。
429 TooManyRequests 短期間に、エンドポイントに対してリクエストが多すぎます。
try
{
    await container.CreateItemAsync<Product>(saddle);
}
catch(CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
{
    // Add logic to handle conflicting ids
}
catch(CosmosException ex) 
{
    // Add general exception handling logic
}
しろくましろくま

SDKを使用したトランザクションバッチ

Microsoft.Azure.Cosmos.Container クラスには、fluent 構文をサポートする TransactionalBatch 型の新しいインスタンスを作成する CreateTransactionalBatch メンバー メソッドがある。
このバッチと fluent CreateItem メソッドを使用すると、同じパーティション キー値内に 2 つの項目を挿入する 2 つのステップから成るトランザクションを作成できる。

バッチを実行するには、ExecuteAsync メソッドを非同期に呼び出す。

PartitionKey partitionKey = new ("accessories-used");
TransactionalBatch batch = container.CreateTransactionalBatch(partitionKey)
    .CreateItem<Product>(saddle)
    .CreateItem<Product>(handlebar);

using TransactionalBatchResponse response = await batch.ExecuteAsync();
しろくましろくま

オプティミスティック同時実行制御

https://learn.microsoft.com/ja-jp/training/modules/perform-cross-document-transactional-operations-azure-cosmos-db-sql-api/5-implement-optimistic-concurrency-control
データを取得してから更新するまでに、クライアントからの別の操作でデータが上書きされる可能性がある。

データ取得後のresponseには、ETag項目がある。アイテム更新でこのETagの値も更新される。if-match ルールを使用して、ETag が更新要求の一部として項目サーバー側の現在の ETag ヘッダーと引き続き一致している場合にUpsertすることで、オプティミスティック同時実行制御を実装できる。

string categoryId = "9603ca6c-9e28-4a02-9194-51cdb7fea816";
PartitionKey partitionKey = new (categoryId);

ItemResponse<Product> response = await container.ReadItemAsync<Product>("01AC0", partitionKey);
Product product = response.Resource;
string eTag = response.ETag;

product.price = 50d;

ItemRequestOptions options = new ItemRequestOptions { IfMatchEtag = eTag };
await container.UpsertItemAsync<Product>(product, partitionKey, requestOptions: options);
しろくましろくま

一括操作

CosmosClientOptions クラスの新しいインスタンスを作成し、そのインスタンスの AllowBulkExecution プロパティを true に設定することにより、一括実行を有効にする。

CosmosClientOptions options = new () 
{ 
    AllowBulkExecution = true 
};

この options インスタンスは、CosmosClient コンストラクター パラメーターに最後のパラメーターとして渡す。

// 一括操作するoptionを引き渡す
CosmosClient client = new (endpoint, key, options);
await container.CreateItemAsync<Product>(product, partitionKey);

//登録するデータのリスト
List<Product> productsToInsert = GetOurProductsFromSomeWhere();

List<Task> concurrentTasks = new List<Task>();

//登録するデータをCosmosDBに登録するアイテムにする。
foreach(Product product in productsToInsert)
{
    concurrentTasks.Add(
        container.CreateItemAsync<Product>(
            product, 
            new PartitionKey(product.partitionKeyValue))
    );
}

Task.WhenAll を呼び出すと、SDK が作動してバッチが作成され、物理パーティション別に操作をグループ化してから、同時に実行するように要求が分散される。
グループ化操作によって、バックエンド要求の数を減らし、バッチを別々の物理パーティションに並列でディスパッチできるようにすることで、効率が大幅に向上する。

Task.WhenAll(concurrentTasks);

注意事項

ベストプラクティス

  • パーティションキーの設定
    • 一括処理のシナリオでは、パーティションをまたぐデータが多いとオーバーヘッドが増加してしまう可能性があるため、パーティションキーを指定すべき。
  • シリアル化と逆シリアル化ではstreamAPIを使う
    • オーバーヘッドになるため、Streamバリアントを使用すべき
  • パーティションキーごとにワーカータスクを構成する
    • アイテムが論理パーティションキーに分割できる場合は、パーティションキーごとにワーカータスクのリストを作成する。そのリスト内の各ワーカー タスクから、その論理パーティション キー内の各操作に対して子タスクを生成できる。 このセットアップによって、事実上、項目ごとの操作の調整を行うタスクの階層が作成される。
しろくましろくま

クエリでの型チェック

IS_DEFINED関数

  • アイテムのプロパティが定義されているかどうかを確認できる。

tagsプロパティが定義されているかどうか

SELECT
    IS_DEFINED(p.tags) AS tags_exist
FROM
    products p

IS_ARRAY関数

  • プロパティが配列かどうか

tagsプロパティが配列かどうか

SELECT
    IS_ARRAY(p.tags) AS tags_is_array
FROM
    products p

IS_NULL関数

  • プロパティがnullか

tagsプロパティがnullかどうか

SELECT
    IS_NULL(p.tags) AS tags_is_null
FROM
    products p

IS_NUMBER関数

  • 異なるデータ ストアで一貫性のない状態で価格情報を保持している場合
    • 文字列データを使用して価格情報を保持する場合と、数値を使用して価格情報を格納する場合がある。
    • IS_NUMBER 関数は、クエリの WHERE 式で使用できる。
SELECT
    p.id,
    p.price, 
    (p.price * 1.25) AS priceWithTax
FROM
    products p
WHERE
    IS_NUMBER(p.price)

IS_STRING関数

  • 文字列かどうか
SELECT
    p.id,
    p.price
FROM
    products p
WHERE
    IS_STRING(p.price)
しろくましろくま

SQLクエリの組み込み関数

CONCAT関数

  • フィールドの連携

例)名前とカテゴリの列を連結

SELECT VALUE
    CONCAT(p.name, ' | ', p.categoryName)
FROM
    products p

LOWER関数

  • 小文字にする関数
SELECT VALUE 
    LOWER(p.sku) 
FROM 
    products p

GetCurrentDateTime関数

  • 現在の日時を取得する関数

例)まだ廃止すべきではない製品を除外する

SELECT 
    *
FROM
    products p
WHERE
    p.retirementDate >= GetCurrentDateTime()
しろくましろくま

外積クエリ

以下のようなJsonに対して

{
    "id": "80D3630F-B661-4FD6-A296-CD03BB7A4A0C",
    "categoryId": "629A8F3C-CFB0-4347-8DCC-505A4789876B",
    "categoryName": "Clothing, Vests",
    "sku": "VE-C304-L",
    "name": "Classic Vest, L",
    "description": "A worn brown classic vest that was a trade-in apparel item",
    "price": 32.4,
    "tags": [
        {
            "id": "2CE9DADE-DCAC-436C-9D69-B7C886A01B77",
            "name": "apparel",
            "class": "group"
        },
        {
            "id": "CA170AAD-A5F6-42FF-B115-146FADD87298",
            "name": "worn",
            "class": "trade-in"
        },
        {
            "id": "CA170AAD-A5F6-42FF-B115-146FADD87298",
            "name": "no-damaged",
            "class": "trade-in"
        }
    ]
}

以下のクエリを実行すると、(JOIN)

SELECT 
    p.id,
    p.name,
    t.name AS tag
FROM 
    products p
JOIN
    t IN p.tags

外積されたフラットなオブジェクトが返却される

[
    {
        "id": "80D3630F-B661-4FD6-A296-CD03BB7A4A0C",
        "name": "Classic Vest, L",
        "tag": "apparel"
    },
    {
        "id": "80D3630F-B661-4FD6-A296-CD03BB7A4A0C",
        "name": "Classic Vest, L",
        "tag": "worn"
    },
    {
        "id": "80D3630F-B661-4FD6-A296-CD03BB7A4A0C",
        "name": "Classic Vest, L",
        "tag": "no-damaged"
    }
]
しろくましろくま

相関サブクエリ

以下のJsonに対し。tags.classがtrade-inのデータのみを取得したい場合

{
    "id": "80D3630F-B661-4FD6-A296-CD03BB7A4A0C",
    "categoryId": "629A8F3C-CFB0-4347-8DCC-505A4789876B",
    "categoryName": "Clothing, Vests",
    "sku": "VE-C304-L",
    "name": "Classic Vest, L",
    "description": "A worn brown classic vest that was a trade-in apparel item",
    "price": 32.4,
    "tags": [
        {
            "id": "2CE9DADE-DCAC-436C-9D69-B7C886A01B77",
            "name": "apparel",
            "class": "group"
        },
        {
            "id": "CA170AAD-A5F6-42FF-B115-146FADD87298",
            "name": "worn",
            "class": "trade-in"
        },
        {
            "id": "CA170AAD-A5F6-42FF-B115-146FADD87298",
            "name": "no-damaged",
            "class": "trade-in"
        }
    ]
}

以下のSQLにより、

SELECT 
    p.id,
    p.name,
    t.name AS tag
FROM 
    products p
JOIN
    (SELECT VALUE t FROM t IN p.tags WHERE t.class = 'trade-in') AS t

以下のようにtrade-inのみを抽出できる

[
    {
        "id": "80D3630F-B661-4FD6-A296-CD03BB7A4A0C",
        "name": "Classic Vest, L",
        "tag": "worn"
    },
    {
        "id": "80D3630F-B661-4FD6-A296-CD03BB7A4A0C",
        "name": "Classic Vest, L",
        "tag": "no-damaged"
    }
]
しろくましろくま

クエリで変数を実装

  • Fluentメソッドを使う。
string sql = "SELECT p.name, t.name AS tag FROM products p JOIN t IN p.tags WHERE p.price > @lower"
QueryDefinition query = new (sql)
    .WithParameter("@lower", 500);