ちょっと話題の記事

心温まるSlackの投稿を抽出するためにサーバーレスなデータ分析基盤を構築しよう!!

俺の分報がこんなに治安が悪いわけがない!Comprehendで証明してみた
2020.02.24

CX事業本部@大阪の岩田です。

クラスメソッドでは社内標準のチャットツールとしてSlackを活用していますが、「分報」という形でSlackを活用しているメンバーも数多くいます。「分報」って何?という方は以下のリンクをご確認下さい。

人気の分報ともなると参加者が50人を超え、本人のいないところで好き勝手に雑談が繰り広げられていたりします。このレベルになると人気が高いのか、それとも単に治安が悪いだけなのかよく分からなくなってきます。

「俺の分報がこんなに治安が悪いわけがない!Comprehendで証明してみた」ブログはよ

?!!!

ということでSlackの投稿をComprehendの感情分析にかけつつ分析するための基盤をサーバーレスで構築してみました。

構成

こんな環境を作ります。

1日1回Lambdを起動し、指定されたチャンネルの1日分の投稿を収集し、各投稿に対してComprehendの感情分析結果をマージしてS3に保存。保存したデータをAthenaから分析します。

以前SlackのイベントでLTさせてもらった時に作った環境にComprehendの感情分析を追加した構成です。良ければ以下の資料もご覧ください。

環境構築手順

それでは早速環境を構築していきましょう。プロジェクトのディレクトリ構造は以下のような形になります。

.
├── layer
│   └── python
├── src
│   └── app.py
└── template.yaml

ソースコード

まずはLambdaのソースコードです。エラーハンドリングだったり大量データの考慮を無視した手抜き実装なので、業務利用を検討される場合は適宜修正をお願いします。Python3.7で実装しています。

from datetime import datetime, timedelta, timezone
import os
import logging
import json
import time

import boto3
from slack.web.client import WebClient
from slack.errors import SlackApiError


SLACK_CHANNEL_ID = os.environ['SLACK_CHANNEL_ID']
S3_BUCKET = os.environ['S3_BUCKET']
SLACK_TOKEN = os.environ['SLACK_TOKEN']
COMPREHEND_BATCH_SIZE = 25

s3 = boto3.resource('s3')
comprehend = boto3.client('comprehend')
logger = logging.getLogger()

def handler(event, context):
    try:
        print('start lambda')
        collect_messages()
        print('finish lambda')
    except Exception as e:
        logger.error(e)
        raise

def collect_messages(interval_days=1):

        JST = timezone(timedelta(hours=+9), 'JST')
        today = datetime.now(JST).replace(hour=0, minute=0, second=0, microsecond=0)
        target_date = today - timedelta(days=interval_days)
        unix_target_date_start = int(target_date.timestamp())
        unix_target_date_end = int((target_date + timedelta(days=1)).timestamp()) -1
        slack_clinet = WebClient(SLACK_TOKEN)
        latest = unix_target_date_end
        channels_histories = []

        while True:
            res = slack_clinet.channels_history(
                channel=SLACK_CHANNEL_ID, latest=latest,inclusive='false',
                oldest=unix_target_date_start)
            # 添付ファイルのみの投稿などをフィルタ ※空文字列をcomprehendに渡すとエラーになる
            messages = [msg for msg in res.data['messages'] if msg['text'] != '']

            # comprehendのバッチサイズに合わせて分割
            splited_messages = [messages[i:i + COMPREHEND_BATCH_SIZE] for i in range(0, len(messages), COMPREHEND_BATCH_SIZE)]
            for msgs in splited_messages:
                comprehend_res = comprehend.batch_detect_sentiment(
                    TextList=[msg['text'] for msg in msgs],
                    LanguageCode='ja'
                )
                # comprehendの分析結果をマージ
                sentiments = comprehend_res['ResultList']
                for m, c in zip(list(msgs), [{'Comprehend': sentiment } for sentiment in sentiments]):
                    channels_histories.append(json.dumps({
                        **m, **c
                    }, ensure_ascii=False))

            # channels.historyのレスポンスに続きがなくなるまでループ
            if res.data['has_more'] == False:
                break
            print('channels.history has more history continue collect messages...')
            latest = int(float(res.data['messages'][-1]['ts']))

        if len(channels_histories) == 0:
            print('no messages... skip upload file')
            return

        print('start upload s3')

        obj_key =  target_date.strftime('messages/year=%Y/month=%m/day=%d') + '/slack_messages.json'
        obj = s3.Object(S3_BUCKET, obj_key)
        body = '\n'.join(channels_histories)
        obj.put(Body=body.encode('utf-8'))

SAMテンプレート

続いてSAMテンプレートです。利用するS3バケットやGlueのクローラ等などを作成します。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  Collect and analyze Slack messages
Globals:
  Function:
    Timeout: 300
Parameters:
  SlackChannelId:
    Type: String
  SlackToken:
    NoEcho: true
    Type: String
