ScalaMatsuri 2017参加報告 #scalamatsuri

2017.03.01

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

先日行われたScalaMatsuri2017に参加してきました。
私のScala経験はHello Worldレベルですが、ScalaMasuriに参加して本当に良かったです。
以下、参加したセッションについてまとめます。

レポート

ストリームデータ処理入門

  • 無限に発生するデータに対して、無限に続く処理にストリーム処理が向いている
  • ストリーム処理の、バッチとの違いは、低レイテンシであること
  • 分類としては、1:バッチ、2:対話型クエリ、3:ストリーム処理
    • ストリーム処理のみ、開始時点でデータが到着していない
  • Out Of Order: ストリーム処理ではデータが発生順に到着しない。EventTime/ProcessingTimeという複数の時刻概念が必要
    • 例えば、不正ログイン検知では発生順である必要がある
    • watermark: event timeとprocessing timeのズレを扱って対処->近似に過ぎないけど、有用
    • trigger: いつ集計結果を出力するかを定義するしくみ。watermarkを柔軟に扱う
    • accumulation: triggerで結果が複数回出力されたときにどう扱うか
  • ストリーム処理のトレードオフは、watermarkなら完全性とレイテンシ、triggerならストレージ容量
    • 完全性、低レイテンシ、低コストを全て達成することはできない
    • 用途により分かれる
  • メッセージバスでバッファリングしてストリーム処理エンジン、出力システムで処理
    • NiFI: UIで定義できるがその分構成管理が手間になりがち。小さなものを沢山作るなら良い
    • Kinesis Analytics: AWSのフルマネージドメッセージバスと言える
    • 最初はFlinkかApexがオススメ。GearpumpはAkkaユーザ向けで難しいが機能充実。
    • SparkユーザならSpark Streams。
    • Kafka Streams: Kafkaを使っている人が補助的に使う
  • ストリーム処理の検討ポイント:問題領域。可用性、システム管理、開発方式
  • バッチ処理はストリーム処理のサブセットなので、リアルタイム以外でも適用可能

※私の文章では、バッチ処理に比べて処理の周期が短く、より連続に近いという意味で、リアルタイムと表現しています

ゼロからサービスを作るのに必要なこと

  • マイクロサービスでは1機能がサービスになる
  • TwitterのGIFボタンとステッカー機能を開発したときの話
  • User.find(id)の中身をARからUser Serviceへのリクエストで置き換えた
    • 分けたものをまとめてシングルサービスにする必要がある
    • 関数的アプローチ:リクエストは関数呼び出しみたいに見える
    • 関数的なアーキテクチャ:複雑なものを単純に分けてFutureで呼び出して返す
  • リポ分けてるとダイヤモンド依存が発生するのでTwitterではモノレポGit(高速でclone特殊なしくみ)+Pantsビルドシステム
  • Finagle: RPCライブラリ、Finatra
  • Mesos: Aurora
  • zookeeper(サービス登録)


制作時のコミュニケーション

  • P(roduct)Mとデザイナで企画 -> PMとエンジニアが設計 -> エンジニアとデザイナで実装
  • 設計(PMとのやりとり)
    • PMはリリースに責任を持つが、エンジニアは技術的負債にも責任を持つ
    • 設計段階の推測はteck-debt直結なので、まずは繋いで作ってみる。まずは動く状態にする。動く状態のインスタンスをたくさんつくる
    • aurora役立った
    • Dtab: Delegation Table。開発時、名前解決を上書きして特定のサービスのみローカルを参照するようにできる。DNSみたいに名前からIPを引く
  • 実装(デザイナとのやりとり)
    • 状態を管理して例外を減らすことが、クライアントの開発に影響する。
    • UIの内部状態の管理が大事。Flux的な一方通行。コードレビューの基準になってる。これで例外的な動作を減らせる。
  • エンジニアどうしのやりとり
    • ドキュメントはすぐ古くなるので、信じられるのはソースだけ
    • モノレポだと特定の状態をgitのSHA-1で特定できる。ソース読みやすくなる。
    • 正しい関数定義、インタフェースを作るとドキュメントを減らせる
    • アプリとして性能を出す(=時間管理)が大事。99%ileのため調整が必要。呼び出し側の過度なタイムアウトが無いかという視点も。

