Amazon API Gateway経由でSalesforceのRest APIをコールする(Serverless Framework使用)

Salesforce REST APIの運用を楽にする方法として、Amazon API Gatewayを差し挟み、認証をAWS IAMに移譲するサンプルを作ってみました。
2022.05.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

SalesforceのREST APIをAPIGatewayでラップする話 - Qiitaという素晴らしい記事があります。

SalesforceのREST APIを直接コールするのではなく、その前にAmazon API Gatewayを立てることで、次の3つの主なメリットが得られると記載されています。

- クライアントから使いやすいAPIを提供する
- APIの運用を楽にする
- Salesforceの制約を緩和する

このうち、私は「APIの運用を楽にする」ことを目的に、この構成を実装してみました。

背景

当社では基幹システムの一部として、Salesforceを使っていますが、データがSalesforceに貯まっていくに従って、様々な部署からSalesforceのデータの利用をAPIで行いたいというリクエストが増えてきました。
Salesforce REST APIを使うためには、

  • 接続アプリケーションの認証情報(アクセスキー、シークレットキー)
  • Salesforceのユーザ

が必要ですが、これをクライアント毎に払い出すのはライセンス費用と手間を考えると大変です。
かといって、クライアント間で共有するのもセキュリティの観点からハイリスクです。

そこで、Salesforceの接続情報はAPI Gatewyで保持し、APIへのアクセスはAWSのIAM情報で制御することで、クライアントにSalesforceの接続情報が漏れることを防げますし、万が一API Gatewayへの認証情報が流出した場合にもSalesforce側の設定を変えることなく対応できます。使っているIAM情報を無効化するだけで良いのです。

実装

今回、サンプルとして、Serverless Frameworkのテンプレート(serverless.yml)、API Gatewayからのリクエストを仲介してSalesforce REST APIを呼び出すLambda(app.py)、デプロイしたAPI Gatewayを利用するスクリプト(client.js)の3つを主に示していきます。

Serverless Frameworkのテンプレート(serverless.yml)

service: sf-api-gateway

plugins:
  - serverless-step-functions
  - serverless-python-requirements

provider:
  name: aws
  runtime: python3.9
  lambdaHashingVersion: 20201221
  stage: ${opt:stage, 'dev'}
  region: ap-northeast-1
  memorySize: 128
  timeout: 900

custom:
  basePath: sf-api-gateway

functions:
  sf-api-gateway:
    handler: app.lambda_handler
    name: sf-api-gateway-${self:provider.stage}
    description: API Gateway for Salesforce's API

stepFunctions:
  stateMachines:
    state-machine:
      name: ${self:custom.basePath}-machine-${self:provider.stage}
      events:
        - http:
            path: ${self:custom.basePath}/start-execution
            method: post
            authorizer: aws_iam
            action: StartExecution
            iamRole:
              Fn::GetAtt: [SfApiGatewayRole, Arn]
            request:
              template:
                application/json: |-
                  {
                    "input": "$util.escapeJavaScript($input.json('$')).replaceAll("\\'", "'")",
                    "stateMachineArn":"arn:aws:states:${aws:region}:${aws:accountId}:stateMachine:${self:custom.basePath}-machine-${self:provider.stage}"
                  }
        - http:
            path: ${self:custom.basePath}/describe-execution
            method: post
            authorizer: aws_iam
            action: DescribeExecution
            iamRole:
              Fn::GetAtt: [SfApiGatewayRole, Arn]
            response:
              template:
                application/json: |-
                  {
                    "input": $util.parseJson($input.json('$.input')),
                    #if($input.path('$.output') != "")
                      "output": $util.parseJson($input.json('$.output')),
                    #end
                    "status": $input.json('$.status')
                  }
      definition:
        StartAt: sf-api-gateway-task
        States:
          sf-api-gateway-task:
            Type: Task
            Resource:
              Fn::GetAtt: [sf-api-gateway, Arn]
            End: true

