【Step Functions × Slack API】大量のメッセージを一気に消去する定時実行アプリを作ってみた

こんにちは。DI部の春田です。

Slackは、無料プランでも実用に足る非常に助かるツールですが、メッセージ数の表示上限が結構キツイ縛りだったりします。先日、開発環境のテストに伴って通知用のBotが荒ぶったため、秒殺でメッセージ数が上限の10,000に達してしまいました。かと言って、メインでないワークスペースを有料プランに上げるのは、ちょっともったいない気もします。

Message, file storage, and app limits on the Free plan – Slack Help Center

今回は、Slackメッセージの定期消去アプリをStep Functionsで実装してみました。その時の実装の手順や注意点をご紹介していきたいと思います。せこめのライフハックですみません。(ちなみにAWS関連の記事は初投稿です。テンション上がりますねー。)

目次

各リソースの制限を確認

今回の実装で使用する主なリソースは、以下の3つです。

各リソースには制限があるので、一つずつ確認していきましょう。

まず、今回使用する Slack API のメソッドはchannels.historychat.delete の2つです。 channels:history は最大1,000件までのメッセージ履歴を取得することができるので、1,000件を越える場合は日付を指定して再度APIを叩く必要があります。また、メッセージを一括消去するメソッドは用意されていないため、 chat:delete で一件ごとリクエストをかけていく必要があります。

このAPIのRate Limitsについてですが、今回使用するOAuthトークンのScopeは全て Web API Tier 3 です。すなわち、1分間に約50リクエスト受け付けることできるので、リクエスト間隔は2秒間ぐらいが丁度良さそうですね。注意点として、インストール先のワークスペースに、 Web API Tier 3 を使用する他のAppが存在している場合、それを考慮した上でリクエスト間隔を設定する必要があります。

さて、このAPIを叩くためのプログラムを AWS Lambda で実装していくわけですが、Lambdaの最長実行時間は15分であるため、一つのLambdaだけで繰り返しAPIを叩くと、 15分 * 60秒 / 2秒間隔のリクエスト ≒ 450件 ぐらいしか削除できません。削除したい件数が1,000件以上の場合、一つのLambdaで解決するのはベストとは言えないでしょう。今回、Lambdaの実行時間を最小限に抑えるために、繰り返し処理を AWS Step Functions の方で実装していきます。