Akkaで分散システムの障害に備える

  • 機能横断要件をまず定義する。その際にビジネスが必要とする応答性と可用性と遅延。非機能要件みたいなもの。
  • サーキットブレーカー、隔壁などのツールキットがAkkaにはある
  • まずtell: fire and forgetを使えると楽。askはタイムアウト。
  • ask時の障害のハンドリングは、送り元に返さず、supervisorに返してそこで解決してもらう。
  • サーキットブレーカー
    • akkaのステートで負荷・失敗の計測・カウントを行い、一定時間のメッセージ送信抑制やリトライを行なう
    • 深いサービスの遅延は、呼び出し元がブロッキング処理だと伝搬してしまう
    • ブレークしたときのデータの扱いは、CAP定理やビジネスモデルを考慮して決める
  • 隔壁(bulkhead)
    • 危ないactor(DBアクセスなどでブロッキングする)とはdispatcherを分け、スレッドプールを分けることで、リソースを共有しない。
    • CQRSも一種の隔壁。コマンドで問題があったときもクエリに影響しない設計。
  • ハード障害に備える
    • 単一障害点の最小化と状態の永続化
    • akka clusterとakka persistence
    • backoff supervisor: 闇雲に永続状態からリトライしない。exponential backoff。
  • ネットワーク障害に備える
    • akkaのsplit brain resolver。戦略に基づき片方のクラスタを落とす。
    • 4つの戦略が実装されている。
    • Split Brain ResolverはOSSに含まれておらず、Lightbend Reactive Platformで提供さている。

ストリーミング・ワールドのためのサバイバル・ガイド

  • akkaにかぎらず様々な種類のストリームがあり、比較しにくい。
    • ストリームとは無限のデータセット
  • fast data: big dataのうち遅延なく処理するアーキテクチャ
  • 遅延が認められるならSparkでも良い
  • kafkaはpub-subみたいなもの。
    • akka streamを使ってkafka topickを消化できる
  • 最も大きな違いはレイテンシ
  • reactive appは全体の一部。それ以外のシステムもreactiveにして恩恵を得たい。
    • reactiveでないシステムとやりとりするところが課題。
  • APIではなくSPI(Service Provider Interface)。
    • 
play/alla/benそれぞれで話し合った結果がJDK9にマージされる

  • バックプレッシャー
    • 呼び出されたがわの処理能力が少ない場合、呼び出し側が大量にメッセージを送ってきても処理できない。オーバーフローする。
    • なのでスループットを要求する。これがバックプレッシャー
  • alpakka: ストリームコネクタのcommunity
  • TCPはストリーミングプロトコルと言える。
    • データ量に限りがあるフローコントロール。
  • 登壇者の本->why-reactive

DMMのAPI GatewayをAkka StreamsとAkka HTTPで作り込んでみた

  • APIGW開発の目的は認可パフォーマンスの向上
    • 外部からはAkka HTTPサーバで受けて、バックエンドにAKka HTTPクライアントとしてリクエスト
    • 全てのストレージドライバはFuture対応
  • Akka HTTP Low-level APIを採用したのは、HighはルーティングのDSLがハードコードで要件を満たせなかったため
    • 設定のリロードをgracefulにできる。トリガーはJVMへのSIGHUP
    • HOCON形式だとJSONよりも記述量が減って良い
  • Routingフローグラフ。NofFoundやAbortのときはエンドポイントへのリクエストが不要なので、短絡フローですぐ終わらせる。
    • StreamsのGraph SKLパーティション: FanOutさせる。入力に対して出力をn分割。その後のマージが必要。
  • 独自のHTTPヘッダ付与やJSON RPC<=>REST変換でIncomming/Outgoing Filter Flowを開発した。ここでAPIGWの差分吸収の責務を実装する
    • 動的にルートする
  • エンドポイントもプラグインとして実装。

Flow graphのルーティング処理。なんでGraph Flow APIになったのか

  • リクエスト毎にパーティションを繋げる -> リクエストによってFlowの長さが異なる
  • 索引を比較
  • 問題:ヒープによるパフォーマンス劣化。10routeだと100Mだったのが100routeで1G
    • 修正:リクエストに対応するrouteが見つかるか否かの2つのみに修正した。
  • パーティションを小さくしたらroutingのCPU時間が減り、ヒープが減少。FanIn/Outは多量の処理(リクエスト)をさばく処理には向かない。
  • テストは、単純な負荷だけでなく、遅いエンドポイントへのリクエストを混ぜて、影響が波及しないかも確認した

Q&A

  • Q: 自社開発の理由は?OSSじゃだめだった?
    • A: システムの独自仕様に対応するため
  • Q: エンドポイントではなく、グラフの中のパーティションなどで重い処理が発生したら?
    • A: 事前にdispatcherを分けることで、重い処理をそっちにふることで対応
  • Q: グラフの中で例外が発生したときのハンドリングは?ドキュメント見るとエラーはグラフ内部で処理して、外に出さないようにすると書いているが
    • A: 開発当初はすぐSupervisorが例外を拾う設計になっていたが、最終的にはAbortFlowで拾えるものはそこで、拾えないものはSupervisorが拾ってAkka HTTP Serverから返している