Resources:
  ScrapeMessages:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: app.handler
      Runtime: python3.7
      MemorySize: 512
      Role: !GetAtt LambdaExecuteRole.Arn
      Environment:
        Variables:
          SLACK_CHANNEL_ID: !Ref SlackChannelId
          SLACK_TOKEN: !Ref SlackToken
          S3_BUCKET: !Ref S3Bucket
      Events:
        Cron:
          Type: Schedule
          Properties:
            Schedule: cron(0 16 * * ? *)
            Description: Example schedule
            Enabled: True
      Layers:
        - !Ref SlackLibLayer
  SlackLibLayer:
      Type: AWS::Serverless::LayerVersion
      Properties:
          Description: slack libraly layer
          ContentUri: layer
  LambdaExecuteRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - 'lambda.amazonaws.com'
          Action: sts:AssumeRole
      Policies:
        -
          PolicyName: "allow_cloud_watch_logs"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              -
                Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: "*"
        -
          PolicyName: "allow_s3_access"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              -
                Effect: "Allow"
                Action:
                  - 's3:GetObject'
                  - 's3:PutObject'
                Resource:
                  - !Sub "arn:aws:s3:::${S3Bucket}/*"
        -
          PolicyName: "allow_comprehend"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              -
                Effect: "Allow"
                Action:
                  - 'comprehend:BatchDetectSentiment'
                Resource:
                  - "*"
  GlueCrawlerRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service:
            - 'glue.amazonaws.com'
          Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Policies:
        - PolicyName: "allow_cloud_watch_logs"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              -
                Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: "*"
  S3Bucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Delete
  SlackDB:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Description: "Slack Messages DB"
  SlackCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Configuration: |
        {"Version":1.0,"CrawlerOutput":{"Partitions":{"AddOrUpdateBehavior":"InheritFromTable"},"Tables":{"AddOrUpdateBehavior":"MergeNewColumns"}}}
      DatabaseName: !Ref SlackDB
      Role: !Ref GlueCrawlerRole
      Schedule:
        ScheduleExpression: "cron(30 16 * * ? *)"
      SchemaChangePolicy:
        UpdateBehavior: "UPDATE_IN_DATABASE"
        DeleteBehavior: "DEPRECATE_IN_DATABASE"
      Targets:
        S3Targets:
          - Path: !Sub "s3://${S3Bucket}/messages"
  ScrapeMessagesLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${ScrapeMessages}
      RetentionInDays: 14

デプロイ

デプロイします。今回の構成ではSlackのライブラリをLayerにパッケージングして利用しているので、まずはLayerの準備をします。

pip install slackclient -t layer/python/

レイヤーの準備ができたのでパッケージングしてデプロイします。

$ sam package --s3-bucket <適当なS3バケット> --template-file template.yaml --output-template-file output.yml 
$ sam deploy --stack-name <スタック名> --template-file output.yml --capabilities CAPABILITY_IAM --parame
ter-overrides SlackToken=<Slackのトークン> SlackChannelId=<分析対象のSlackチャンネルID>

Lambda&クローラの実行

デプロイできたらLambdaを実行して投稿を収集した後Glueのクローラを実行してテーブルを作成しましょう。

$ aws lambda invoke --function-name <作成されたLambdaFunctionの名前> -
$ aws glue start-crawler --name <作成されたクローラの名前>

過去データの収集

ここまでで一応環境構築は終了です。あとは日次処理でどんどんデータが貯まっていきます。が、日次処理で収集するデータは前日分のデータだけです。初回はある程度過去まで遡ってデータを収集したいところです。ということで、ローカル環境から以下のコードを実行して過去データを収集します。

import os

os.environ['SLACK_TOKEN'] = <Slackのトークン>
os.environ['SLACK_CHANNEL_ID'] = <分析対象のSlackチャンネルID>
os.environ['S3_BUCKET'] = <作成されたS3バケット>

from app import collect_messages

for i in range(2,51):
  collect_messages(i)

過去データが収集できたらもう一度Glueのクローラを実行しておきましょう。※今回の分析対象にはオーバースペックなのですがパーティションを設定しているため。

Positiveな投稿TOP10を抽出

分析の準備ができたので、Athenaで分析していきます。とりあえずPositive判定された投稿TOP10でも抽出してみましょう。

SELECT
    comprehend.sentimentscore.positive,
    text
FROM
    messages
WHERE
    comprehend.sentiment = 'POSITIVE'
ORDER BY
    comprehend.sentimentscore.positive DESC
LIMIT 10

シンプルなSQLですね。実行してみます。

なんも問題ないです!!!むしろ最高です!!!!視線を落としたらさっきまでそこになかったうどん札が発生する体験最高でした!!!!!ありがとうございます!!!

一番ポジティブな投稿は丸亀製麺のうどん札に関する投稿でした。なんとスコア0.9996950626373291を叩き出しています。

Positiveな投稿に対するリアクションとして使われる絵文字TOP5を分析

せっかくなのでもう1パターン分析してみましょう。Slackの特徴の1つとして、書き込みに対して絵文字を使ってリアクションするという機能が挙げられます。Positve判定された書き込みに対するリアクションとして利用頻度の高い絵文字を抽出してみましょう。

