Apache Beam の自動テスト環境を 1 から Maven で構築してみた

2023.11.16

西田@CX事業部です

今回はDataflowで使われてる、Apache Beamを実行、テストできる環境を、 Mavenを使って 1 から構築していきたいと思います

※ 全てのコードは Github に Push しています

Mavenのプロジェクト作成

MavenでApacheBeamが最低限の依存関係で実行できるプロジェクトを作成していきます

空のプロジェクトを作成

mvnコマンドを使って、空のJavaプロジェクトを作成します

mvn -B archetype:generate \
 -DarchetypeGroupId=org.apache.maven.archetypes \
 -DgroupId=com.example.myapp \
 -DartifactId=com.example.myapp

ライブラリのインストール

pom.xml にApache Beamのバージョンをプロパティに定義します

<!-- ...省略... -->
<properties>
  <beam.version>2.50.0</beam.version>
</properties>
<!-- ...省略... -->

pom.xml に依存関係にApache BeamとDataflow関連のライブラリを追加します

<dependencies>
  <!-- ...省略... -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
  </dependency>
	
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
  </dependency>   
  <!-- ...省略... -->
</dependencies>

依存関係をインストールします

mvn install

Apache Beamのコードを作成

簡単な Apache Beam のコードを作成します。以下は渡された文字列のリストを標準出力するだけのPipelineです

public static void main( String[] args ) {
    Pipeline p = Pipeline.create();
    List<String> words = Arrays.asList("aaa", "bbb ccc", "ccc", "", "c", "\n", "aa");

    p.apply(Create.of(words))
        .apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                System.out.println(context.element());
                context.output(context.element());
            }
        }));

    p.run().waitUntilFinish();
}

上記コードをMavencompile exec:java で実行します

mvn compile exec:java -Dexec.mainClass=com.example.myapp.Example

出力結果

c
bbb ccc


aaa
aa
ccc

※ 順番が保証されない点に留意が必要です

テスト作成

テスト対象のDoFnを作成

今回のテストの対象になるDoFnを作成します。以下のDnFnの主な動作は、空白で文字列を分割し、空白のみの要素を取り除きます

public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

static class ExtractWordsFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
        String[] words = element.split(TOKENIZER_PATTERN, -1);

        for (String word : words) {
            if (!word.isEmpty()) {
                receiver.output(word);
            }
        }
    }
}

テストを作成

テスト対象に作成したDoFnのテストを作成します

@RunWith(JUnit4.class)
public class AppTest 
{
    @Rule public final transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    @Test public void testApp() {
        List<String> words = Arrays.asList(" some input words", "", "\n", "foo");
        Create.of(words);

        PCollection<String> input = p.apply(Create.of(words));
        PCollection<String> output = input.apply(ParDo.of(new ExtractWordsFn()));

        // 並列処理する都合でPCollectionの順番は変わる
        PAssert.that(output).containsInAnyOrder("some", "input", "words", "foo");

        p.run().waitUntilFinish();        
    }
}

いくつかピックアップして説明します

@Rule public final transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

テスト用のパイプラインです。 筆者の環境では、 enableAbandonedNodeEnforcement(false) を明示的に設定しないとエラーになりました

List<String> words = Arrays.asList(" some input words", "", "\n", "foo");
Create.of(words);

文字列のArrayListからテストの入力になる、 PCollection を作成してます

PCollection<String> input = p.apply(Create.of(words));
PCollection<String> output = input.apply(ParDo.of(new ExtractWordsFn()));

// 並列処理する都合でPCollectionの順番は変わる
PAssert.that(output).containsInAnyOrder("some", "input", "words", "foo");

Pipeline を実行し、 PAssert で、最終アウトプットの PCollection の検査をしています

テストを実行する

mvn test
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.297 s -- in com.example.myapp.AppTest
[INFO] 
[INFO] Results:
[INFO] 
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  2.676 s
[INFO] Finished at: 2023-11-16T13:48:17+09:00
[INFO] ------------------------------------------------------------------------

参考