Akka Streams による Kafka の Reactive 化

  • ScalaTimesというニュースレターの人
  • kafkaの基礎
    • producerとconsumerが存在して、分散メッセージログをやりとりするようなもの
    • 重要なのはproducerとconsumerの分離。お互いの数を知らなくて良くする。マイクロサービスの他のサービスを知らなくて良くする。「マイクロサービスでもっとも重要なのはその間にある」
  • akka stream: データ変化パイプライン記述DSL
    • バックプレッシャーと非同期処理にフォーカス
    • source(input stage) -> flow(特別なstage) -> sink(output stage)
    • alpakka: より多くの技術のコネクタライブラリを作る取り組み。
  • Kafkaがsourceの場合
    • バックプレッシャーを考慮する。残りのメッセージを処理できる速度
    • Sinkが別のKafka。このスループットがバックプレッシャー
    • 間にはFlowでの処理がある
  • Consumer.plainSourceでサブスクライブして処理に繋げる。
    • ↑をplain java apiを使った場合、書き方的にloopやpollを書かなければならず、reactivityというかメンテナンス性が下がる
    • plain java apiと速さの違いも少ない
    • commitable sourceでもplain javaだと非同期のコミットをするためにコールバックを作ったりするので大変
  • At-least-once Deliveryができる
    • akka batched comsumerはまだ遅いけど、徐々にplain javaに近くなっている
  • streaming streams: streamのstream
    • パーティションごとのバックプレッシャーがあり、パーティション毎に処理をする
    • Consumer.committablePartitionedSOurceを使って生成する
  • Apacke Kafka製のKafka Streamsという別のものもある。Java。

Scala & Sparkによるデータ・エンジニアリング 7大レシピ

1: Organization(ヨーロッパの場合)

  • 問題を適切にツールで解決できれば良いが、新しいイノベーション(=ツール、技術)を評価するのが難しい
  • データへのアクセスを活性化するため、データエンジニアリングではさらにその傾向が強い。
  • 他のチームとデータの授受が発生する。マーケ、BI、製品チームなど。データをめぐる対立が発生する。データへのアクセスが活性化するとこの傾向が強くなる
  • 大きな組織になるほどこうなる。技術よりも組織の問題が大きい。
  • 先端技術を使って問題に取り組みつつ、ビジョンを共有しない他チームに依存して仕事しなければならない。データエンジニアはバーンアウトする。
  • 解決のコツ: 組織あたりHadoopクラスタは1つにする。ビジョンを明確に文書化する。チーム単体ではなく、チーム間で何を達成できるのかが大事。

2: 業務の最適化

  • データエンジニアの生産性は不安定。
  • リードタイム、インパクト、失敗の管理のうち、最後が重要。データエンジニアリングではアプリなど他チームよりも失敗しにくい。

  • 先を見越す。待ち状態が多くなりがち。

    • 1: 先手を打ってタスクのシミュレーションをする。起こりうる全てを挙げて見積り(経験、副産物)からリードタイムやコストを見積もり、実施するものや順番を選ぶ。これをしなければ失敗が起こるまで待つことになる。
    • 2: 失敗を先読みする。どこで失敗しそうかを考えて、予想頻度や予防策や復旧方法を先読みして考えておく。失敗による精神的な負担を減らせる。
  • これらを定期的に行って議論して、最適化する。

3: Staging Data: データの管理方法

  • まずは動いているデータを凍結する。データの動きを把握すると容易。
  • KafkaやHDFSでデータを長期間ステージングする。一時的なものではなく、長期のステージングを目指す。データエンジニアリングがシンプルになる。
  • そして永続的なステージングとして見えるようにすべき。Scalaのイミュータブルと同じように扱いたい。
  • 例えば、UUIDを使ってユニークディレクトリに書き込むことで、Persistenceを高くできる(上書き耐性?)

4: RDDとDataframe

  • 違いは、MapReduceとHiveの違いと一緒。
  • Dataframeのパフォーマンスは高い。
  • RDDはロバスト。
  • RDDは大型ETLに向いている。読み出しと書き込みが同じデータ量。ユニットテストを書きやすい。RDDはDataframe APIを使って読み書きできる。
  • Dataframeはデータ検索やSQLと組み合わせた軽量ジョブ、動的ジョブに向く。UDFがある。中間テーブルが大量にできるので、インメモリで性能を発揮する。