リアクションのデータ構造

投稿に対するリアクションの情報はchannels.historyAPIのレスポンスに以下の形式で含まれています。

...
"reactions": [
  {
    "name": "arigato",
    "users": [
      "xxx"
    ],
    "count": 1
  },
  {
    "name": "atodeyomu",
    "users": [
      "yyy"
    ],
    "count": 1
  },
  {
    "name": "mapicon-onsen",
    "users": [
      "zzz"
    ],
    "count": 1
  }
]
...

この情報をAthenaで分析してみます。

リアクションの配列を行に変換する

リアクションの情報は前述の通り配列になっています。そのため単純にSELECTすると配列として取得されます。例えば、このSQLを実行すると

SELECT
    reactions
FROM
    messages
WHERE
    client_msg_id = 'xxxxx'

結果は...

reactions
1 [{name=arigato, users=[UEN3NS8S1], count=1}, {name=atodeyomu, users=[UEN3NS8S1], count=1}, {name=mapicon-onsen, users=[UFE2V64CR], count=1}]

このように生データそのままです。SQLで集計するためにはデータの構造を配列から行形式に変換したいところです。こういったケースではunnestCORSS JOINを使うことで配列を行に展開することが可能です。以下のSQLを実行してみましょう。

SELECT
    react.react,
    react.react.name,
    react.react.count
FROM
    messages
CROSS JOIN UNNEST(reactions) AS react(react)
WHERE
    client_msg_id = 'xxxxx'

実行結果です。

react name count
1 {name=arigato, users=[UEN3NS8S1], count=1} arigato 1
2 {name=atodeyomu, users=[UEN3NS8S1], count=1} atodeyomu 1
3 {name=mapicon-onsen, users=[UFE2V64CR], count=1} mapicon-onsen 1

あとは普通にnameとcountで集計してやれば良さそうですね。というわけで以下のSQLを実行してみましょう。

WITH positive AS (
SELECT
    reactions
FROM
    messages
WHERE
    comprehend.sentiment = 'POSITIVE' )
SELECT
    react.name,
    SUM(react.count)
FROM
    positive
CROSS JOIN UNNEST(reactions) AS react(react)
GROUP BY
    react.name
ORDER BY
    SUM(react.count) DESC

さてさて結果は...

1位... kawaii

2位... omedeto

3位... aa

4位... e

5位... tensaideha

課題

さて、ここまでの作業で一見うまくデータ分析基盤が構築できたように見えます。しかしながら、この構成は1つ大きな課題を抱えています。Slackの特徴であるカスタム絵文字はあくまで :<絵文字の名前>: という文字列でしかないので、投稿内でカスタム絵文字を利用していた場合は適切な感情分析ができないのです。例えば、以下の投稿はいかにもポジティブな投稿に見えます。

が、Comprehendの感情分析にかけると...

$ aws comprehend batch-detect-sentiment --language-code ja --text-list :ieei1::ieei2::ieei2::ieei2::ieei3::ieei3::ieei2::ieei4:
{
    "ResultList": [
        {
            "Index": 0,
            "Sentiment": "NEUTRAL",
            "SentimentScore": {
                "Positive": 0.004755615256726742,
                "Negative": 0.00036574306432157755,
                "Neutral": 0.9948752522468567,
                "Mixed": 3.469406692602206e-06
            }
        }
    ],
    "ErrorList": []
}

NEUTRALと分類されます。自前のモデルを作れば解決できそうですが、そこまでパワーをかけなくても今回の目的は達成できそうなので、とりあえずこの課題については無視しようと思います。

今後の発展性について

AWS上にデータ分析基盤を構築する際はS3を中心に据えるのがベストプラクティスです。RedshiftやEMR、Amazon ES...データ分析に使えるサービスは多数ありますが、元データはあくまでS3に保存しておき必要に応じて別のサービスにロードする。あるいはRedshift Spectrumのような機能で別のサービスからS3上のデータを参照する構成を取ることで、色々と応用の幅が広がります。今回の例でも分析対象のデータは全てS3上に保存しており、保存先のS3バケットはデータアナリティクス事業本部のメンバーからクロスアカウントアクセスできるように設定済みです。

きっとこれから色々な分析にかけられ、分析結果が色々なツールで可視化されることでしょう。

まとめ

サーバーレスアーキテクチャを活用したデータ分析基盤構築のご紹介でした。まあ、Slackの分報を分析して喜ぶ人はごく一部だと思いますが、今回紹介した手法を少し応用すれば色々とビジネスにも活用できそうな気がします。例えばTwitterから自社製品に関するツイートを取得、Comprehendで感情分析にかけてNEGATIVEなツイートを自動抽出。NEGATIVEなツイートは人間がチェックして製品改善のヒントを得る。とかとか。

需要がありそうなら少しリファクタリングしたものをSAR(Serverless Application Repository)で公開することも考えたいなーと思います。