西田@CX事業部です
今回はDataflowで使われてる、Apache Beamを実行、テストできる環境を、 Mavenを使って 1 から構築していきたいと思います
※ 全てのコードは Github に Push しています
Mavenのプロジェクト作成
MavenでApacheBeamが最低限の依存関係で実行できるプロジェクトを作成していきます
空のプロジェクトを作成
mvn
コマンドを使って、空のJavaプロジェクトを作成します
mvn -B archetype:generate \
-DarchetypeGroupId=org.apache.maven.archetypes \
-DgroupId=com.example.myapp \
-DartifactId=com.example.myapp
ライブラリのインストール
pom.xml
にApache Beamのバージョンをプロパティに定義します
<!-- ...省略... -->
<properties>
<beam.version>2.50.0</beam.version>
</properties>
<!-- ...省略... -->
pom.xml
に依存関係にApache BeamとDataflow関連のライブラリを追加します
<dependencies>
<!-- ...省略... -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- ...省略... -->
</dependencies>
依存関係をインストールします
mvn install
Apache Beamのコードを作成
簡単な Apache Beam のコードを作成します。以下は渡された文字列のリストを標準出力するだけのPipelineです
public static void main( String[] args ) {
Pipeline p = Pipeline.create();
List<String> words = Arrays.asList("aaa", "bbb ccc", "ccc", "", "c", "\n", "aa");
p.apply(Create.of(words))
.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext context) {
System.out.println(context.element());
context.output(context.element());
}
}));
p.run().waitUntilFinish();
}
上記コードをMavenの compile exec:java
で実行します
mvn compile exec:java -Dexec.mainClass=com.example.myapp.Example
出力結果
c
bbb ccc
aaa
aa
ccc
※ 順番が保証されない点に留意が必要です
テスト作成
テスト対象のDoFnを作成
今回のテストの対象になるDoFnを作成します。以下のDnFnの主な動作は、空白で文字列を分割し、空白のみの要素を取り除きます
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
static class ExtractWordsFn extends DoFn<String, String> {
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> receiver) {
String[] words = element.split(TOKENIZER_PATTERN, -1);
for (String word : words) {
if (!word.isEmpty()) {
receiver.output(word);
}
}
}
}
テストを作成
テスト対象に作成したDoFnのテストを作成します
@RunWith(JUnit4.class)
public class AppTest
{
@Rule public final transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Test public void testApp() {
List<String> words = Arrays.asList(" some input words", "", "\n", "foo");
Create.of(words);
PCollection<String> input = p.apply(Create.of(words));
PCollection<String> output = input.apply(ParDo.of(new ExtractWordsFn()));
// 並列処理する都合でPCollectionの順番は変わる
PAssert.that(output).containsInAnyOrder("some", "input", "words", "foo");
p.run().waitUntilFinish();
}
}
いくつかピックアップして説明します
@Rule public final transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
テスト用のパイプラインです。
筆者の環境では、 enableAbandonedNodeEnforcement(false)
を明示的に設定しないとエラーになりました
List<String> words = Arrays.asList(" some input words", "", "\n", "foo");
Create.of(words);
文字列のArrayListからテストの入力になる、 PCollection を作成してます
PCollection<String> input = p.apply(Create.of(words));
PCollection<String> output = input.apply(ParDo.of(new ExtractWordsFn()));
// 並列処理する都合でPCollectionの順番は変わる
PAssert.that(output).containsInAnyOrder("some", "input", "words", "foo");
Pipeline を実行し、 PAssert で、最終アウトプットの PCollection の検査をしています
テストを実行する
mvn test
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.297 s -- in com.example.myapp.AppTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.676 s
[INFO] Finished at: 2023-11-16T13:48:17+09:00
[INFO] ------------------------------------------------------------------------