Step Functionsのステートマシンを利用した正規表現エンジンを作成する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

通常、Step Functionsを利用する際には複雑な作業を行うタスク部分をLambda関数やアクティビティなどで実装し、全体のワークフローをステートマシンによって管理します。

一方で、情報科学においては正規表現はステートマシンからなる有限オートマトンと等価であり、任意の正規表現はそれと同等の有限オートマトンによって表現できることが知られています。

そこで、本記事では本来Step Functionsにおいて処理を担う「タスク(Lambda関数、アクティビティ)」を用いず、ステートマシンのみを用いて正規表現マッチングを実現する正規表現エンジンを作成します。

おことわり

正規表現エンジンを作成するにあたって、hiratara様の以下の記事を大いに参考にしています。

本記事ではエンジンを作成する上で必要な知識の簡単な紹介に留めていますが、詳細な解説や実装などが気になる方はこちらの記事をご覧ください。

また、引用している内容に問題がありましたら、コメントにてご連絡いただければと思います。

対象となる正規表現について

本記事における正規表現の規則を以下にまとめます。

表記 説明
A|B 文字列の和集合(A or B)
AB 文字列の連結
A* 文字列の繰り返し

引用 : 正規表現エンジンを作ろう (1)

私たちが関わる一般的な正規表現においては、上記以外にも様々な正規表現の規則が存在しますが、その多くは上記3つの規則によって代替することが可能となります。例えば、a+という正規表現はaa*に、a{1,2}a|aaによって代替することができます。実装を簡易にするため、最低限の要素である上記3つの規則のみを今回は対象とします。

また、部分置換は今回の正規表現エンジンの対象としていないため、これらに関する規則についても対象外としています。

正規表現エンジンの概要

作成する正規表現エンジンは、指定された正規表現を入力として受け取り、最終的にはState Languageで表現されたステートマシンを出力します。

正規表現エンジンの内部では、与えられた正規表現に対して、字句解析・構文解析・NFA変換・DFA変換という4つの処理を行います。これらの処理を順次行うことで、正規表現は最終的にステートマシンへと変換することができます。

正規表現のマッチングを行う際は、エンジンの出力結果をもとにStep Function上にステートマシンを作成し、そこに文字列を入力することでマッチングの判定を行います。

以降では、正規表現エンジンの内部で行われる4つの処理(字句解析・構文解析・NFA変換・DFA変換)について内容を説明し、最後にState Languageによるステートマシンの出力を行います。

字句解析

字句解析処理では、入力された正規表現を1文字ずつ分解し、それぞれを「トークン」と呼ばれる単位に変換します。アルファベットなどの通常の文字のほか、和集合を示す|や繰り返しを示す*などの特殊な文字についても同様にトークンに変換されます。

トークンに変換する際は、文字の種別を分類し、トークン内部に記録しておきます。種別に関する情報は、後の構文解析処理の際に利用します。また、エスケープ文字が入力された場合には、その文字はスキップした上で次の1文字を文字を表すトークン(CHARACTER)として扱います。

トークンの種別を以下の表にまとめます。

トークン種別 説明
CHARACTER 文字を表すトークン。'a'~'z'、等
OPE_UNION 和集合演算 '|'
OPE_STAR 繰り返し演算 '*'
LPAREN 左括弧 '('
RPAREN 右括弧 ')'
EOF 文末を表す

引用 : 正規表現エンジンを作ろう (3)

構文解析

構文解析処理では、字句解析処理で作成されたトークンをもとに構文的なチェックを行い、同時に構文木の作成を行います。

構文解析を行うにあたって、事前に正規表現の文法規則を整理しておく必要があります。ここではバッカス・ナウア記法(BNF)と呼ばれる記法を用いて、文法規則を記述します。この文法規則に従って正規表現を分解することで、正規表現として与えられた文字列が構文として正しいかどうかを確認することができます。

<expression> ::= <subexpr> <EOF>
<subexpr> ::= <seq> "|" <subexpr> | <seq>
<seq> ::= <subseq> | ""
<subseq> ::= <star> <subseq> | <star>
<star> ::= <factor> "*" | <factor>
<factor> ::= "(" <subexpr> ")" | <CHARACTER>

また、上記の文法規則に照らし合わせる作業と同時に、構文木の作成も行います。構文木の節点ははじめに正規表現の規則として示した「和集合(Union)」、「連結(Concat)」、「繰り返し(Star)」の3つのいずれかが該当し、葉の部分にあたるのが文字となります。

例えば、a|(bc)*という正規表現を変換した場合、以下のような構文木が作成されます。

引用 : 正規表現エンジンを作ろう (3)

NFA変換

NFA変換処理では、構文解析によって作成された構文木をもとに、非決定性有限オートマトン(NFA)の作成を行います。

NFAは入力文字を受け取らない遷移(ε-遷移)や、入力文字から状態が一意に定まらないことを特徴に持つ有限オートマトンの一種であり、正規表現からの変換が容易に行いやすいという特徴があります。NFAにおいて、入力文字を受け取らない遷移はεで表現されます。