5: [最も重要]全てCogroupする

  • I/Oモナドに似ている。Cogroupによってカーディナリティの問題を解決する。
  • cogroupとgroupgyは任意のキーに対して単一のレコードを返す。
  • 効率よくデータアグリゲーション作業ができる。
  • 複数のデータフローをcogroupで1つのものとして扱い次のオペレーションにつなげる?
  • ビジネスロジックを分離して、分散されたオペレーションを分離することで、フィードバックループを素早く回せる。
  • 最小化されたケースをcogroupで見つけてテストを書けるのでデバッグしやすい。

6: インラインデータ品質

  • データそのものから発生するバグが多い。本番でのデバッグが必要になってしまう。
  • 前もってデータクオリティを上げるためのジョブ統合が大事。
  • Result: データクオリティのエレメント。値かEmptyの2値。
  • Annotation: 柔軟性がある。エラーメッセージも入る。
  • アノテーションは次元毎にアグリゲーションできる。
  • see also: ahoy-jon/autoBuildt

7: 業務で使うプログラムの設計

  • ステートをディスクから取ってくるのは良くない。別のところからstateを取るのが良い。

ここで時間切れでした。

レコメンデーションで Deep Learning を使う

執筆時点でスライドを見つけられていません

  • 従来の機械学習による実装方法がある。ディープラーニングと比較できるように進める。
    • LSTM: 時系列やパターンデータが出力できるNN
    • バッチノーム: 中間出力を正規化する仕組み。学習しやすくなる
    • ソフトマックス:確率分布を出す関数
  • 様々な種類のライブラリがある
    • DL4J。ディープラーニングフレームワーク。文字列キーではなくメソッドが多いのでIDEが良い
    • TF: 数学が分かる人に向く。自由度が高くサンプルが多い
    • Keras: DL4Jと同じくらいの高レベルAPIで使いやすい。
    • MXNet: ほとんどC++なのでAPIサポートが多い。ラッパー使ってる感があってイマイチ。IntelliJのインテリセンスが効かない
    • Chainer: 日本製
  • 今回はUCI Machine Leraning RepositoryのOnlineRetailデータを代用。
  • 文字や時刻を事前にベクトル化する必要がある。DL4JのDataVec
  • カラム毎にマッピングしてそれぞれのCSVに出力しておく。
  • レイヤ数的にシンプルなものからやるのがオススメ。目的を持って複雑にする。
  • 訓練の状況はGUIで見れる。GUIはPlay製
  • 訓練しながらちょっとずつレイヤを追加・変更する
  • 一番人気のものを見つけるのが得意。データエンジニアリングが必要。
  • レイヤが増えると学習時間が増える。
  • 訓練したモデルは保存する。読み込んで利用する。
    • スレッドセーフではない -> Akka RSでシングルスレッド処理
    • いじってドキュメント化しながらやると良い
  • sbt new wmeddile/dk4j-scala.g8
    • gitterでサポートしている。jp部屋ある

Q&Q

  • Q: 新しい製品が出たら?
    • A: トレーニングし直し。定期的にトレーニングプロセスを自動化して行なう。最後のレイヤを消して真っ白なレイヤを追加してトレーニングするってのもできる(がDL4Jで今はできないけど近いうちにできるようになる予定。Transfer Learning)
  • Q: 学習時の安定性の判断は自動化できる?
    • A: 正しいデータのバリデーションセットを用意する。望ましい結果と共に入力して正しければ本番利用する。

2日目: アンカンファレンス

参加したのがオフレコのセッションなので記載しません。

感想

クオリティが非常に高いイベントでした。満足しています。
一般枠の参加費1万5千円は安いと思います。もちろんスポンサー企業の支援があってこそのクオリティだと思います。
1/3くらいは英語のセッションでしたが、同時通訳が聴きやすかったことに驚きました。無い方がマシな同時通訳も知っている身からすると、非常にありがたいサービスでした。なお、2日目のアンカンファレンスでも同時通訳が提供されました。

国際感のあるイベントらしく、行動規範が策定され、イベント運営者によって執行されていました。
re:Inventのような企業主催のカンファレンスとは違い、コミュニティー主催のイベントを国際化するための良い試みだと思います。
一参加者としては、負担を一切感じませんでした。

発表はそれぞれ「初心者向け」「中級者向け」「上級者向け」とレベル分けされていますが、他のイベントと比べて、難度が一段階高いと感じました。
初心者向けセッションも興味を惹く内容でそこそこ難しく、中級者向けだとかなり難しかったと思います。上級者向けは「絶対についていけない」と思ったので参加していません。
私はScalaやAkkaを使ったことは無いのですが、他の言語やインフラを触った知識でなんとかついていけたと思います。

非常に勉強になるイベントでした。来年も参加したいですし、機会があればScalaやAkkaでシステムを作ってみたいです。