みんなアイカツ!についてどう思ってる?理解したいので、ツイートを形態素解析するサーバレスな構築を作ってみた

2020.07.16

データアナリティクス事業本部@札幌の佐藤です。

先日『アイカツオンパレード!ドリームストーリー』が完結しましたが、皆さんご覧になられたでしょうか。
音城ノエルの頑張りを見て、泣いてしまう姉の音城セイラ……『アイカツ!』をくらえって感じでしたね。最高でした。

まだ見ていない人は、YouTubeバンダイチャンネルで配信中ですので是非ご覧ください。
1時間半程度で全話視聴可能です!

そんな感動した『アイカツオンパレード!ドリームストーリー』、みんな見てどう思っていたか気になりませんか?
私は気になります。

ということで、Twitterのツイートを形態素解析してワードクラウドを作成、その結果をツイートする機能をサーバレスで構築したのでその話をしたいと思います。

なお、Twitterに対してのTwitterによる事前の同意がないスクレイピング行為は、規約上禁止されておりますのでご注意ください。

Twitterサービス利用規約

outputイメージ

outputとしてはこんな感じで、スターライド学園の校章柄のワードクラウトと、その日のツイートユーザの情報が投稿されるような形です。

構成

今回の構成としてはこのような形です。

Amazon S3にツイートデータをPUTすると各AWS Lambdaが順番に起動していくような仕組みです。

形態素解析をする際の前処理に使用しているワードや、ストップワードについてはAmazon DynamoDBを使用しています。
またTwitterにPostするときにAPIキーを使用しているため、その情報はAWS Secrets Managerから取得するようにしています。

AWS Lambdaが複数存在しているのは以下理由のためです。

  • 不具合や新機能を追加するときに機能単位でテストを行いたかったので、疎結合にしたかった。
  • AWS Lambda実行時のデプロパッケージのサイズ上限250MBを超過する。

AWS Lambda の制限

そもそもなぜこのような構築を作ろうと思ったのか

愛です!

「アイカツ!」シリーズが好きな人も、「アイカツ!」シリーズにかかわる発表や、アニメの放送日などで周囲がどのような感想を抱いているのか気になるのではと思ったのがきっかけです。
私が気になったというのもあります。

モチベーションは完全に上の内容に完結しますが、技術的な側面ではサーバレスな構築の練習を目的としています。
AWS Lambdaにハードコードすることも可能でしたが拡張性や、触れたことがなかったAWSの機能を触ってみるという意図もあります。

どうせやるなら自分が一番好きな題材を使いたいし、『アイカツ!』に関連したデータ収集をしたかったからでもあります。

作業時間は1日2時間程度、夜寝る前に少しずつ構築していました。

面白い結果だなと思ったのは、『アイドルマスター ミリオンライブ!』のテレビアニメ化が発表された日の結果です。
脚本が『アイカツ!』をご担当された加藤陽一さんで、実質『アイカツ!』じゃない?だったときのワードクラウドが以下です。

信頼や期待などポジティブな感想をみんな思っているんだな、というのを強く感じることができて実装してよかったなと思いました。
(断崖絶壁登る曲があるので、実質「ミリシタ」も『アイカツ!』)

各実装について

実装についてはすべてPython3.6で実装しています。

形態素解析部

MeCabは他のライブラリのように pip install [ライブラリ] -t の後ZIPで固めてLambda Layersへ登録、のようなことができないため、事前にEC2でサーバーを作成し、その中でMeCabの環境をビルドしたものをZIP化する必要があります。

以下のサイトを参考にさせていただき環境を構築しました。

AWSのlambda上でMeCabを実行する (他のバイナリへも応用可)!!

Dockerで環境が作成できるようですが、私のプライベートPCではDocker環境の構築が難しい(Win10 HomeではHyper-Vが通常手順でインストールできない)ので、遠回りしてEC2でサーバーを立てて入れることにしました。

辞書はIPA辞書とユーザ辞書のみです。

拡張辞書のneologdはフルで入れるとサイズ上限250MBを超過してしまうという点と、おそらくユーザ辞書を充実したほうが理想とするワードが出るかなということで今回は見送ることにしました。

ユーザ辞書は、以下のような「アイカツ!」シリーズ関連のワードを登録しています。

  • キャラクター名や「スターライト学園」などの舞台、それぞれのアイドルが着るドレスのブランド名、「芸能人はカードが命」などの有名なセリフなど劇中で登場するワード
  • 168曲分の曲名
  • コーデ名(カードテキスト)や「フィーバータイム」など「DCDアイカツ!」~「DCDアイカツオンパレード!」での用語やコーデ名
  • 「せなあか」のようなカップリングや「なまくら」などファン間での呼称
  • 声優や歌唱担当などのスタッフ・キャスト情報
  • その他、「アイカツ!」シリーズとセットで話されている「電音部」や「バトスピ」などのワード

