GCPのサービスを活用してSlackのやりとりを分析してみよう

GCPのサービスを活用したサーバーレスなデータ分析基盤を構築してみました
2021.08.26

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

CX事業本部@大阪の岩田です。以前「AWSのサービスを活用してSlackのやりとりを分析してみよう」というテーマでLTさせて頂いたことがあるのですが、GCPでも似たようなことができないか挑戦してみました。

構成

今回構築する環境です。

Cloud Scheduler → Cloud Pub/Sub → Cloud Functionsという流れでSlackの会話履歴をスクレイピングするプログラムを起動し、指定されたチャンネルの前日分の会話履歴を収集。収集したデータをJSON Lines形式でCloud Storageに保存し、Big Queryの外部テーブル経由で分析するという構成です。

環境構築

さっそく順を追って環境を構築していきましょう

Secret ManagerにSlackのトークンを保存

Cloud FunctionsからSlackのAPIを呼び出すためにSlackのトークンが必要になります。トークンをセキュアに管理するためSecret Managerのシークレットを作成し、Slackのトークンを保存しましょう。

まずはCloud SDKを利用してSLACK_TOKENという名前でシークレットを作成します。

gcloud secrets create SLACK_TOKEN

続いてシークレットバージョンを作成します。Slackから発行したトークンを事前にslack_token.txtというテキストファイルに保存していることが前提になります。※Slackのトークン発行手順は割愛します。

gcloud secrets versions add  SLACK_TOKEN --data-file=slack_token.txt

Cloud Functionsからシークレットが読み込めるようにCloud Functionsが利用するデフォルトのサービスアカウントにsecretAccessorのロールをバインドします

gcloud secrets add-iam-policy-binding SLACK_TOKEN --member=serviceAccount:<プロジェクトID>@appspot.gserviceaccount.com  --role=roles/secretmanager.secretAccessor

Cloud Functionsのソースコードを準備

続いて定期実行するCloud Functionsのコードを実装します。Python3.9で実装しています。 ※異常系の考慮は実装していません。もしコードを流用される際は適宜エラーチェック等追加で実装して下さい

from datetime import datetime, timezone, timedelta
import os
import json

from google.cloud import secretmanager, storage
from slack_sdk import WebClient

SLACK_CHANNEL_ID = os.environ['SLACK_CHANNEL_ID']
BUCKET_NAME = os.environ['BUCKET_NAME']
PROJECT_ID = os.environ['PROJECT_ID']
SECRET_VERSION = os.environ['SECRET_VERSION']

sm_client = secretmanager.SecretManagerServiceClient()
secret_name = f"projects/{PROJECT_ID}/secrets/SLACK_TOKEN/versions/{SECRET_VERSION}"
secret_res = sm_client.access_secret_version(request={"name": secret_name})
slack_token = secret_res.payload.data.decode("UTF-8")

def handler(event, conttext):


    JST = timezone(timedelta(hours=+9), 'JST')
    today = datetime.now(JST).replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday = today - timedelta(days=1)
    unix_yesterday_start = int(yesterday.timestamp())
    unix_today_start = int(today.timestamp())
    slack_client = WebClient(slack_token)
    latest = unix_today_start
    slack_messages = [] 

    while True:
        res = slack_client.conversations_history(
            channel=SLACK_CHANNEL_ID, latest=latest, inclusive='false',
            oldest=unix_yesterday_start)
        slack_messages.extend(
            [msg for msg in res.data['messages'] if msg['text'] != '']
        )

        if res.data['has_more'] == False:
            break

        print('slack channel has more history continue scrape...')
        latest = int(float(res.data['messages'][-1]['ts']))

    if len(slack_messages) == 0:
        print('no messages... skip upload file')
        return

    upload_file(yesterday, slack_messages)


def upload_file(yesterday, slack_messages):
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(BUCKET_NAME)
    prefix = yesterday.strftime('slack_files/year=%Y/month=%m/day=%d/')
    blob = bucket.blob(f'{prefix}slack.json')
    blob.upload_from_string(
        '\n'.join([json.dumps(msg) for msg in slack_messages])
    )

