この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
CX事業本部@大阪の岩田です。以前「AWSのサービスを活用してSlackのやりとりを分析してみよう」というテーマでLTさせて頂いたことがあるのですが、GCPでも似たようなことができないか挑戦してみました。
構成
今回構築する環境です。
Cloud Scheduler → Cloud Pub/Sub → Cloud Functionsという流れでSlackの会話履歴をスクレイピングするプログラムを起動し、指定されたチャンネルの前日分の会話履歴を収集。収集したデータをJSON Lines形式でCloud Storageに保存し、Big Queryの外部テーブル経由で分析するという構成です。
環境構築
さっそく順を追って環境を構築していきましょう
Secret ManagerにSlackのトークンを保存
Cloud FunctionsからSlackのAPIを呼び出すためにSlackのトークンが必要になります。トークンをセキュアに管理するためSecret Managerのシークレットを作成し、Slackのトークンを保存しましょう。
まずはCloud SDKを利用してSLACK_TOKEN
という名前でシークレットを作成します。
gcloud secrets create SLACK_TOKEN
続いてシークレットバージョンを作成します。Slackから発行したトークンを事前にslack_token.txt
というテキストファイルに保存していることが前提になります。※Slackのトークン発行手順は割愛します。
gcloud secrets versions add SLACK_TOKEN --data-file=slack_token.txt
Cloud Functionsからシークレットが読み込めるようにCloud Functionsが利用するデフォルトのサービスアカウントにsecretAccessorのロールをバインドします
gcloud secrets add-iam-policy-binding SLACK_TOKEN --member=serviceAccount:<プロジェクトID>@appspot.gserviceaccount.com --role=roles/secretmanager.secretAccessor
Cloud Functionsのソースコードを準備
続いて定期実行するCloud Functionsのコードを実装します。Python3.9で実装しています。 ※異常系の考慮は実装していません。もしコードを流用される際は適宜エラーチェック等追加で実装して下さい
from datetime import datetime, timezone, timedelta
import os
import json
from google.cloud import secretmanager, storage
from slack_sdk import WebClient
SLACK_CHANNEL_ID = os.environ['SLACK_CHANNEL_ID']
BUCKET_NAME = os.environ['BUCKET_NAME']
PROJECT_ID = os.environ['PROJECT_ID']
SECRET_VERSION = os.environ['SECRET_VERSION']
sm_client = secretmanager.SecretManagerServiceClient()
secret_name = f"projects/{PROJECT_ID}/secrets/SLACK_TOKEN/versions/{SECRET_VERSION}"
secret_res = sm_client.access_secret_version(request={"name": secret_name})
slack_token = secret_res.payload.data.decode("UTF-8")
def handler(event, conttext):
JST = timezone(timedelta(hours=+9), 'JST')
today = datetime.now(JST).replace(hour=0, minute=0, second=0, microsecond=0)
yesterday = today - timedelta(days=1)
unix_yesterday_start = int(yesterday.timestamp())
unix_today_start = int(today.timestamp())
slack_client = WebClient(slack_token)
latest = unix_today_start
slack_messages = []
while True:
res = slack_client.conversations_history(
channel=SLACK_CHANNEL_ID, latest=latest, inclusive='false',
oldest=unix_yesterday_start)
slack_messages.extend(
[msg for msg in res.data['messages'] if msg['text'] != '']
)
if res.data['has_more'] == False:
break
print('slack channel has more history continue scrape...')
latest = int(float(res.data['messages'][-1]['ts']))
if len(slack_messages) == 0:
print('no messages... skip upload file')
return
upload_file(yesterday, slack_messages)
def upload_file(yesterday, slack_messages):
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)
prefix = yesterday.strftime('slack_files/year=%Y/month=%m/day=%d/')
blob = bucket.blob(f'{prefix}slack.json')
blob.upload_from_string(
'\n'.join([json.dumps(msg) for msg in slack_messages])
)
ざっくり以下のような処理を実行しています
- Secret ManagerからSlackのトークンを取得
- Slackの
conversations.history
をAPIを呼び出し、Slackの会話履歴を収集- ※
conversations.history
だけだとスレッド内の会話履歴までは取得できないようなので、その点はご注意下さい。
- ※
- 収集した会話履歴を
slack_files/year=YYYY/month=mm/day=dd/slack.json
というオブジェクト名でCloud Storageにアップロード
Cloud Functionsと関連するリソースのデプロイ
ソースコードが準備できたので、Cloud Functionsと関連するリソースをDeployment Mangerでまとめてデプロイします。ディレクトリの構成は以下のような構成です。
.
├── src
│ ├── main.py
│ └── requirements.txt
└── templates
├── cloud-functions.py
├── resources.yml
├── schedule.py
└── storage.py
src
ディレクトリ配下は ZIPにまとめてCloud Functionsのパッケージとしてデプロイします。main.py
は先程のPythonのコードでrequirements.txt
はCloud Functionsの実行環境にライブラリをインストールするために利用します。requirements.txt
の中身は以下の通りです
google-cloud-storage
google-cloud-secret-manager
slack_sdk
templates
ディレクトリ配下はDeployment Managerの構成ファイルとテンプレートファイルです。
まずメインの構成ファイルは以下の通り
imports:
- path: ../src/main.py
- path: ../src/requirements.txt
- path: cloud-functions.py
- path: storage.py
- path: schedule.py
resources:
- type: cloud-functions.py
name: slack-scrape
properties:
function: slack-scrape
codeLocation: ../src/
codeBucket: <ソースコードのデプロイに利用できる適当なバケット名>
location: asia-northeast1
timeout: 60s
runtime: python39
memory: 256
entryPoint: handler
environmentVariables:
SLACK_CHANNEL_ID: <会話履歴収集対象とするSlackのチャンネルID>
BUCKET_NAME: $(ref.storage-bucket.name)
SECRET_VERSION: <Slackのトークンを保存したSecrets Managerのシークレットバージョン>
topicName: $(ref.pubsubtopic.name)
metadata:
dependsOn:
- pubsubtopic
- type: pubsub.v1.topic
name: pubsubtopic
properties:
topic: slack-scrape
- type: storage.py
name: storage-bucket
properties:
projection: full
location: asia-northeast1
locationType: region
storageClass: STANDARD
iamConfiguration:
bucketPolicyOnly:
enabled: true
- type: schedule.py
name: slack-scrape-schedule
properties:
schedule: "0 1 * * *"
timeZone: "Asia/Tokyo"
topicName: $(ref.pubsubtopic.name)
metadata:
dependsOn:
- pubsubtopic
Cloud Functions用のテンプレートファイルです。以下ブログの内容を少し加工しています。
import zipfile
import io
import base64
import hashlib
def generate_config(context):
in_memory_output_file = io.BytesIO()
with zipfile.ZipFile(in_memory_output_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
for imp in context.imports:
if imp.startswith(context.properties['codeLocation']):
zip_file.writestr(
imp[len(context.properties['codeLocation']):], context.imports[imp])
content = base64.b64encode(in_memory_output_file.getvalue())
m = hashlib.md5()
m.update(content)
source_archive_url = 'gs://%s/%s' % (
context.properties['codeBucket'], m.hexdigest() + '.zip')
cmd = f"echo '{content.decode('ascii')}' | base64 -d > /function/function.zip;"
volumes = [{'name': 'function-code', 'path': '/function'}]
upload_function_code = {
'type': 'gcp-types/cloudbuild-v1:cloudbuild.projects.builds.create',
'name': context.properties['function'] + '-upload-function-code',
'properties': {
'steps': [{
'name': 'ubuntu',
'args': ['bash', '-c', cmd],
'volumes': volumes,
}, {
'name': 'gcr.io/cloud-builders/gsutil',
'args': ['cp', '/function/function.zip', source_archive_url],
'volumes': volumes
}],
'timeout': '120s'
}
}
env_vars = {
'PROJECT_ID': context.env['project']
}
env_vars.update(context.properties['environmentVariables'])
cloud_function = {
'type': 'gcp-types/cloudfunctions-v1:projects.locations.functions',
'name': context.properties['function'] + '-cloud-function',
'properties': {
'parent': f"projects/{context.env['project']}/locations/{context.properties['location']}",
'function': context.properties['function'],
'sourceArchiveUrl': source_archive_url,
'entryPoint': context.properties['entryPoint'],
'eventTrigger': {
'resource': context.properties['topicName'],
'eventType': 'providers/cloud.pubsub/eventTypes/topic.publish'
},
'timeout': context.properties['timeout'],
'availableMemoryMb': context.properties['memory'],
'runtime': context.properties['runtime'],
'environmentVariables': env_vars
},
'metadata': {
'dependsOn': [
context.properties['function'] + '-upload-function-code'
]
}
}
return {
'resources': [upload_function_code, cloud_function]
}
続いてCloud Storage用のテンプレートファイルです。GitHubのリポジトリで公開されている以下テンプレートファイルのバケット名の命名規約だけ少し加工したものです。
# Copyright 2018 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" This template creates a Google Cloud Storage bucket. """
from hashlib import sha1
def generate_config(context):
""" Entry point for the deployment resources. """
resources = []
properties = context.properties
project_id = properties.get('project', context.env['project'])
bucket_name = properties.get('name',
'{}-{}-{}'.format(
context.env['deployment'],
context.env['name'],
context.env['project_number'])
)
# output variables
bucket_selflink = '$(ref.{}.selfLink)'.format(context.env['name'])
bucket_uri = 'gs://' + bucket_name + '/'
bucket = {
'name': context.env['name'],
# https://cloud.google.com/storage/docs/json_api/v1/buckets
'type': 'gcp-types/storage-v1:buckets',
'properties': {
'project': project_id,
'name': bucket_name
}
}
requesterPays = context.properties.get('requesterPays')
if requesterPays is not None:
bucket['properties']['billing'] = {'requesterPays': requesterPays}
optional_props = [
'acl',
'iamConfiguration',
'retentionPolicy',
'encryption',
'defaultEventBasedHold',
'cors',
'defaultObjectAcl',
'billing',
'location',
'versioning',
'storageClass',
'predefinedAcl',
'predefinedDefaultObjectAcl',
'logging',
'lifecycle',
'labels',
'website'
]
for prop in optional_props:
if prop in properties:
bucket['properties'][prop] = properties[prop]
if not properties.get('iamConfiguration', {}).get('bucketPolicyOnly', {}).get('enabled', False):
if 'predefinedAcl' not in bucket['properties']:
bucket['properties']['predefinedAcl'] = 'private'
if 'predefinedDefaultObjectAcl' not in bucket['properties']:
bucket['properties']['predefinedDefaultObjectAcl'] = 'private'
resources.append(bucket)
# If IAM policy bindings are defined, apply these bindings.
storage_provider_type = 'gcp-types/storage-v1:virtual.buckets.iamMemberBinding'
bindings = properties.get('bindings', [])
if 'dependsOn' in properties:
dependson = { 'metadata': { 'dependsOn': properties['dependsOn'] } }
dependson_root = properties['dependsOn']
else:
dependson = {}
dependson_root = []
if bindings:
for role in bindings:
for member in role['members']:
suffix = sha1('{}-{}'.format(role['role'], member).encode('utf-8')).hexdigest()[:10]
policy_get_name = '{}-{}'.format(context.env['name'], suffix)
policy_name = '{}-iampolicy'.format(policy_get_name)
iam_policy_resource = {
'name': policy_name,
# TODO - Virtual type documentation needed
'type': (storage_provider_type),
'properties':
{
'bucket': '$(ref.{}.name)'.format(context.env['name']),
'role': role['role'],
'member': member,
}
}
iam_policy_resource.update(dependson)
resources.append(iam_policy_resource)
dependson = { 'metadata': { 'dependsOn': [policy_name] + dependson_root } }
if properties.get('billing', {}).get('requesterPays'):
for resource in resources:
resource['properties']['userProject'] = properties.get('userProject', context.env['project'])
return {
'resources':
resources,
'outputs':
[
{
'name': 'name',
'value': bucket_name
},
{
'name': 'selfLink',
'value': bucket_selflink
},
{
'name': 'url',
'value': bucket_uri
}
]
}
最後にCloud Scheduler用のテンプレートファイルです
def generate_config(context):
resources = []
properties = context.properties
project_id = context.env['project']
region = properties.get('region', 'asia-northeast1')
topic_name = properties.get('topicName')
description = properties.get('description', '')
name = properties.get('name', context.env['name'])
pubsub_target = {
'topicName': topic_name,
'data': 'e30='
}
time_zone = 'Asia/Tokyo'
schedule = '0 1 * * *'
scheduler = {
'name': context.env['name'],
'type': 'gcp-types/cloudscheduler-v1:projects.locations.jobs',
'properties': {
'parent': f'projects/{project_id}/locations/{region}',
'name': f'{name}',
'description': description,
'schedule': schedule,
'timeZone': time_zone,
'pubsubTarget': pubsub_target
}
}
if 'dependsOn' in properties:
dependson = { 'metadata': { 'dependsOn': properties['dependsOn'] } }
scheduler.update(dependson)
return {
'resources': [scheduler]
}
テンプレート類の準備ができたので、Deployment Managerを利用してデプロイします
gcloud deployment-manager deployments create slack-analytics --config resources.yml
こんな感じでcompleted successfully.
と表示されればOKです
The fingerprint of the deployment is b'dify4zw0LApNDb_G6ffBuA=='
Waiting for create [operation-1629887189548-5ca5faef4ff7d-6ade04b9-0aa08d36]...done.
Create operation operation-1629887189548-5ca5faef4ff7d-6ade04b9-0aa08d36 completed successfully.
NAME TYPE STATE ERRORS INTENT
pubsubtopic pubsub.v1.topic COMPLETED []
slack-scrape-cloud-function gcp-types/cloudfunctions-v1:projects.locations.functions COMPLETED []
slack-scrape-schedule gcp-types/cloudscheduler-v1:projects.locations.jobs COMPLETED []
slack-scrape-upload-function-code gcp-types/cloudbuild-v1:cloudbuild.projects.builds.create COMPLETED []
storage-bucket gcp-types/storage-v1:buckets COMPLETED []
ジョブの手動実行
このあとBigQueryの外部テーブルを作成するのですが、Cloud Storageバケットにファイルが存在しない状態だと外部テーブルの作成が失敗してしまいます。Cloud Schedulerのジョブを手動実行して前日分の会話履歴を収集してCloud Storageバケットにアップロードするしておきます。
gcloud scheduler jobs run slack-scrape-schedule
ジョブを実行したらCloud Storageバケットの中身を確認しておきましょう
gsutil ls gs://<デプロイされたCloud Storageバケット>/
gs://<デプロイされたCloud Storageバケット>/slack_files/
BigQuery関連リソースのデプロイ
続いてBiqQuery関連のリソースもDeployment Managerでまとめてデプロイします。先程のtemplates
ディレクトリにbigquery.py
という名前でテンプレートファイルを作成します。
def generate_config(context):
resources = []
properties = context.properties
bucket_name = properties['bucketName']
project_id = context.env['project']
storage_prefix = f'gs://{bucket_name}/slack_files'
dataset_id = 'slack_analytics_ds'
dataset_name = 'bq-dataset'
dataset = {
'name': dataset_name,
'type': 'bigquery.v2.dataset',
'properties': {
'datasetReference': {
'datasetId': dataset_id
},
'location': 'asia-northeast1'
}
}
table = {
'name': 'bq-table',
'type': 'bigquery.v2.table',
'properties': {
'externalDataConfiguration': {
'sourceUris': [
f'{storage_prefix}*'
],
'sourceFormat': 'NEWLINE_DELIMITED_JSON',
'autodetect': True,
'hivePartitioningOptions': {
'mode': 'AUTO',
'sourceUriPrefix': storage_prefix
}
},
'tableReference': {
'projectId': project_id,
'tableId': 'slack-analytics-bucket',
'datasetId': dataset_id
}
},
'metadata': {
'dependsOn': [
dataset_name
]
}
}
return {
'resources': [dataset, table]
}
構成ファイルは使わずにテンプレートファイルだけでデプロイします
gcloud deployment-manager deployments create bigquery --template=bigquery.py --properties bucketName:<デプロイされたCloud Storageバケット>
The fingerprint of the deployment is b'smNEzqfkrTHSOUKjKsuOiw=='
Waiting for create [operation-1629892183402-5ca60d89d2641-86474ff3-70f465ce]...done.
Create operation operation-1629892183402-5ca60d89d2641-86474ff3-70f465ce completed successfully.
NAME TYPE STATE ERRORS INTENT
bq-dataset bigquery.v2.dataset COMPLETED []
bq-table bigquery.v2.table COMPLETED []
completed successfully.
と出力されればデプロイ成功です
BigQueryで分析してみる
これで分析の足回りが整ったので、実際にCloud Storageバケットに溜めたSlackの会話履歴を分析してみます。ここでは以下のSQLで対象チャンネルでリアクションに利用されている人気の絵文字TOP5でも分析してみましょう。
SELECT
react.name,
SUM(react.count)
FROM
`<プロジェクト名>.slack_analytics_ds.slack-analytics-bucket`
CROSS JOIN UNNEST(reactions) AS react
GROUP BY
react.name
ORDER BY
SUM(react.count) DESC LIMIT 5
結果は以下のようになりました。
- 1位...
:kawaii:
- 2位...
:kusa:
- 3位...
:eraizo:
- 4位...
:aa:
- 5位...
:wakaru:
まだ収集済みのデータが少ないので実際の人気具合を反映できているかは微妙ですが、とりあえずSlackのやりとりをBigQueryで分析するという目的は達成できました。
まとめ
GCPでもSlackの会話履歴を分析するためのサーバーレスなデータ分析基盤が作れることが確認できました。実際に自分で環境構築することでAWS・GCPそれぞれの特徴がより見えて来た気がします。AWS上に構築したシステムをGCP上に再構築するというのは学習のための良いテーマになりそうなので、今後も機会を見つけて挑戦していきたいと思います。