Step Functionsで気をつけたい制限は、タスクの実行に関連する制限です。データサイズの上限は32,768文字となっており、今回メインで渡すタイムスタンプは1レコードあたり20文字ほどです。(例: '1558518949.025600',32,768 / 20 ≒ 1,600 ぐらいなので、 channels:history で取得できる最大件数は十分に満たしていると言っていいでしょう。

SlackでOAuthトークンを発行する

Slack AppのOAuthトークンを発行する際は、事前に ユーザー権限 について確認しておきましょう。Slack無料プランのユーザータイプは OwnerAdminMember の3種類あります。

Permissions in Slack

OAuthトークンの発行にあたって非常に重要なポイントは、 どのユーザータイプによってトークンがインストールされたか という点です。OAuthトークンはユーザーごとに割り当てられるので、同じAppのCollaborators間で権限レベルに差があると、内容によっては権限の弱いユーザーがインストールできない可能性があります。例えば、Admin権限を持つユーザーがOAuthトークンを発行する時、あらゆるPermission Scopesを設定することができますが、Scopesの中にかなり権限の強いもの(例えば adminbot )が存在すると、Member権限を持つユーザー側ではインストールすることができません。できるだけ、Collaborators間の権限レベルは同じになるように割り当てておくべきでしょう。

さて、まずはMember権限のユーザーでOAuthトークンを発行してみます。今回使用するScopeは channels:history chat:write:bot incoming-webhook の3つです。これらのScopeを設定して、対象のチャンネルのメッセージ履歴を取得し、Incoming Webhook URLを通して投稿した通知メッセージだけ選択して削除する、という方針で実装すればうまくいくのではないでしょうか?

残念ながらこの方針ではうまくいきません。なぜなら、 Incoming Webhookで投稿されたメッセージは、同じAppで紐づけられているBotユーザーでも削除することができない からです。下の channels:history のレスポンスを見てもらうとわかるのですが、 Incoming Webhookで投稿されたメッセージとBotユーザーが投稿したメッセージ間では、Bot IDが異なります。



{
    "ok": true,
    "messages": [
        {
            "bot_id": "BJZRBDTAS",
            "type": "message",
            "text": "Hello World From Member User API!!",
            "user": "UJZQQACAJ",
            "ts": "1558622186.002700"
        },
        {
            "type": "message",
            "subtype": "bot_message",
            "text": "Hello World From Member Bot API!!",
            "ts": "1558622125.002500",
            "username": "Member1",
            "bot_id": "BJZRBDTAS"
        },
        {
            "type": "message",
            "subtype": "bot_message",
            "text": "Hello World From Member Webhook!!",
            "ts": "1558622060.002400",
            "bot_id": "BJLR635C2" # <- Bot APIからの投稿とIDが異なっている
        },
        {
            "client_msg_id": "1e96e037-0204-4c5f-a482-dee8ed0820df",
            "type": "message",
            "text": "Hello World From Admin!!",
            "user": "UJR9C6JDP",
            "ts": "1558622006.002300"
        },
        {
            "client_msg_id": "394b87e1-900f-4347-8d9b-0bafcd2e85bd",
            "type": "message",
            "text": "Hello World From Member!!",
            "user": "UJZQQACAJ",
            "ts": "1558621829.001800"
        }
    ],
    "has_more": false
}

Botユーザーのメッセージや、OAuthトークンをインストールしたユーザーのメッセージは、 chat:write:botchat:write:user で削除することができるのですが、Incoming Webhookのメッセージは削除対象にできません。もともとの通知機能がIncoming Webhookで実装されている場合、このためだけにわざわざBotを使用した通知機能へ変更するのは余計な手間だと思います。かなり強い権限ですが、今回は全ての投稿を削除できるAdmin権限のユーザーでOAuthトークンを発行します。

OAuthトークンは、作成したSlack Appの OAuth & Permissions というページから発行することができます。

Step Functionsで骨組みを組む

OAuthトークンを発行した後、まずはStep Functionsで骨組みを組んでいきます。ステートマシンの定義は以下の通りです。

{
  "StartAt": "FetchHistory",
  "States": {
    "FetchHistory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:***********:function:FetchHistory",
      "Next": "IfMessagesNone"
    },
    "IfMessagesNone": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.State",
          "StringEquals": "QUEUED",
          "Next": "DeleteMessage"
        },
        {
          "Variable": "$.State",
          "StringEquals": "EMPTY",
          "Next": "Wait00"
        }
      ],
      "Default": "Done"
    },
    "Wait00": {
      "Type": "Wait",
      "Seconds": 2,
      "Next": "Retry00"
    },
    "Retry00": {
      "Type": "Pass",
      "Next": "FetchHistory"
    },
    "DeleteMessage": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:***********:function:DeleteMessage",
      "Next": "Wait01"
    },
    "Wait01": {
      "Type": "Wait",
      "Seconds": 2,
      "Next": "Retry01"
    },
    "Retry01": {
      "Type": "Pass",
      "Next": "IfMessagesNone"
    },
    "Done": {
      "Type": "Pass",
      "End": true
    }
  }
}

Lambda関数は FetchHistoryDeleteMessage の2つです。 FetchHistory でチャンネルのメッセージ履歴を取得し、 DeleteMessage でメッセージを削除します。ステートマシンの起点は IfMessagesNone であり、各State間で渡される変数 State の値を判定し、ループさせています。ちなみに、この実装ではCloudWatch Eventsからの定時実行をトリガーに考えていますが、 channels:history を叩く際のパラメータを改造すれば、期間を指定して削除することもできます。

このように、Step Functionsで上手いことWaitの時間を作ってループ処理を実装すると、Lambdaの実行時間を最小限に抑えることができます。Step Functionsは他にも、ステートマシン上でエラー処理したり、他のマネージドサービスと連携したりもでき、活用の幅がかなり広いです。

[神アップデート]Step Functionsが新たに8つのマネージドサービスと連携可能になりました! #reinvent | DevelopersIO

Lambda関数を作成する

上記Step Functionsで使用するLambda関数は、以下の2つです。

# fetch_history.py

import urllib.request
import urllib.parse
import json
import os
import logging

from datetime import datetime
from datetime import timedelta
from datetime import timezone


SLACK_API_TOKEN = os.environ.get("SLACK_API_TOKEN")
CH_ID = os.environ.get("CH_ID")
BOT_ID = os.environ.get("BOT_ID")