ざっくり以下のような処理を実行しています

  • Secret ManagerからSlackのトークンを取得
  • Slackの conversations.history をAPIを呼び出し、Slackの会話履歴を収集
    • conversations.history だけだとスレッド内の会話履歴までは取得できないようなので、その点はご注意下さい。
  • 収集した会話履歴をslack_files/year=YYYY/month=mm/day=dd/slack.jsonというオブジェクト名でCloud Storageにアップロード

Cloud Functionsと関連するリソースのデプロイ

ソースコードが準備できたので、Cloud Functionsと関連するリソースをDeployment Mangerでまとめてデプロイします。ディレクトリの構成は以下のような構成です。

.
├── src
│   ├── main.py
│   └── requirements.txt
└── templates
    ├── cloud-functions.py
    ├── resources.yml
    ├── schedule.py
    └── storage.py

srcディレクトリ配下は ZIPにまとめてCloud Functionsのパッケージとしてデプロイします。main.pyは先程のPythonのコードでrequirements.txtはCloud Functionsの実行環境にライブラリをインストールするために利用します。requirements.txtの中身は以下の通りです

google-cloud-storage
google-cloud-secret-manager
slack_sdk

templatesディレクトリ配下はDeployment Managerの構成ファイルとテンプレートファイルです。

まずメインの構成ファイルは以下の通り

imports:
  - path: ../src/main.py
  - path: ../src/requirements.txt
  - path: cloud-functions.py
  - path: storage.py
  - path: schedule.py

resources:
  - type: cloud-functions.py
    name: slack-scrape
    properties:
      function: slack-scrape
      codeLocation: ../src/
      codeBucket: <ソースコードのデプロイに利用できる適当なバケット名>
      location: asia-northeast1
      timeout: 60s
      runtime: python39
      memory: 256
      entryPoint: handler
      environmentVariables:
        SLACK_CHANNEL_ID: <会話履歴収集対象とするSlackのチャンネルID>
        BUCKET_NAME: $(ref.storage-bucket.name)
        SECRET_VERSION: <Slackのトークンを保存したSecrets Managerのシークレットバージョン>
      topicName: $(ref.pubsubtopic.name) 
    metadata:
      dependsOn:
        - pubsubtopic
  - type: pubsub.v1.topic
    name: pubsubtopic
    properties:
      topic: slack-scrape
  - type: storage.py
    name: storage-bucket
    properties:
      projection: full
      location: asia-northeast1
      locationType: region
      storageClass: STANDARD
      iamConfiguration:
        bucketPolicyOnly:
          enabled: true
  - type: schedule.py
    name: slack-scrape-schedule
    properties:
      schedule: "0 1 * * *"
      timeZone: "Asia/Tokyo"
      topicName: $(ref.pubsubtopic.name)
    metadata:
      dependsOn:
        - pubsubtopic

Cloud Functions用のテンプレートファイルです。以下ブログの内容を少し加工しています。

import zipfile
import io
import base64
import hashlib


