Lambda(Python)からSalesforce APIを自己証明書を使って安全にコールしてみた

2023.12.26

情報システム室 進地 です。

Salesforce APIを外部から安全にコールする方法に自己証明書を利用したOAuth 2.0 JWT Bearer Flowがあります。この方法を使うことでSalesforceのログイン情報をやり取りすることなしに認証を行うことができます。

このエントリーでは、

  • Salesorce APIをコールする外部システムはAWS Lambdaで実装する
  • AWS LambdaはPython3で実装する
  • 全てのCredential情報はAWS Systems Managerのパラメータストアに保持する

という形でサンプルプログラムを組んでみました。

自己証明書の作成

自己証明書をサクッと作成します。

# 秘密鍵の作成
$ openssl genrsa 2048 > salesforce.pem
# CSRの作成
$ openssl req -new -key salesforce.pem > salesforce.csr
# 入力情報は必要に応じて変更する
Country Name (2 letter code) [XX]:JP
State or Province Name (full name) []:Tokyo 
Locality Name (eg, city) [Default City]:Minato
Organization Name (eg, company) [Default Company Ltd]:Classmethod, Inc.
Organizational Unit Name (eg, section) []:Information System Office
Common Name (eg, your name or your server's hostname) []:localhost
Email Address []:<Enter>

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:<Enter>
An optional company name []:<Enter>
# 自己証明書の作成
$ openssl x509 -days 365 -req -signkey salesforce.pem < salesforce.csr > salesforce.crt
# CSRを削除
$ rm salesforce.csr

Salesforceの接続アプリケーション設定

Salesforceに接続アプリケーションを作成します。

[設定]>[アプリケーションマネージャ]>[新規接続アプリケーション]から下記の設定で作成します。

項目 設定値
OAuth 設定の有効化 チェックする
接続アプリケーション名 任意の値を指定
API 参照名 任意の値を指定
取引先責任者 メール 任意のメールアドレスを指定
コールバック URL https://login.salesforce.com/services/oauth2/success(適当な値で問題ないです)|
デジタル署名を使用 チェックして、「ファイルを選択」ボタンから作成した自己証明書(*.crtファイル)を指定します
選択した OAuth 範囲 「APIを使用してユーザーデータを管理(api)」と「いつでも要求を実行(refresh_token, offline_access)」を指定
他の項目 入力およびチェックしない

作成した自己証明書をデジタル署名としてセットすることを忘れないでください。

接続アプリケーションを作成したら、当該接続アプリケーションの[Manage]>[ポリシーを編集]に移動して、OAuthポリシーの[許可されているユーザー]に「管理者が承認したユーザーは事前承認済み」を指定します。

また、当該接続アプリケーションの[参照]>[コンシューマーの詳細を管理]に移動して、「コンシューマー鍵」を取得しておきます。これは次のSSM(Systems Manager)設定で使います。

SSM(Systems Manager)設定

Salesforceの各種Credential情報はAWS Systems Manager(以下、SSMと表記)のパラメータストアに安全に保持します。

秘密鍵

作成した秘密鍵(*.pem)を丸ごと一つのパラメータとして保存しておきます。パラメータのタイプには「安全な文字列」を選択して、暗号化します。値に秘密鍵の内容を貼り付けてください。ここでは、パラメータ名をsalesforce-pemとしました。

その他のCredentials

その他のCredentialsを下記のフォーマット(JSON形式)で一つのパラメータとして保存しておきます。こちらも秘密鍵と同様にタイプに「安全な文字列」を選択して暗号化します。

{
  "CONSUMER_ID": "<Salesforceの接続アプリケーションから取得した「コンシューマー鍵」の値>",
  "USERNAME": "<Salesforceの接続に用いるSalesforceのユーザ名>"
}

Lambdaのサンプルコード(Python)

これらの情報を使ったSalesforce APIコールのサンプルを示します。Pythonによる記述例になります。

Salesforceへの接続にはsimple_salesforceライブラリを使います。

Salesforce APIクライアント

utils/salesforce_api_client.py

import os
import jwt
import requests
import datetime
from simple_salesforce import Salesforce
from simple_salesforce.exceptions import SalesforceAuthenticationFailed
from utils.ssm_util import fetch_credentials


class SalesforceApiClient:
    """
    Salesforce関係を扱うクラス。
    """

    def __init__(self):
        private_key = fetch_credentials(os.environ['SALESFORCE_SHARING_SSO_PEM'], True)
        credentials = fetch_credentials(os.environ['SALESFORCE_SHARING_SSO_CREDINTIALS'])
        endpoint = 'https://login.salesforce.com'

        jwt_payload = jwt.encode(
            {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(seconds=30),
                'iss': credentials['CONSUMER_ID'],
                'aud': endpoint,
                'sub': credentials['USERNAME']
            },
            private_key,
            algorithm='RS256'
        )

        result = requests.post(
            endpoint + '/services/oauth2/token',
            data={
                'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
                'assertion': jwt_payload
            }
        )
        self.body = result.json()

        if result.status_code != 200:
            raise SalesforceAuthenticationFailed(self.body['error'], self.body['error_description'])

    def init_salesforce(self):
        return Salesforce(instance_url=self.body['instance_url'], session_id=self.body['access_token'])

後述するSSMクライアントのfetch_credentialsメソッドを使って、SSMパラメータストアから秘密鍵やその他Credentialsを取得しています(ハイライト部)。

SSMクライアント

utils/ssm_util.py

import logging
import json
import boto3

logging.basicConfig(
    format="""
        %(asctime)s %(module)s:%(funcName)s(%(lineno)d) -
        %(levelname)s -
        %(message)s
        """
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

ssm_client = boto3.client("ssm")


class SsmUtil:
    """
    AWS Systems Managerに関する処理を扱う
    """

    @classmethod
    def get_parameter_values(self, param_name: str):
        """
        指定されたパラメータストアに格納された値を取得する
        """
        return ssm_client.get_parameter(Name=param_name, WithDecryption=True)[
            "Parameter"
        ]["Value"]


def fetch_credentials(param_name: str, raw=False) -> dict:
    """
    AWS Systems Manager パラメータストアに格納された各種クレデンシャル情報を取得する
    """
    try:
        if raw:
            return SsmUtil.get_parameter_values(param_name)
        else:
            return json.loads(SsmUtil.get_parameter_values(param_name))
    except Exception as e:
        logger.error(f"パラメータストアからクレデンシャル情報を取得できませんでした: {e}")
        raise

fetch_credentialsは第二引数にTrueを指定されたら、json.loadsを行わずにそのまま値を返します。

利用例

後は、Lambdaのmain処理などで次のように使います。

index.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import logging
import json
import requests
from utils.salesforce_api_client import SalesforceApiClient

# ロギング用
logging.basicConfig(format='%(asctime)s - %(threadName)s - %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

def lambda_handler(
    event: dict,
    context: dict
) -> dict:
    """
    実質メイン処理
    """
    logger.info("処理開始 [__PROGRESS__]")

    param = json.loads(event["body"])

    # クライアントを作成
    salesforce = SalesforceApiClient().init_salesforce()

    # パラメータemailでSalesforceユーザのメールアドレスが渡ってきているとする
    # SOQLを発行してユーザ情報を検索する
    sf_user = salesforce.query("""
         SELECT
             Id, Name, IsActive
         FROM
             User
         WHERE
             Email = '{0}'
         LIMIT 1
    """.format(param.get("email")))

    logger.info(sf_user)

    if sf_user['records'][0]['IsActive']:
        statusCode = 200
        body = {"state": f"有効なユーザが見つかりました。"}
    else:
        statusCode = 400
        body = {"state": f"指定の有効なユーザは見つかりませんでした。"}

    logger.info("処理終了 [__PROGRESS__]")

    return {
        "statusCode": statusCode,
        "headers": {"Access-Control-Allow-Origin": "*"},
        "body": json.dumps(body)
    }

if __name__ == "__main__":
    event = {
    }
    lambda_handler(event, {})

留意事項

  • 自己証明書は今回期限を365日で作ったので、1年経過時点で証明書の更新が必要であることに留意してください。
  • 実際に実行する際には、例えばLambdaであれば、実行ロールにSSMの読み取り権限の付与が必要です。CDKで記述するなら次のようになります。
    const lambdaRole = new Role(this, "SalesforceLambdaRole", {
      assumedBy: new ServicePrincipal("lambda.amazonaws.com"),
      managedPolicies: [
        ManagedPolicy.fromManagedPolicyArn(
          this,
          "lambda",
          "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
        ),
        ManagedPolicy.fromAwsManagedPolicyName('AmazonSSMReadOnlyAccess')
      ],
      description: "Basic Lambda Role"
    });