実装よりもこのユーザ辞書の作成のほうがが大変でした。

実装

Amazon S3に対してファイルがPUT(POST)されたら起動します。

ユーザ辞書が更新されるたびにLambda Layersのバージョンが上がっていく状態なので、今後の改善要素です。

import MeCab
import boto3
import datetime as dt
import urllib.parse
import unicodedata
import re

def lambda_handler(event, context):
    AWS_S3_BUCKET_NAME = get_ssm_parameter_store('AIKATSU_TWEET_MORP_OUTPUT_BUCKET')
    output_datetime = dt.datetime.now().strftime('%Y%m%d')
    s3 = boto3.resource('s3')

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    bucket = s3.Bucket(bucket)

    # ローカルにファイルをダウンロードする
    file_path = '/tmp/' + key.split('/')[1]
    bucket.download_file(key, file_path) 

    # ツイートファイルを取得し、結合する
    with open(file_path, 'r') as f:
        lines = f.readlines()
    all_text = ' '.join([i.split()[2] for i in set(lines)])
    # 前処理としてbot系の不要な文章を除外する
    table = boto3.resource('dynamodb').Table('aikatsu_wordcloud_stopwords')
    tables = table.scan()
    preprocessing_words = '|'.join(set(tables['Items'][0]['preprocessing_words']))
     all_text = re.sub(preprocessing_words, '', all_text)
    # 全角半角の統一化
    all_text = unicodedata.normalize("NFKC", all_text)

    # MeCabの準備
    tagger = MeCab.Tagger()
    tagger.parse('')

    node = tagger.parseToNode(all_text)

    # 名詞を取り出す
    word_list = []
    while node:
        word_type = node.feature.split(',')[0]
        #print(node.feature)
        if word_type == '名詞':
            # 草は事前に取り除く
            if not node.surface.startswith('w'):
                word_list.append(node.surface)
        node = node.next

    # S3にアップロード
    file_key = '{0}/{0}_{1}.tsv'.format(output_datetime, 'mecab')  
    r = s3.Object(AWS_S3_BUCKET_NAME, file_key).put(Body = ' '.join(word_list))

    return True

# SSMのパラメータストアの取得
def get_ssm_parameter_store(param, WithDecryption=True):

    ssm_client = boto3.client('ssm')
    res = ssm_client.get_parameters(Names=[param], WithDecryption=WithDecryption)
    return res['Parameters'][0]['Value']

ここでTwitterのツイート内容を前処理として、全角半角の変換と、bot系の定型文を除外する対応を行っています。
bot系の定型文はAmazon DynamoDBにワードをListで保持しておき、正規表現で一気に書き換えるようにしています。

あまりハードコードはやりたくなかったのですが、後続のストップワードではインターネットスラングの草(w)を取り除ききる難易度が高かったため、特例としてここで取り除くようにしています。

ワードクラウド作成部

実装

サフィックスにtsvがついたものがPUT(POST)されたら起動します。

ワードクラウドを作成するにあたり、wordcloudのライブラリをLambda Layersに追加しています。

WordCloud for Python documentation

import boto3
import numpy as np
from wordcloud import WordCloud, STOPWORDS
from PIL import Image

def lambda_handler(event, context):

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']   
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket)

    # WordCloudに使用する画像を取得
    img_file_path = '/tmp/aikatsu.png'
    bucket.download_file('aikatsu.png', img_file_path) 
    # フォントを取得
    font_file_path = '/tmp/TakaoPGothic.ttf'
    bucket.download_file('TakaoPGothic.ttf', font_file_path)

    # 形態素解析したファイルを取得
    file_path = '/tmp/' + key.split('/')[1]
    bucket.download_file(key, file_path) 
    # ファイルを結合する
    with open(file_path, 'r') as f:
        lines = f.readlines()
    all_text = ' '.join(lines)

    # ストップワードの取得
    table = boto3.resource('dynamodb').Table('aikatsu_wordcloud_stopwords')
    tables = table.scan()

    # デフォルトで2文字以上しか出ないのだが、regexpを設定して1文字も出力するようにしている
    wc = WordCloud(background_color="white", 
                   width=900, 
                   height=500, 
                   mask=np.array(Image.open(img_file_path)), 
                   stopwords=set(tables['Items'][1]['words']), 
                   font_path=font_file_path, 
                   collocations = False,
                   regexp="[\w']+").generate(all_text)
    wc.to_file("/tmp/wordcloud_tmp.jpg")

    bucket.upload_file("/tmp/wordcloud_tmp.jpg", 'aikatsu_tweet_wordcloud.jpg')

    # 処理したファイルは不要なので削除
    boto3.client('s3').delete_object(Bucket=bucket.name, Key=key)

    return True