def generate_config(context):
    in_memory_output_file = io.BytesIO()

    with zipfile.ZipFile(in_memory_output_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
        for imp in context.imports:
            if imp.startswith(context.properties['codeLocation']):
                zip_file.writestr(
                    imp[len(context.properties['codeLocation']):], context.imports[imp])

    content = base64.b64encode(in_memory_output_file.getvalue())

    m = hashlib.md5()
    m.update(content)

    source_archive_url = 'gs://%s/%s' % (
        context.properties['codeBucket'], m.hexdigest() + '.zip')

    cmd = f"echo '{content.decode('ascii')}' | base64 -d > /function/function.zip;"


    volumes = [{'name': 'function-code', 'path': '/function'}]

    upload_function_code = {
        'type': 'gcp-types/cloudbuild-v1:cloudbuild.projects.builds.create',
        'name': context.properties['function'] + '-upload-function-code',
        'properties': {
            'steps': [{
                'name': 'ubuntu',
                'args': ['bash', '-c', cmd],
                'volumes': volumes,
            }, {
                'name': 'gcr.io/cloud-builders/gsutil',
                'args': ['cp', '/function/function.zip', source_archive_url],
                'volumes': volumes
            }],
            'timeout': '120s'
        }
    }


    env_vars = {
        'PROJECT_ID': context.env['project']
    }    
    env_vars.update(context.properties['environmentVariables'])

    cloud_function = {
        'type': 'gcp-types/cloudfunctions-v1:projects.locations.functions',
        'name': context.properties['function'] + '-cloud-function',
        'properties': {
            'parent': f"projects/{context.env['project']}/locations/{context.properties['location']}",
            'function': context.properties['function'],
            'sourceArchiveUrl': source_archive_url,
            'entryPoint': context.properties['entryPoint'],
            'eventTrigger': {
                'resource': context.properties['topicName'],
                'eventType': 'providers/cloud.pubsub/eventTypes/topic.publish'
            },
            'timeout': context.properties['timeout'],
            'availableMemoryMb': context.properties['memory'],
            'runtime': context.properties['runtime'],
            'environmentVariables': env_vars
        },
        'metadata': {
            'dependsOn': [
                context.properties['function'] + '-upload-function-code'
            ]
        }
    }

    return {
        'resources': [upload_function_code, cloud_function]
    }

続いてCloud Storage用のテンプレートファイルです。GitHubのリポジトリで公開されている以下テンプレートファイルのバケット名の命名規約だけ少し加工したものです。

https://github.com/GoogleCloudPlatform/cloud-foundation-toolkit/blob/93d1b9625e0e225824838e0eb09653a6d7944cc6/dm/templates/gcs_bucket/gcs_bucket.py

# Copyright 2018 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" This template creates a Google Cloud Storage bucket. """

from hashlib import sha1


def generate_config(context):
    """ Entry point for the deployment resources. """

    resources = []
    properties = context.properties
    project_id = properties.get('project', context.env['project'])
    bucket_name = properties.get('name',
        '{}-{}-{}'.format(
            context.env['deployment'],
            context.env['name'],
            context.env['project_number'])
        )

    # output variables
    bucket_selflink = '$(ref.{}.selfLink)'.format(context.env['name'])
    bucket_uri = 'gs://' + bucket_name + '/'

    bucket = {
        'name': context.env['name'],
        # https://cloud.google.com/storage/docs/json_api/v1/buckets
        'type': 'gcp-types/storage-v1:buckets',
        'properties': {
            'project': project_id,
            'name': bucket_name
        }
    }

    requesterPays = context.properties.get('requesterPays')
    if requesterPays is not None:
      bucket['properties']['billing'] = {'requesterPays': requesterPays}

    optional_props = [
        'acl',
        'iamConfiguration',
        'retentionPolicy',
        'encryption',
        'defaultEventBasedHold',
        'cors',
        'defaultObjectAcl',
        'billing',
        'location',
        'versioning',
        'storageClass',
        'predefinedAcl',
        'predefinedDefaultObjectAcl',
        'logging',
        'lifecycle',
        'labels',
        'website'
    ]

    for prop in optional_props:
        if prop in properties:
            bucket['properties'][prop] = properties[prop]

    if not properties.get('iamConfiguration', {}).get('bucketPolicyOnly', {}).get('enabled', False):
        if 'predefinedAcl' not in bucket['properties']:
            bucket['properties']['predefinedAcl'] = 'private'
        if 'predefinedDefaultObjectAcl' not in bucket['properties']:
            bucket['properties']['predefinedDefaultObjectAcl'] = 'private'

    resources.append(bucket)

    # If IAM policy bindings are defined, apply these bindings.
    storage_provider_type = 'gcp-types/storage-v1:virtual.buckets.iamMemberBinding'
    bindings = properties.get('bindings', [])

    if 'dependsOn' in properties:
        dependson = { 'metadata': { 'dependsOn': properties['dependsOn'] } }
        dependson_root = properties['dependsOn']
    else:
        dependson = {}
        dependson_root = []

    if bindings:
        for role in bindings:
            for member in role['members']:
                suffix = sha1('{}-{}'.format(role['role'], member).encode('utf-8')).hexdigest()[:10]
                policy_get_name = '{}-{}'.format(context.env['name'], suffix)
                policy_name = '{}-iampolicy'.format(policy_get_name)
                iam_policy_resource = {
                    'name': policy_name,
                    # TODO - Virtual type documentation needed
                    'type': (storage_provider_type),
                    'properties':
                        {
                            'bucket': '$(ref.{}.name)'.format(context.env['name']),
                            'role': role['role'],
                            'member': member,
                        }
                }
                iam_policy_resource.update(dependson)
                resources.append(iam_policy_resource)
                dependson = { 'metadata': { 'dependsOn': [policy_name] + dependson_root } }

    if properties.get('billing', {}).get('requesterPays'):
        for resource in resources:
            resource['properties']['userProject'] = properties.get('userProject', context.env['project'])

    return {
        'resources':
            resources,
        'outputs':
            [
                {
                    'name': 'name',
                    'value': bucket_name
                },
                {
                    'name': 'selfLink',
                    'value': bucket_selflink
                },
                {
                    'name': 'url',
                    'value': bucket_uri
                }
            ]
    }

最後にCloud Scheduler用のテンプレートファイルです

def generate_config(context):

    resources = []
    properties = context.properties
    project_id = context.env['project']

    region = properties.get('region', 'asia-northeast1')
    topic_name = properties.get('topicName')
    description = properties.get('description', '')
    name = properties.get('name', context.env['name'])

    pubsub_target = {
        'topicName': topic_name,
        'data': 'e30='
    }
    time_zone = 'Asia/Tokyo'
    schedule =  '0 1 * * *'

    scheduler = {
        'name': context.env['name'],
        'type': 'gcp-types/cloudscheduler-v1:projects.locations.jobs',
        'properties': {
            'parent': f'projects/{project_id}/locations/{region}',
            'name': f'{name}',
            'description': description,
            'schedule': schedule,
            'timeZone': time_zone,
            'pubsubTarget': pubsub_target
        }
    }

    if 'dependsOn' in properties:
        dependson = { 'metadata': { 'dependsOn': properties['dependsOn'] } }
        scheduler.update(dependson)

    return {
        'resources': [scheduler]
    }

テンプレート類の準備ができたので、Deployment Managerを利用してデプロイします

gcloud deployment-manager deployments create slack-analytics --config resources.yml

こんな感じでcompleted successfully.と表示されればOKです

The fingerprint of the deployment is b'dify4zw0LApNDb_G6ffBuA=='
Waiting for create [operation-1629887189548-5ca5faef4ff7d-6ade04b9-0aa08d36]...done.
Create operation operation-1629887189548-5ca5faef4ff7d-6ade04b9-0aa08d36 completed successfully.
NAME                               TYPE                                                       STATE      ERRORS  INTENT
pubsubtopic                        pubsub.v1.topic                                            COMPLETED  []
slack-scrape-cloud-function        gcp-types/cloudfunctions-v1:projects.locations.functions   COMPLETED  []
slack-scrape-schedule              gcp-types/cloudscheduler-v1:projects.locations.jobs        COMPLETED  []
slack-scrape-upload-function-code  gcp-types/cloudbuild-v1:cloudbuild.projects.builds.create  COMPLETED  []
storage-bucket                     gcp-types/storage-v1:buckets                               COMPLETED  []

ジョブの手動実行

このあとBigQueryの外部テーブルを作成するのですが、Cloud Storageバケットにファイルが存在しない状態だと外部テーブルの作成が失敗してしまいます。Cloud Schedulerのジョブを手動実行して前日分の会話履歴を収集してCloud Storageバケットにアップロードするしておきます。

gcloud scheduler jobs run slack-scrape-schedule

ジョブを実行したらCloud Storageバケットの中身を確認しておきましょう

gsutil ls gs://<デプロイされたCloud Storageバケット>/
gs://<デプロイされたCloud Storageバケット>/slack_files/

BigQuery関連リソースのデプロイ

続いてBiqQuery関連のリソースもDeployment Managerでまとめてデプロイします。先程のtemplatesディレクトリにbigquery.pyという名前でテンプレートファイルを作成します。

def generate_config(context):

    resources = []
    properties = context.properties
    bucket_name = properties['bucketName']
    project_id = context.env['project']

    storage_prefix = f'gs://{bucket_name}/slack_files'
    dataset_id = 'slack_analytics_ds'
    dataset_name = 'bq-dataset'

    dataset = {
        'name': dataset_name,
        'type': 'bigquery.v2.dataset',
        'properties': {
            'datasetReference': {
                'datasetId': dataset_id
            },
            'location': 'asia-northeast1'
        }
    }

    table = {
        'name': 'bq-table',
        'type': 'bigquery.v2.table',
        'properties': {
            'externalDataConfiguration': {
                'sourceUris': [
                    f'{storage_prefix}*'
                ],
                'sourceFormat': 'NEWLINE_DELIMITED_JSON',
                'autodetect': True,
                'hivePartitioningOptions': {
                    'mode': 'AUTO',
                    'sourceUriPrefix': storage_prefix
                }
            },
            'tableReference': {
                'projectId': project_id,
                'tableId': 'slack-analytics-bucket',
                'datasetId': dataset_id
            }
        },
        'metadata': {
            'dependsOn': [
                dataset_name
            ]
        }
    }


    return {
        'resources': [dataset, table]
    }

構成ファイルは使わずにテンプレートファイルだけでデプロイします

gcloud deployment-manager deployments create bigquery --template=bigquery.py --properties bucketName:<デプロイされたCloud Storageバケット>

The fingerprint of the deployment is b'smNEzqfkrTHSOUKjKsuOiw=='
Waiting for create [operation-1629892183402-5ca60d89d2641-86474ff3-70f465ce]...done.
Create operation operation-1629892183402-5ca60d89d2641-86474ff3-70f465ce completed successfully.
NAME        TYPE                 STATE      ERRORS  INTENT
bq-dataset  bigquery.v2.dataset  COMPLETED  []
bq-table    bigquery.v2.table    COMPLETED  []

completed successfully.と出力されればデプロイ成功です

BigQueryで分析してみる

これで分析の足回りが整ったので、実際にCloud Storageバケットに溜めたSlackの会話履歴を分析してみます。ここでは以下のSQLで対象チャンネルでリアクションに利用されている人気の絵文字TOP5でも分析してみましょう。

SELECT
    react.name,
    SUM(react.count)
FROM
    `<プロジェクト名>.slack_analytics_ds.slack-analytics-bucket`
CROSS JOIN UNNEST(reactions) AS react
GROUP BY
    react.name
ORDER BY
    SUM(react.count) DESC LIMIT 5

結果は以下のようになりました。

  • 1位... :kawaii:
  • 2位... :kusa:
  • 3位... :eraizo:
  • 4位... :aa:
  • 5位... :wakaru:

まだ収集済みのデータが少ないので実際の人気具合を反映できているかは微妙ですが、とりあえずSlackのやりとりをBigQueryで分析するという目的は達成できました。

まとめ

GCPでもSlackの会話履歴を分析するためのサーバーレスなデータ分析基盤が作れることが確認できました。実際に自分で環境構築することでAWS・GCPそれぞれの特徴がより見えて来た気がします。AWS上に構築したシステムをGCP上に再構築するというのは学習のための良いテーマになりそうなので、今後も機会を見つけて挑戦していきたいと思います。