正規表現での文字列やその連結、和集合、繰り返しなどの規則をNFAに変換した場合、下記のように各規則に対応する形で変換できることがわかります。

正規表現は通常複数の規則の組み合わせによって表現されますが、構文木の構造に従って入れ子の様に規則を展開していけば、最終的なNFAを得ることができます。

例として、a|(bc)*をNFAに変換した結果を以下に示します。

NFAは正規表現からの変換は容易に行えますが、入力がなくとも遷移することを許しているため、与えられた文字入力に対して結果の状態を一意に定めることが出来ません。このため、NFAのままでは機械的に入力文字が正規表現にマッチしているかを判定することが困難です。そこで、作成されたNFAをもとに結果が一意に定まるDFAへの変換を行います。

DFA変換

DFA変換処理では、部分集合構成法という手法を用いて、NFAを決定性有限オートマトン(DFA)に変換します。

DFAは有限オートマトンの一種ですが、NFAとは異なりε-遷移や入力に対して複数の状態を持つことを許していません。このため、与えられた入力に対して結果の状態が一意に定まるという特徴があります。

NFAは任意の入力文字列に対する受理状態が常に等しいDFAへの変換が可能であることが知られています。ここでは変換手法の一つである部分集合構成法によって、DFAへの変換を行います。

部分集合構成法による変換の要点を以下に簡単にまとめます。

  1. NFAの状態のべき集合をDFAの状態とする。
  2. NFAの初期状態からε-遷移のみで遷移する状態の組み合わせを、DFAの開始状態とする。
  3. DFAの状態を構成する各状態に入力を1単位与えた場合にNFA上で遷移しうる状態の集合を、DFAでの遷移先状態とする。
  4. NFAにおける受理状態をDFAの状態が含んでいる場合、DFAでも同様に受理状態として扱う。

変換を行う際は、まず初めにDFAにおける状態の列挙を行います。べき集合とは、ある集合の元となる要素の組み合わせによって作成されるすべての部分集合の集合です。例えば、NFAにS1,S2,S3の3つの状態があるとき、部分集合構成法によって列挙されるDFAの状態は{S1}, {S2}, {S3}, {S1,S2}, {S2,S3}, {S1,S3}, {S1,S2,S3}の7つになります。

べき集合による状態を作成したあとは、DFAにおける初期状態の決定を行います。NFA上で初期状態からε-遷移を新しい状態が現れなくなるまで繰り返し、到達した全ての状態を含んだDFA上での状態を初期状態とします。

次に、DFAにおける各状態からの状態遷移について決定していきます。DFAでのある状態を構成する各要素に対して、NFA上に1単位入力を与えた場合に、ε-遷移をふくめて遷移しうる状態を全て洗い出し、その状態の集合をDFA上における入力に対する状態遷移とします。

最後に、受理状態の決定を行います。NFA上での受理状態を含んでいる集合を全てDFA上における受理状態として扱います。

正規表現a|(bc)*のNFAをDFAに変換した場合の例を以下に示します。DFAには図示した状態以外にも変換の過程で様々な状態が列挙されますが、開始状態から到達しないものについては省略しています。

State LanguageでDFAを表現する

DFAへの変換処理を行なった後、最後に作成したDFAの構成を出力します。今回はStep Function上でのステートマシンの作成が目的であるため、DFAを出力する際にState Languageの形式に合わせる必要があります。

ここでは、State Languageを用いてDFAを表現する際の要点についてまとめていきます。

入力の変換

正規表現とマッチングを行う入力文字列は、ステートマシンに入力する前に1文字ずつ分解し、末尾に"EOL"を追加した配列に変換します。

["a", "b", "c", "EOL"]

Step FunctionsではJsonPath形式に基づいた入力の選択機能が提供されており、配列やオブジェクトの一部を指定して取得することができますが、文字列そのものを操作することはできないため、事前に配列へと変換しています。

また、提供されている選択機能では空の配列を検知することができないため、"EOL"を末尾につけておき、これが取得された場合を入力の終端と判定するようにします。

状態遷移の再現

DFAにおける入力の受理や遷移は、以下の様にState Languageで状態を定義することで再現することができます。

