GCPのサービスを活用してSlackのやりとりを分析してみよう
CX事業本部@大阪の岩田です。以前「AWSのサービスを活用してSlackのやりとりを分析してみよう」というテーマでLTさせて頂いたことがあるのですが、GCPでも似たようなことができないか挑戦してみました。
構成
今回構築する環境です。
Cloud Scheduler → Cloud Pub/Sub → Cloud Functionsという流れでSlackの会話履歴をスクレイピングするプログラムを起動し、指定されたチャンネルの前日分の会話履歴を収集。収集したデータをJSON Lines形式でCloud Storageに保存し、Big Queryの外部テーブル経由で分析するという構成です。
環境構築
さっそく順を追って環境を構築していきましょう
Secret ManagerにSlackのトークンを保存
Cloud FunctionsからSlackのAPIを呼び出すためにSlackのトークンが必要になります。トークンをセキュアに管理するためSecret Managerのシークレットを作成し、Slackのトークンを保存しましょう。
まずはCloud SDKを利用してSLACK_TOKEN
という名前でシークレットを作成します。
gcloud secrets create SLACK_TOKEN
続いてシークレットバージョンを作成します。Slackから発行したトークンを事前にslack_token.txt
というテキストファイルに保存していることが前提になります。※Slackのトークン発行手順は割愛します。
gcloud secrets versions add SLACK_TOKEN --data-file=slack_token.txt
Cloud Functionsからシークレットが読み込めるようにCloud Functionsが利用するデフォルトのサービスアカウントにsecretAccessorのロールをバインドします
gcloud secrets add-iam-policy-binding SLACK_TOKEN --member=serviceAccount:<プロジェクトID>@appspot.gserviceaccount.com --role=roles/secretmanager.secretAccessor
Cloud Functionsのソースコードを準備
続いて定期実行するCloud Functionsのコードを実装します。Python3.9で実装しています。 ※異常系の考慮は実装していません。もしコードを流用される際は適宜エラーチェック等追加で実装して下さい
from datetime import datetime, timezone, timedelta import os import json from google.cloud import secretmanager, storage from slack_sdk import WebClient SLACK_CHANNEL_ID = os.environ['SLACK_CHANNEL_ID'] BUCKET_NAME = os.environ['BUCKET_NAME'] PROJECT_ID = os.environ['PROJECT_ID'] SECRET_VERSION = os.environ['SECRET_VERSION'] sm_client = secretmanager.SecretManagerServiceClient() secret_name = f"projects/{PROJECT_ID}/secrets/SLACK_TOKEN/versions/{SECRET_VERSION}" secret_res = sm_client.access_secret_version(request={"name": secret_name}) slack_token = secret_res.payload.data.decode("UTF-8") def handler(event, conttext): JST = timezone(timedelta(hours=+9), 'JST') today = datetime.now(JST).replace(hour=0, minute=0, second=0, microsecond=0) yesterday = today - timedelta(days=1) unix_yesterday_start = int(yesterday.timestamp()) unix_today_start = int(today.timestamp()) slack_client = WebClient(slack_token) latest = unix_today_start slack_messages = [] while True: res = slack_client.conversations_history( channel=SLACK_CHANNEL_ID, latest=latest, inclusive='false', oldest=unix_yesterday_start) slack_messages.extend( [msg for msg in res.data['messages'] if msg['text'] != ''] ) if res.data['has_more'] == False: break print('slack channel has more history continue scrape...') latest = int(float(res.data['messages'][-1]['ts'])) if len(slack_messages) == 0: print('no messages... skip upload file') return upload_file(yesterday, slack_messages) def upload_file(yesterday, slack_messages): storage_client = storage.Client() bucket = storage_client.bucket(BUCKET_NAME) prefix = yesterday.strftime('slack_files/year=%Y/month=%m/day=%d/') blob = bucket.blob(f'{prefix}slack.json') blob.upload_from_string( '\n'.join([json.dumps(msg) for msg in slack_messages]) )
ざっくり以下のような処理を実行しています
- Secret ManagerからSlackのトークンを取得
- Slackの
conversations.history
をAPIを呼び出し、Slackの会話履歴を収集- ※
conversations.history
だけだとスレッド内の会話履歴までは取得できないようなので、その点はご注意下さい。
- ※
- 収集した会話履歴を
slack_files/year=YYYY/month=mm/day=dd/slack.json
というオブジェクト名でCloud Storageにアップロード
Cloud Functionsと関連するリソースのデプロイ
ソースコードが準備できたので、Cloud Functionsと関連するリソースをDeployment Mangerでまとめてデプロイします。ディレクトリの構成は以下のような構成です。
. ├── src │ ├── main.py │ └── requirements.txt └── templates ├── cloud-functions.py ├── resources.yml ├── schedule.py └── storage.py
src
ディレクトリ配下は ZIPにまとめてCloud Functionsのパッケージとしてデプロイします。main.py
は先程のPythonのコードでrequirements.txt
はCloud Functionsの実行環境にライブラリをインストールするために利用します。requirements.txt
の中身は以下の通りです
google-cloud-storage google-cloud-secret-manager slack_sdk
templates
ディレクトリ配下はDeployment Managerの構成ファイルとテンプレートファイルです。
まずメインの構成ファイルは以下の通り
imports: - path: ../src/main.py - path: ../src/requirements.txt - path: cloud-functions.py - path: storage.py - path: schedule.py resources: - type: cloud-functions.py name: slack-scrape properties: function: slack-scrape codeLocation: ../src/ codeBucket: <ソースコードのデプロイに利用できる適当なバケット名> location: asia-northeast1 timeout: 60s runtime: python39 memory: 256 entryPoint: handler environmentVariables: SLACK_CHANNEL_ID: <会話履歴収集対象とするSlackのチャンネルID> BUCKET_NAME: $(ref.storage-bucket.name) SECRET_VERSION: <Slackのトークンを保存したSecrets Managerのシークレットバージョン> topicName: $(ref.pubsubtopic.name) metadata: dependsOn: - pubsubtopic - type: pubsub.v1.topic name: pubsubtopic properties: topic: slack-scrape - type: storage.py name: storage-bucket properties: projection: full location: asia-northeast1 locationType: region storageClass: STANDARD iamConfiguration: bucketPolicyOnly: enabled: true - type: schedule.py name: slack-scrape-schedule properties: schedule: "0 1 * * *" timeZone: "Asia/Tokyo" topicName: $(ref.pubsubtopic.name) metadata: dependsOn: - pubsubtopic
Cloud Functions用のテンプレートファイルです。以下ブログの内容を少し加工しています。
import zipfile import io import base64 import hashlib def generate_config(context): in_memory_output_file = io.BytesIO() with zipfile.ZipFile(in_memory_output_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file: for imp in context.imports: if imp.startswith(context.properties['codeLocation']): zip_file.writestr( imp[len(context.properties['codeLocation']):], context.imports[imp]) content = base64.b64encode(in_memory_output_file.getvalue()) m = hashlib.md5() m.update(content) source_archive_url = 'gs://%s/%s' % ( context.properties['codeBucket'], m.hexdigest() + '.zip') cmd = f"echo '{content.decode('ascii')}' | base64 -d > /function/function.zip;" volumes = [{'name': 'function-code', 'path': '/function'}] upload_function_code = { 'type': 'gcp-types/cloudbuild-v1:cloudbuild.projects.builds.create', 'name': context.properties['function'] + '-upload-function-code', 'properties': { 'steps': [{ 'name': 'ubuntu', 'args': ['bash', '-c', cmd], 'volumes': volumes, }, { 'name': 'gcr.io/cloud-builders/gsutil', 'args': ['cp', '/function/function.zip', source_archive_url], 'volumes': volumes }], 'timeout': '120s' } } env_vars = { 'PROJECT_ID': context.env['project'] } env_vars.update(context.properties['environmentVariables']) cloud_function = { 'type': 'gcp-types/cloudfunctions-v1:projects.locations.functions', 'name': context.properties['function'] + '-cloud-function', 'properties': { 'parent': f"projects/{context.env['project']}/locations/{context.properties['location']}", 'function': context.properties['function'], 'sourceArchiveUrl': source_archive_url, 'entryPoint': context.properties['entryPoint'], 'eventTrigger': { 'resource': context.properties['topicName'], 'eventType': 'providers/cloud.pubsub/eventTypes/topic.publish' }, 'timeout': context.properties['timeout'], 'availableMemoryMb': context.properties['memory'], 'runtime': context.properties['runtime'], 'environmentVariables': env_vars }, 'metadata': { 'dependsOn': [ context.properties['function'] + '-upload-function-code' ] } } return { 'resources': [upload_function_code, cloud_function] }
続いてCloud Storage用のテンプレートファイルです。GitHubのリポジトリで公開されている以下テンプレートファイルのバケット名の命名規約だけ少し加工したものです。
# Copyright 2018 Google Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ This template creates a Google Cloud Storage bucket. """ from hashlib import sha1 def generate_config(context): """ Entry point for the deployment resources. """ resources = [] properties = context.properties project_id = properties.get('project', context.env['project']) bucket_name = properties.get('name', '{}-{}-{}'.format( context.env['deployment'], context.env['name'], context.env['project_number']) ) # output variables bucket_selflink = '$(ref.{}.selfLink)'.format(context.env['name']) bucket_uri = 'gs://' + bucket_name + '/' bucket = { 'name': context.env['name'], # https://cloud.google.com/storage/docs/json_api/v1/buckets 'type': 'gcp-types/storage-v1:buckets', 'properties': { 'project': project_id, 'name': bucket_name } } requesterPays = context.properties.get('requesterPays') if requesterPays is not None: bucket['properties']['billing'] = {'requesterPays': requesterPays} optional_props = [ 'acl', 'iamConfiguration', 'retentionPolicy', 'encryption', 'defaultEventBasedHold', 'cors', 'defaultObjectAcl', 'billing', 'location', 'versioning', 'storageClass', 'predefinedAcl', 'predefinedDefaultObjectAcl', 'logging', 'lifecycle', 'labels', 'website' ] for prop in optional_props: if prop in properties: bucket['properties'][prop] = properties[prop] if not properties.get('iamConfiguration', {}).get('bucketPolicyOnly', {}).get('enabled', False): if 'predefinedAcl' not in bucket['properties']: bucket['properties']['predefinedAcl'] = 'private' if 'predefinedDefaultObjectAcl' not in bucket['properties']: bucket['properties']['predefinedDefaultObjectAcl'] = 'private' resources.append(bucket) # If IAM policy bindings are defined, apply these bindings. storage_provider_type = 'gcp-types/storage-v1:virtual.buckets.iamMemberBinding' bindings = properties.get('bindings', []) if 'dependsOn' in properties: dependson = { 'metadata': { 'dependsOn': properties['dependsOn'] } } dependson_root = properties['dependsOn'] else: dependson = {} dependson_root = [] if bindings: for role in bindings: for member in role['members']: suffix = sha1('{}-{}'.format(role['role'], member).encode('utf-8')).hexdigest()[:10] policy_get_name = '{}-{}'.format(context.env['name'], suffix) policy_name = '{}-iampolicy'.format(policy_get_name) iam_policy_resource = { 'name': policy_name, # TODO - Virtual type documentation needed 'type': (storage_provider_type), 'properties': { 'bucket': '$(ref.{}.name)'.format(context.env['name']), 'role': role['role'], 'member': member, } } iam_policy_resource.update(dependson) resources.append(iam_policy_resource) dependson = { 'metadata': { 'dependsOn': [policy_name] + dependson_root } } if properties.get('billing', {}).get('requesterPays'): for resource in resources: resource['properties']['userProject'] = properties.get('userProject', context.env['project']) return { 'resources': resources, 'outputs': [ { 'name': 'name', 'value': bucket_name }, { 'name': 'selfLink', 'value': bucket_selflink }, { 'name': 'url', 'value': bucket_uri } ] }
最後にCloud Scheduler用のテンプレートファイルです
def generate_config(context): resources = [] properties = context.properties project_id = context.env['project'] region = properties.get('region', 'asia-northeast1') topic_name = properties.get('topicName') description = properties.get('description', '') name = properties.get('name', context.env['name']) pubsub_target = { 'topicName': topic_name, 'data': 'e30=' } time_zone = 'Asia/Tokyo' schedule = '0 1 * * *' scheduler = { 'name': context.env['name'], 'type': 'gcp-types/cloudscheduler-v1:projects.locations.jobs', 'properties': { 'parent': f'projects/{project_id}/locations/{region}', 'name': f'{name}', 'description': description, 'schedule': schedule, 'timeZone': time_zone, 'pubsubTarget': pubsub_target } } if 'dependsOn' in properties: dependson = { 'metadata': { 'dependsOn': properties['dependsOn'] } } scheduler.update(dependson) return { 'resources': [scheduler] }
テンプレート類の準備ができたので、Deployment Managerを利用してデプロイします
gcloud deployment-manager deployments create slack-analytics --config resources.yml
こんな感じでcompleted successfully.
と表示されればOKです
The fingerprint of the deployment is b'dify4zw0LApNDb_G6ffBuA==' Waiting for create [operation-1629887189548-5ca5faef4ff7d-6ade04b9-0aa08d36]...done. Create operation operation-1629887189548-5ca5faef4ff7d-6ade04b9-0aa08d36 completed successfully. NAME TYPE STATE ERRORS INTENT pubsubtopic pubsub.v1.topic COMPLETED [] slack-scrape-cloud-function gcp-types/cloudfunctions-v1:projects.locations.functions COMPLETED [] slack-scrape-schedule gcp-types/cloudscheduler-v1:projects.locations.jobs COMPLETED [] slack-scrape-upload-function-code gcp-types/cloudbuild-v1:cloudbuild.projects.builds.create COMPLETED [] storage-bucket gcp-types/storage-v1:buckets COMPLETED []
ジョブの手動実行
このあとBigQueryの外部テーブルを作成するのですが、Cloud Storageバケットにファイルが存在しない状態だと外部テーブルの作成が失敗してしまいます。Cloud Schedulerのジョブを手動実行して前日分の会話履歴を収集してCloud Storageバケットにアップロードするしておきます。
gcloud scheduler jobs run slack-scrape-schedule
ジョブを実行したらCloud Storageバケットの中身を確認しておきましょう
gsutil ls gs://<デプロイされたCloud Storageバケット>/ gs://<デプロイされたCloud Storageバケット>/slack_files/
BigQuery関連リソースのデプロイ
続いてBiqQuery関連のリソースもDeployment Managerでまとめてデプロイします。先程のtemplates
ディレクトリにbigquery.py
という名前でテンプレートファイルを作成します。
def generate_config(context): resources = [] properties = context.properties bucket_name = properties['bucketName'] project_id = context.env['project'] storage_prefix = f'gs://{bucket_name}/slack_files' dataset_id = 'slack_analytics_ds' dataset_name = 'bq-dataset' dataset = { 'name': dataset_name, 'type': 'bigquery.v2.dataset', 'properties': { 'datasetReference': { 'datasetId': dataset_id }, 'location': 'asia-northeast1' } } table = { 'name': 'bq-table', 'type': 'bigquery.v2.table', 'properties': { 'externalDataConfiguration': { 'sourceUris': [ f'{storage_prefix}*' ], 'sourceFormat': 'NEWLINE_DELIMITED_JSON', 'autodetect': True, 'hivePartitioningOptions': { 'mode': 'AUTO', 'sourceUriPrefix': storage_prefix } }, 'tableReference': { 'projectId': project_id, 'tableId': 'slack-analytics-bucket', 'datasetId': dataset_id } }, 'metadata': { 'dependsOn': [ dataset_name ] } } return { 'resources': [dataset, table] }
構成ファイルは使わずにテンプレートファイルだけでデプロイします
gcloud deployment-manager deployments create bigquery --template=bigquery.py --properties bucketName:<デプロイされたCloud Storageバケット> The fingerprint of the deployment is b'smNEzqfkrTHSOUKjKsuOiw==' Waiting for create [operation-1629892183402-5ca60d89d2641-86474ff3-70f465ce]...done. Create operation operation-1629892183402-5ca60d89d2641-86474ff3-70f465ce completed successfully. NAME TYPE STATE ERRORS INTENT bq-dataset bigquery.v2.dataset COMPLETED [] bq-table bigquery.v2.table COMPLETED []
completed successfully.
と出力されればデプロイ成功です
BigQueryで分析してみる
これで分析の足回りが整ったので、実際にCloud Storageバケットに溜めたSlackの会話履歴を分析してみます。ここでは以下のSQLで対象チャンネルでリアクションに利用されている人気の絵文字TOP5でも分析してみましょう。
SELECT react.name, SUM(react.count) FROM `<プロジェクト名>.slack_analytics_ds.slack-analytics-bucket` CROSS JOIN UNNEST(reactions) AS react GROUP BY react.name ORDER BY SUM(react.count) DESC LIMIT 5
結果は以下のようになりました。
- 1位...
:kawaii:
- 2位...
:kusa:
- 3位...
:eraizo:
- 4位...
:aa:
- 5位...
:wakaru:
まだ収集済みのデータが少ないので実際の人気具合を反映できているかは微妙ですが、とりあえずSlackのやりとりをBigQueryで分析するという目的は達成できました。
まとめ
GCPでもSlackの会話履歴を分析するためのサーバーレスなデータ分析基盤が作れることが確認できました。実際に自分で環境構築することでAWS・GCPそれぞれの特徴がより見えて来た気がします。AWS上に構築したシステムをGCP上に再構築するというのは学習のための良いテーマになりそうなので、今後も機会を見つけて挑戦していきたいと思います。