resources:
  Resources:
    SfApiGatewayRole:
      Type: AWS::IAM::Role
      Properties:
        RoleName: sf-api-gateway-role-${self:provider.stage}
        Path: /service-role/
        AssumeRolePolicyDocument:
          Version: "2012-10-17"
          Statement:
          - Effect: Allow
            Principal:
              Service:
                - apigateway.amazonaws.com
                - lambda.amazonaws.com
              AWS:
                - arn:aws:iam::xxxxxxxxxxxx:role/YOUR-ROLE-NAME
            Action:
              - sts:AssumeRole
        ManagedPolicyArns:
          - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess
          - arn:aws:iam::aws:policy/AmazonAPIGatewayInvokeFullAccess

API GatewayとStep Functionsを組み合わせた非同期APIが最強だった話 - Qiitaを参考に、非同期APIの構成にしています。これによって、API Gatewayの29秒制限とLambdaの15分制限のギャップを埋めてLambdaの性能を目一杯使えるようにしています。

また、API Gatewayの実行ロール(SfApiGatewayRole)を定義し、このロールにスイッチロールできるロールをPrincipalのAWS項目に次のように設定しています。

              AWS:
                - arn:aws:iam::xxxxxxxxxxxx:role/YOUR-ROLE-NAME

ここはスイッチロール可能にしたいIAMロールのARNに書き換えてください。

定義している各API(start-executiondescribe-execution)には、IAM認証を行うように次のように設定しています。

            authorizer: aws_iam

この設定がないと、APIが認証なしで公開されてしまうので注意してください。

API Gatewayからのリクエストを仲介してSalesforce REST APIを呼び出すLambda(app.py)

import sys
import json
import requests
from simple_salesforce import Salesforce
from simple_salesforce.exceptions import SalesforceMalformedRequest
import logging

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

sf_conf = {}
exec(compile(open("sf.conf").read(), "sf.conf", 'exec'), sf_conf)
logger.debug("Salesforce設定ファイル(sf.conf)を読み込みました")

def exception_handler(e):
    logger.error("Exception name is %s. Detail is %s" % (type(e).__name__, e))
    return {
      "statusCode": 400,
      "body": json.dumps(e.content)
    }

def lambda_handler(event, context):
    access_token_url = sf_conf["access_token_url"]
    data = {
             'grant_type': 'password',
             'client_id' : sf_conf["client_id"],
             'client_secret' : sf_conf["client_secret"],
             'username'  : sf_conf["username"],
             'password'  : sf_conf["password"]
           }
    headers = { 'content-type': 'application/x-www-form-urlencoded' }
    response = requests.post(access_token_url,data=data,headers=headers)
    response = response.json()
    if response.get('error'):
        raise Exception(response.get('error_description'))

    session = requests.Session()
    sf = Salesforce(instance_url = response['instance_url'],
                    session_id=response['access_token'],
                    domain=sf_conf['domain'],
                    session=session)

    if "query" in event:
        try:
            return {
              "statusCode": 200,
              "body": json.dumps(sf.query_all(event["query"]))
            }
        except SalesforceMalformedRequest as e:
            return exception_handler(e)
        except Exception as e:
            return exception_handler(e)
    else:
        return {
          "statusCode": 200,
          "body": json.dumps([])
        }

# メイン関数
if __name__ == "__main__":
    lambda_handler({}, {})

queryでSOQLを引き取って、当該SOQLをSalesforce REST APIのQueryに対して実行し、結果を返すLambdaです。

Salesforceの接続にはsimple-salesforceを利用しています。query_allメソッドにSOQLを渡すことで、当該SOQLを実行しています。

例外発生時は次のコードで、statusCodeに400を指定し、例外の内容(e.content)を返しています。 これによって、不正なSOQLなどが実行された場合などに、エラーメッセージをAPI利用者が確認できるようにしています。

def exception_handler(e):
    logger.error("Exception name is %s. Detail is %s" % (type(e).__name__, e))
    return {
      "statusCode": 400,
      "body": json.dumps(e.content)
    }

Salesforceへの接続情報はsf.confファイルに下記のように設定しています。