"S1": {
	"Type": "Choice",
	"Choices": [
        {
          "Variable": "$[0]",
          "StringEquals": "a",
          "Next": "S2"
        }
	"OutputPath": "$[1:]",
	"Default": "error"
},
"S2": {
...

Choices内の"Variable": "$[0]"で入力の先頭文字を受理し、"StringEquals"によって入力に応じた遷移先状態を指定します。対応する文字列がなければ、"error"状態へと遷移し、そのまま失敗となります。

遷移を行う際には、"OutputPath": "$[1:]"により先頭文字を除去した上で入力が次の状態へと引き継がれます。

受理状態の再現

DFAにおいて、受理状態は通常の状態と同じく遷移を持つことができますが、State Languageでは、ChoiceSucceedのタイプを1つの状態が同時に持つことは出来ません。よって、通常のChoice"EOL"が入力された場合にSucceed状態へと遷移する分岐を追加し、受理状態の代替とします。

受理状態を再現した例を以下に示します。

"S9": {
    "Type": "Choice",
    "OutputPath": "$[1:]",
    "Default": "error",
    "Choices": [
      {
        "Variable": "$[0]",
        "StringEquals": "EOL",
        "Next": "succeed"
      },
      {
        "Variable": "$[0]",
        "StringEquals": "a",
        "Next": "S1"
      },
      ...
      ]
    },
...

作成例

最後に、a|(bc)*のDFAをState Languageで表現した例とStep Functions上での遷移図を以下に示します。

{
  "States": {
    "error": {
      "Type": "Fail",
      "Cause": "invalid input"
    },
    "succeed": {
      "Type": "Succeed"
    },
    "8-1-3-7": {
      "Type": "Choice",
      "OutputPath": "$[1:]",
      "Default": "error",
      "Choices": [
        {
          "Variable": "$[0]",
          "StringEquals": "EOL",
          "Next": "succeed"
        },
        {
          "Variable": "$[0]",
          "StringEquals": "a",
          "Next": "2"
        },
        {
          "Variable": "$[0]",
          "StringEquals": "b",
          "Next": "4-5"
        }
      ]
    },
    "2": {
      "Type": "Choice",
      "OutputPath": "$[1:]",
      "Default": "error",
      "Choices": [
        {
          "Variable": "$[0]",
          "StringEquals": "EOL",
          "Next": "succeed"
        }
      ]
    },
    "4-5": {
      "Type": "Choice",
      "OutputPath": "$[1:]",
      "Default": "error",
      "Choices": [
        {
          "Variable": "$[0]",
          "StringEquals": "c",
          "Next": "3-6"
        }
      ]
    },
    "3-6": {
      "Type": "Choice",
      "OutputPath": "$[1:]",
      "Default": "error",
      "Choices": [
        {
          "Variable": "$[0]",
          "StringEquals": "EOL",
          "Next": "succeed"
        },
        {
          "Variable": "$[0]",
          "StringEquals": "b",
          "Next": "4-5"
        }
      ]
    }
  },
  "StartAt": "8-1-3-7"
}

Step Functionsで実行する

ここでは今回実装したエンジンを利用し、Step Functionsでのステートマシンの作成と入力文字列のマッチングを行います。実装したエンジンについては下記のものを用います。

ステートマシンの作成

ステートマシンの作成は、以下のスクリプトを用いて実行します。

# compile.py
import sys

import boto3

from regex_sfn.lexer  import Lexer
from regex_sfn.parser import Parser
from regex_sfn.dump import Dumper
from regex_sfn.nfa2dfa import nfa2dfa

ROLE_ARN = "arn:aws:iam::XXXXXXXXXX:role/regex-sfn"

def compile2sfn(name, regex, sfn):
    # 正規表現をDFAに変換
    lexer = Lexer(regex)
    parser = Parser(lexer)
    nfa = parser.expression()
    dfa = nfa2dfa(nfa)

    # DFAをState Languageで出力
    dumper = Dumper(dfa)
    definition = dumper.dump_sf_definition()

    # ステートマシンの作成
    resp = sfn.create_state_machine(name=name,
                                    definition=definition,
                                    roleArn=ROLE_ARN)
    return resp

def compile(regex):
    sfn = boto3.client("stepfunctions")
    resp = compile2sfn("regex-sfn", regex, sfn)

    return resp

if __name__ == "__main__":
    compile(sys.argv[1])

スクリプトの引数に正規表現を与えることで、ステートマシンの作成が行われます。

$ compile.py "a|(bc)*"

マッチングの実施

作成されたステートマシンを用いた文字列のマッチングは、以下のスクリプトから実行します。

# match.py
import sys
from time import sleep
import json

import boto3

STATE_MACHINE_ARN = "arn:aws:states:ap-northeast-1:XXXXXXXXX:stateMachine:regex-sfn"

def start_exec(input_string, sfn):
    resp = sfn.start_execution(stateMachineArn=STATE_MACHINE_ARN,
                               input=input_string)
    exec_arn = resp["executionArn"]
    return exec_arn

def match(line):
    sfn = boto3.client("stepfunctions")
    # 文字列を配列に変換
    input_string = json.dumps(list(line) + ["EOL"])

    # ステートマシンに入力を与える
    exec_arn = start_exec(input_string, sfn)

    while True:
        status = sfn.describe_execution(executionArn=exec_arn)["status"]
        if status != "RUNNING":
            break
        sleep(1)

    # 遷移の結果を返す
    return status

if __name__ == "__main__":
    _status = match(sys.argv[1])
    print(_status)

引数に文字列を入力すると、マッチングの結果を取得することができます。

$ python match.py bcbcbcbc
SUCCEEDED
$ python match.py abc
FAILED

Step Functionsの画面からも、遷移が発生していることが確認できます。

おわりに

Step Functionsのステートマシンのみを用いて、正規表現による文字列マッチングができることを確認しました。

本手法が有効な場面等は特に思いつかないため、皆様におかれましてはこれまで通りStep FunctionsをLambda等と共にご利用いただければと思います。

参考資料