ワードクラウドはAmazon S3上にスターライト学園の校章をクレーアウトした画像とフォントを事前に格納しています。
ストップワードはAmazon DynamoDBでListで保持しています。

最後にワードクラウドのインプット情報として使用していた形態素解析処理のtsvファイルを削除しています。
蓄積しておいても単体では意味がないのと、上手く処理できない場合、Twitterのツイートデータが存在していればそこからやり直せばよいという考え方です。

ツイートのPOST部

実装

サフィックスにjpgがついたものがPUT(POST)されたら起動します。

Twitterへの接続を行うため、requests-oauthlibをLambda Layersに追加しています。

https://pypi.org/project/requests-oauthlib/

import ast
import json
import boto3
import base64
from botocore.exceptions import ClientError
from requests_oauthlib import OAuth1Session


def get_apikey():
    session = boto3.session.Session()
    client = session.client(
        service_name = 'secretsmanager',
        region_name = 'ap-northeast-1'
    )

    try:
        get_secret_value_response = client.get_secret_value(SecretId='AIKATSU_TWEET')
    except ClientError as e:
        raise e
    else:
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']
        else:
            secret = base64.b64decode(get_secret_value_response['SecretBinary'])
    return secret


def get_tweet_user_num(s3):
    AWS_S3_ANALYICS_BUCKET_NAME = get_ssm_parameter_store('AIKATSU_ANALYTICS_OUTPUT_BUCKET')
    bucket = s3.Bucket(AWS_S3_ANALYICS_BUCKET_NAME)
    output_file_name = 'aikatsu_tweet_num.tsv'
    file_path = '/tmp/' + output_file_name
    # ローカルにファイルをダウンロードする
    bucket.download_file(output_file_name, file_path) 

    with open(file_path, 'r') as f:
        lines = f.readlines()

    return str(lines[-1].split()[1])


def lambda_handler(event, context):
    # 文字列型から辞書型に変換
    secret = ast.literal_eval(get_apikey())
    # ツイートユーザ数を取得する
    s3 = boto3.resource('s3')
    tweet_user_num = get_tweet_user_num(s3)

    twitter = OAuth1Session(
        secret['API_KEY'],
        secret['API_SECRET_KEY'],
        secret['ACCESS_TOKEN'],
        secret['ACCESS_TOKEN_SECRET']
    )

    url_update = 'https://api.twitter.com/1.1/statuses/update.json'
    url_media = 'https://upload.twitter.com/1.1/media/upload.json'

    # 今後収集とツイートのタイミングをずらすかもしれないので直接指定
    AWS_S3_BUCKET_NAME = get_ssm_parameter_store('AIKATSU_TWEET_MORP_OUTPUT_BUCKET')
    file = 'aikatsu_tweet_wordcloud.jpg'

    bucket = s3.Bucket(AWS_S3_BUCKET_NAME)

    file_path = '/tmp/' + file
    bucket.download_file(file, file_path) 

    param = { "media" : open(file_path, "rb").read() }
    res = twitter.post(url_media, files = param)

    if res.status_code == 200 :
        requestJson = json.loads(res.text)
        mediaId = requestJson["media_id"]
        params = { "status" : "昨日はアイカツ!ついて{}人がツイートしていました。こんなワードがホットだったようです #aikatsu".format(tweet_user_num), "media_ids" : (mediaId) }
        res = twitter.post(url_update, params = params)
        if res.status_code == 200 :
            pass
    else:
        print('error:' + str(res.status_code))

    return True

# SSMのパラメータストアの取得
def get_ssm_parameter_store(param, WithDecryption=True):

    ssm_client = boto3.client('ssm')
    res = ssm_client.get_parameters(Names=[param], WithDecryption=WithDecryption)
    return res['Parameters'][0]['Value']

最後にTwitterへのPostですが、以前にAWS Secrets Managerに関して記載しているのでそちらを見ていただければと思います。

PythonでAWS Secrets ManagerからAPIキーを取得するときのちょっとしたポイント

最後に

実装としては正常終了すること前提でエラーが発生した場合の考慮がされていなかったり、課題も残っている状況となります。
ただDone is better than perfectという形で一旦ゴールとしては動くものができたというところでOKかなと思っています。

今回はAmazon S3にPUTされたことをトリガーとしていますが、AWS Step Functionsでワークフロー化したりまだ練習できることもあるので少しずつリファクタリングして改善していければよいかなと思います。