def lambda_handler(event, context):
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel('INFO')
    hist_url = "https://slack.com/api/channels.history"
    base_time = get_jst_date(event['time']) - timedelta(weeks=2)
    #base_time = get_jst_date(event['time'])

    app_logger.debug(str(base_time))
    hist_params = {
        'token': SLACK_API_TOKEN,
        'count': 1000,
        'channel': CH_ID,
        'latest': base_time.timestamp()
    }
    app_logger.debug(hist_params)

    req = urllib.request.Request(hist_url)
    hist_params = urllib.parse.urlencode(hist_params).encode('ascii')
    req.data = hist_params
    with urllib.request.urlopen(req) as res:
        body = res.read().decode("utf-8")
        data = json.loads(body)
        app_logger.debug(data)
        if not data['ok']:
            error_subject = "channels.historyメソッドに失敗しました。: {}"
            app_logger.error(error_subject.format(data['error']))
            raise

    tss = [m.get('ts') for m in data['messages'] if m.get('bot_id') == BOT_ID]
    if tss:
        msgs = {
            'time': event['time'],
            'State': 'QUEUED',
            'ChId': CH_ID,
            'Msgs': tss
        }
    else:
        # tssが空なら終了
        msgs = {'State': 'END'}

    app_logger.info(msgs)
    return msgs


def get_jst_date(timestamp_utc):
    datetime_utc = datetime.strptime(timestamp_utc + "+0000", "%Y-%m-%dT%H:%M:%SZ%z")
    datetime_jst = datetime_utc.astimezone(timezone(timedelta(hours=+9)))
    return datetime_jst.replace(hour=0, minute=0, second=0, microsecond=0)

fetch_history.py のinputは、CloudWatch Eventsのトリガーを想定しています。Inputの変数 time は、タイムゾーンをUTCからJSTに変換する処理が必要で、今回はさらに時分秒を0に切り下げました。実行日から二週間以上経過したメッセージを channels.history で取得した後、削除対象の BOT_ID のみ抽出し、 msgs に入れて DeleteMessege に渡します。環境変数からは、 SLACK_API_TOKEN と、対象のチャンネルID CH_ID 、対象のBotユーザーのID BOT_ID を取得しています。チャンネルIDはチャンネルのURL、BotユーザーIDはWebhook URLに書いてあるので、そこから参照できます。

# delete_history.py

import urllib.request
import urllib.parse
import json
import os
import logging

SLACK_API_TOKEN = os.environ.get("SLACK_API_TOKEN")


def lambda_handler(event, context):
    app_logger = logging.getLogger(__name__)
    app_logger.setLevel('INFO')

    if not event['Msgs']:
        event['State'] = 'EMPTY'
        app_logger.info(event)
        return event

    delete_url = "https://slack.com/api/chat.delete"
    delete_params = {
        'token': SLACK_API_TOKEN,
        'channel': event['ChId'],
        'ts': event['Msgs'].pop(0)
    }
    req = urllib.request.Request(delete_url)
    delete_params = urllib.parse.urlencode(delete_params).encode('ascii')
    req.data = delete_params

    with urllib.request.urlopen(req) as res:
        body = res.read()
        data = json.loads(body)
        app_logger.debug(data)
        if not data['ok']:
            error_subject = "chat.deleteメソッドに失敗しました。: {}"
            app_logger.error(error_subject.format(data['error']))
            raise

    return event

delete_history.py では、入力されたリスト tss から一件ずつタイムスタンプを吐き出して chat:delete で削除していきます。 tss が空になるまで State='QUEUED' の状態が続き、その間ステートマシン上でひたすらメッセージの削除を繰り返します。Step Functionsを骨組みにしたLambdaの実装では、ステートマシンで何の変数を使用するのか、Lambda側で意識しながら実装していくことが大切です。このケースでは、分岐の判定に使用する変数 State に何の値を入れるかが肝となります。

最後に

今回は、Step FunctionsでSlackのメッセージを定期消去するステートマシンを作成してみました。結果的にかなり汎用性が高く、日々の開発で地味に重宝するアプリとなりました。

Step Functions + Lambda での実装はめちゃ簡単なので、ちょっとしたアプリを作成するときには積極的に使っていきたいですね。サーバレス万歳🙌