import os
version = '53.0'
domain = "test"
username = "YOUR SALESFORCE USERNAME"
password = "YOUR SALESFORCE PASSWORD"
access_token_url = "https://test.salesforce.com/services/oauth2/token"
client_id = "接続アプリケーションのアクセスキー"
client_secret = "接続アプリケーションのシークレットキー"

この設定では、Sandboxへの接続を想定していますが、本番環境に接続する場合は、domainにlogin、access_token_urlにhttps://login.salesforce.com/services/oauth2/tokenを設定してください。

デプロイしたAPI Gatewayを利用するスクリプト(client.js)

(async () => {
  const apigClientFactory = require('aws-api-gateway-client').default

  const apigClient = apigClientFactory.newClient({
    invokeUrl: 'https://yyyyyyyyyy.execute-api.ap-northeast-1.amazonaws.com',
    accessKey: 'YOUR ACCESS KEY',
    secretKey: 'YOUR SECRET KEY',
    sessionToken: 'YOUR SECURITY TOKEN',
    region: 'ap-northeast-1'
  })

  const pathParams = {
    stage: 'dev',
    method: 'start-execution'
  }
  const pathTemplate = '/{stage}/sf-api-gateway/{method}'
  const method = 'POST'
  const additionalParams = {
    headers: {
    },
    queryParams: {
    }
  }
  const body = {
    query: "SELECT Id, Name, Email FROM Contact WHERE Email LIKE '%com' LIMIT 10"
  }

  try {
    let result = await apigClient.invokeApi(pathParams, pathTemplate, method, additionalParams, body)

    pathParams.method = 'describe-execution'
    delete body.query
    body.executionArn = result.data.executionArn

    do {
      result = await apigClient.invokeApi(pathParams, pathTemplate, method, additionalParams, body)
      /* wait 0.5sec */
      const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))
      await sleep(500)
    } while (result.data.status == "RUNNING")

    if (result.data.status == "SUCCEEDED") {
      console.log(result.data.output.body)
    } else {
      console.error(result.data)
    }

  } catch (e) {
    console.error(e)
  }
})();

Amazon API Gatewayの呼び出しを簡素化するaws-api-gateway-clientを利用しました。

serverless.ymlに設定したロール(arn:aws:iam::xxxxxxxxxxxx:role/YOUR-ROLE-NAME)から、SfApiGatewayRole(sf-api-gateway-role-${self:provider.stage})にassume-roleし、accessKey、secretKey、sessionTokenをassume-roleで生成された各値に設定して、

$ npm install aws-api-gateway-client
$ node client.js

で実行します。

queryとして、次のSOQLを渡しています。

  const body = {
    query: "SELECT Id, Name, Email FROM Contact WHERE Email LIKE '%com' LIMIT 10"
  }

実行結果として、Emailがcomで終わるメールアドレスをもつ取引先責任者のID、名前、メールアドレスが10件、JSONで取得できれば成功です。

なお、SOQLを次のように壊れた記述で記載すると、エラーメッセージを確認できます。

  const body = {
    query: "SELEC Id, Name, Email FROM Contact WHERE Email LIKE '%com' LIMIT 10"
  }

実行結果は次の通り。

$ node client.js | jq '.'
[
  {
    "message": "unexpected token: SELEC",
    "errorCode": "MALFORMED_QUERY"
  }
]

なお、先述の通り、非同期APIの構成にしているので、start-executionで得られたexecutionArnをパラメータに設定して、describe-executionを繰り返し呼び出しています。処理中の場合、describe-executionはstatusとしてRUNNINGを返すので、その場合は、0.5秒待って、再度リクエストを投げてstatusをチェックするというのを繰り返しています。

statusがRUNNING以外(つまり、FAILEDSUCCEEDED)になったら、結果を出力して終了しています。

まとめ

Salesforce REST APIの運用を楽にする方法として、Amazon API Gatewayを差し挟み、認証をAWS IAMに移譲する方法のサンプルを示しました。
コールするSalesforce REST APIを絞り込むことも可能ですし、APIを利用するユーザの払い出しもAWS IAMロールの払い出しで済むので手間、コスト共に軽減されます。
是非、活用してみてください。