Closed13
Salesforce Change Data Capture - Get StartedやPub/Sub APIクライアントのJavaサンプルをトレースしながら試してみる
参考
Get Started with Pub/Sub API
SalesforceのPub/Sub APIクライアントサンプルのリポジトリ
gRPC - Language - Java - Quick start
grpc/grpc-java
Create Maven Project
- IntelliJ IDEAでMaven Projectを作成
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
Verify that the Maven project is working properly
- dependecyにjunitを追加
pom.xml - dependencies
<dependencies>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.3</version>
<scope>test</scope>
</dependency>
</dependencies>
- 適当なクラス、およびテストクラスを作成
Demo.java
public class Demo {
public String hello() {
return "Hello";
}
}
DemoTest.java
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class DemoTest {
@Test
void hello() {
// given:
Demo sut = new Demo();
// when:
String actual = sut.hello();
// then:
assertEquals("Hello", actual);
}
}
-
Run DemoTest
をして正常に動作することを確認
Generate Code from the Proto File
参考
proto ファイルを配備する
- pub-sub-api リポジトリからprotoファイルをダウンロードする
- Mavenプロジェクトにフォルダ
src/main/proto
を作成する - protoファイルを
src/main/proto
フォルダに配備する
pom.xml
を編集する
-
build
を追加 - バージョンは現在(2023/8/9時点)での最新バージョンに設定
pom.xml - build
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.23.4:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.57.1:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
-
mvn clean compile
を実行 - エラーになった
- 生成されたJavaソースで以下のようなエラーが出力されている
パッケジcom.google.protobufは存在しません
- 他にもいくつか
- 生成されたJavaソースで以下のようなエラーが出力されている
- エラー内容から何回か試して、以下のdependencyを追加
- propertyに
grpc.version
を追加
pom.xml
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.57.1</grpc.version>
</properties>
<dependencies>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
~~~ 省略 ~~~
- 現時点での
pom.xml
は以下のようになった
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.57.1</grpc.version>
</properties>
<dependencies>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.23.4:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- ビルドに成功すると、
target/generated-sources/protobuf
フォルダ配下にJavaソースが生成されている
Coding according to trace ProcessChangeEventHeader in pub-sub-api
- 必要な部分のみ
Demo.java
に記述 - 必要なdependencyのみ
pom.xml
に定義 - 接続情報は環境変数で設定するように変更
- ログは
System.out.println
やThrowable.printStackTrace
で代替 - 動作することは確認済み
- gist - Coding according to trace ProcessChangeEventHeader in pub-sub-api に記載
The following is a checklist of areas of concern
TBD
io.grpc:grpc-netty
in pom.xml
?
why -
pom.xml
のdependencyにio.grpc:grpc-netty
が定義されている
pom.xml
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
- このdependencyをコメントアウトして、実行してみると以下のようなエラーが出力される
output
Exception in thread "main" io.grpc.ManagedChannelProvider$ProviderNotFoundException: No functional channel service provider found. Try adding a dependency on the grpc-okhttp, grpc-netty, or grpc-netty-shaded artifact
at io.grpc.ManagedChannelProvider.provider(ManagedChannelProvider.java:45)
at io.grpc.ManagedChannelBuilder.forAddress(ManagedChannelBuilder.java:39)
at Demo$Subscriber.<init>(Demo.java:218)
at Demo.<init>(Demo.java:55)
at Demo.main(Demo.java:39)
- channelを生成する際にchannel service providerというものが見つからないというエラーの様子
- メッセージに提示されたどれかをdependencyに追加して、と書いてある
-
pub-sub-api
のpom.xml
に定義されているio.grpc:grpc-netty
を追加することで、このエラーは解消した -
scope
はruntime
でも良さそう
pom.xml
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
<scope>runtime</scope>
</dependency>
- 動いた
io.grpc:grpc-netty-shaded
Replace -
io.grpc:grpc-netty-shaded
に置き換えることもできる
pom.xml
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
<scope>runtime</scope>
</dependency>
-
io.grpc:grpc-netty
の依存を含んだshaded jarのみに依存するようになる - 置き換えても動いた
Some bitmap fields in the ChangeEventHeader of a Change event need to be decoded
List<String> changedFields = getFieldListFromBitmap(schema, (GenericData.Record) eventPayload.get("ChangeEventHeader"), "changedFields");
-
ChangeEventHeader
のchangedFields
は上記のようにデコードしている - 例えば3.4.3. 複数レコードに対して同じ内容で更新 では、
"changedFields": ["0xA0"]
のようになっている - Bitmap Fields in ChangeEventHeader in Change Events にその旨が記載されている
変更データ キャプチャ イベントには、印刷時に内容を読み取ることができないビットマップ フィールドが含まれており、サブスクライバー アプリのロジックで処理するにはデコードする必要があります。Pub/Sub API は、これらのフィールドを変換せずに、生の Avro バイナリ形式でイベントを配信します。ビットマップフィールドは次のとおりです。
changedFields
変更されたフィールドが含まれます。diffFields
値がデータ差分として送信されたフィールドが含まれます。nulledFields
null に設定されたフィールドが含まれます。
diffFields in ChangeEventHeader
-
値がデータ差分として送信されたフィールドが含まれます。
とはどういうことか? - Sending Data Differences for Fields of Updated Records に記載がある
- 大きなテキストデータ項目に対して、イベントペイロードサイズの削減のために差分のみを送るようだ
- サイズの削減が見込めない場合には、差分ではなくデータ全体が送られる
- 扱いが難しそう
contains("-")
in expandBitmap?
What are you doing after
if (values.get(values.size() - 1).contains("-")) {
for (ListIterator<String> itr = values.listIterator(); itr.hasNext();) {
String[] bitmapMapString = itr.next().split("-");
if (bitmapMapString.length < 2) {
continue;
}
Schema.Field parentField = schema.getFields().get(Integer.valueOf(bitmapMapString[0]));
Schema childSchema = getValueSchema(parentField.schema());
if (childSchema.getType().equals(Schema.Type.RECORD)) {
int nestedSize = childSchema.getFields().size();
String parentFieldName = parentField.name();
List<String> fullFiledNames = fieldNamesFromBitmap(childSchema, bitmapMapString[1]).stream()
.map(col -> parentFieldName + "." + col)
.collect(Collectors.toList());
if (!fullFiledNames.isEmpty()) {
itr.remove();
if (fullFiledNames.size() == nestedSize) {
itr.add(parentFieldName);
} else {
fullFiledNames.stream()
.forEach(itr::add);
}
}
}
}
}
-
parentField
やchildSchema
という変数名から何かしらの構造を持った項目を扱っているようだ - Bitmap Fields in ChangeEventHeader in Change Events に以下の記載がある
ビットマップ フィールドは文字列の配列です。配列の最初の要素には、個々のフィールドのビットマップが含まれています。複合フィールドは、ネストされたフィールドを示すビットマップとともに配列の追加要素に配置されます。追加の配列要素の形式は、「{ParentFieldPosition}-{NestedFieldBitmap}」です。
- 複合フィールドについては以下のページに記載がある
- 複合項目
- 複合項目は、
住所複合項目
と地理位置情報の複合項目
の2種類のようだ - 試しに地理位置情報のカスタム項目を追加して、変更データイベントを見てみる
- 緯度、経度の両方を更新してみる
- 以下のような内容が取得できた
"changedFields": ["0x20", "9-0x03"]
-
"0x20"
はLastModifiedDate
-
"9-0x03"
がgeo_location
のようだ -
"9-0x03"
のうち、9
はgeo_location__c
-
"9-0x03"
のうち、0x03
のbitSet
は{0, 1}
でSchemaから[Latitude, Longitude]
に対応付けられている - 結果、
geo_location__c.Latitude
とgeo_location__c.Longitude
が変更されたことが分かる
- 全ての項目を変更してみた場合、
"changedFields": ["0x01A4", "9-0x03"]
となり、複合項目でないものは0x01A4
で表現されていて、最後に複合項目が9-0x03
で表現されていた
io.grpc:grpc-bom
Use -
io.grpc:grpc-bom
をimportすることで関係するdependencyそれぞれでversionの定義は不要になる
pom.xml
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
org.eclipse.jetty:jetty-client
Remove
pom.xml
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>11.0.15</version>
</dependency>
- pub-sub-apiリポジトリのコードをトレースした際に、記載したもの
- 今回は既に取得済みのaccess_tokenを利用した認証を行っているため、jetty-clientは不要だった
- pub-sub-apiリポジトリでは、ユーザー名/パスワード認証を行う際にjetty-clientを利用していた
- そのため、この依存を削除して、コードからも関連部分を削除した
このスクラップは2023/08/10にクローズされました