V1ES(SWP) APIで推奨設定スキャンを自動実行してみた

V1ES(SWP) APIで推奨設定スキャンを自動実行してみた

2026.04.17

こんにちは、シマです。

過去の記事で、Trend Vision One Endpoint Security(Server & Workload Protection)(以降V1ES(SWP))を対象に、EC2リストア後のポリシー自動割り当てまでを紹介しました。
https://dev.classmethod.jp/articles/202602-v1es-swp-restore-02/

前回の記事ではローカルコマンドでの推奨設定スキャン実行時に403エラーが発生していましたが、こちらは別記事で解消しています。

https://dev.classmethod.jp/articles/202603-v1es-swp-selfprotection-01/

前回記事の中で「推奨設定スキャンはVision One APIでできそう」と書き残していたので、今回は宿題の回収としてAPIでの実装を試します。EC2新規構築をトリガーに推奨設定スキャンを自動化したい方の参考になれば幸いです。

全体構成

20260417shima01

処理の流れは次のとおりです。

  1. EC2インスタンスの新規作成(RunInstances)をEventBridgeがCloudTrail経由で検知
  2. EventBridgeルールによりLambda関数が起動
  3. Lambda関数がNameタグを確認し、対象外であればスキップ
  4. ステータスチェック完了を待機
  5. EC2のホスト名からV1ES(SWP)上のコンピュータIDを検索
  6. 対象コンピュータを指定したスケジュールタスクをrunNow: trueで作成し、即時スキャン実行
  7. 作成したスケジュールタスクを削除(タスクの蓄積防止)

V1ES(SWP)のAPIには「対象コンピュータに直接スキャンを実行する」という単発のAPIが存在しないため、対象コンピュータを指定したスケジュールタスクを作成・即時実行する方式を採用しています。タスクが溜まり続けないように、実行後は削除します。スキャン自体は非同期で進行するため、タスクを削除してもスキャン結果は問題なく反映されます。

使用するAPIキーについて

V1ES(SWP)のAPIキーはVision Oneコンソールから「Server & Workload Protection」→「管理」→「ユーザ管理」→「APIキー」から発行します。

20260417shima02

Vision Oneコンソールの「Administration」→「API Keys」で発行したキーはVision One API用です。V1ES(SWP)のAPIエンドポイントに使用すると認証エラー(401)になりました。

20260417shima03

CloudFormationテンプレート

Lambda関数、IAMロール、EventBridgeルール、Secrets Managerを一括作成します。サンプルコードは折りたたみで掲載します。

サンプルコード
AWSTemplateFormatVersion: '2010-09-09'
Description: V1ES(SWP) Recommendation Scan Automation

Parameters:
  V1esApiSecretKey:
    Type: String
    NoEcho: true
    Description: V1ES(SWP) API Secret Key
  V1esApiHost:
    Type: String
    Default: https://workload.jp-1.cloudone.trendmicro.com/api
    Description: V1ES(SWP) API Host URL
  TargetNameTagPrefix:
    Type: String
    Default: ''
    Description: Target EC2 Name tag prefix (empty = all instances)

Resources:
  # Secrets Manager
  V1esApiKeySecret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: v1es-swp-api-key
      SecretString: !Sub '{"api-secret-key": "${V1esApiSecretKey}"}'

  # Lambda Role
  ScanFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: v1es-scan-function-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: ScanFunctionPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - ec2:DescribeInstances
                  - ec2:DescribeInstanceStatus
                Resource: '*'
              - Effect: Allow
                Action: secretsmanager:GetSecretValue
                Resource: !Ref V1esApiKeySecret

  # Lambda Function
  ScanFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: v1es-recommendation-scan
      Runtime: python3.12
      Handler: index.lambda_handler
      Timeout: 600
      Role: !GetAtt ScanFunctionRole.Arn
      Environment:
        Variables:
          API_HOST: !Ref V1esApiHost
          SECRET_NAME: !Ref V1esApiKeySecret
          TARGET_NAME_PREFIX: !Ref TargetNameTagPrefix
      Code:
        ZipFile: |
          import json
          import os
          import time
          import boto3
          import urllib3

          API_HOST = os.environ['API_HOST']
          SECRET_NAME = os.environ['SECRET_NAME']
          TARGET_NAME_PREFIX = os.environ.get('TARGET_NAME_PREFIX', '')
          http = urllib3.PoolManager()

          def get_api_key():
              client = boto3.client('secretsmanager')
              response = client.get_secret_value(SecretId=SECRET_NAME)
              secret = json.loads(response['SecretString'])
              return secret['api-secret-key']

          def api_request(method, path, body=None):
              headers = {
                  'api-secret-key': get_api_key(),
                  'api-version': 'v1',
                  'Content-Type': 'application/json',
              }
              url = f'{API_HOST}{path}'
              response = http.request(method, url, headers=headers, body=json.dumps(body) if body else None)
              if response.status >= 400:
                  raise Exception(f'API error {response.status}: {response.data.decode()}')
              if response.status == 204 or not response.data:
                  return None
              return json.loads(response.data.decode())

          def get_instance_info(instance_id):
              ec2 = boto3.client('ec2')
              response = ec2.describe_instances(InstanceIds=[instance_id])
              reservations = response['Reservations']
              if not reservations:
                  return [], None
              instance = reservations[0]['Instances'][0]
              # V1ES(SWP)側はPublicDnsNameで登録されるケースとPrivateDnsNameで登録されるケースがあるため両方取得
              hostnames = [h for h in [instance.get('PublicDnsName'), instance.get('PrivateDnsName')] if h]
              tags = instance.get('Tags', [])
              name_tag = next((t['Value'] for t in tags if t['Key'] == 'Name'), None)
              return hostnames, name_tag

          def wait_for_status_checks(instance_id):
              ec2 = boto3.client('ec2')
              waiter = ec2.get_waiter('instance_status_ok')
              print(f'Waiting for status checks: {instance_id}')
              waiter.wait(InstanceIds=[instance_id])
              print(f'Status checks passed: {instance_id}')

          def get_computer_id_by_hostnames(hostnames):
              # V1ES(SWP)はPublic/Private両方のDNS名で登録される可能性があるため、
              # 各ホスト名に対してワイルドカード前方一致で検索する
              for hostname in hostnames:
                  body = {
                      'searchCriteria': [{
                          'fieldName': 'hostName',
                          'stringTest': 'equal',
                          'stringValue': f'{hostname}%',
                          'stringWildcards': True,
                      }]
                  }
                  result = api_request('POST', '/computers/search', body) or {}
                  computers = result.get('computers', [])
                  print(f'Computer search: query={hostname}%, matched={len(computers)}')
                  if computers:
                      return computers[0]['ID']
              return None

          def run_recommendation_scan(computer_id):
              body = {
                  'name': f'Recommendation Scan - Lambda - computer {computer_id}',
                  'type': 'scan-for-recommendations',
                  'runNow': True,
                  'scheduleDetails': {
                      'recurrenceType': 'none',
                      'recurrenceCount': 1,
                      'onceOnlyScheduleParameters': {
                          'startTime': int(time.time() * 1000),
                      },
                  },
                  'scanForRecommendationsTaskParameters': {
                      'computerFilter': {
                          'type': 'computer',
                          'computerID': computer_id,
                      }
                  }
              }
              return api_request('POST', '/scheduledtasks', body)

          def delete_scheduled_task(task_id):
              api_request('DELETE', f'/scheduledtasks/{task_id}')

          def lambda_handler(event, context):
              items = event.get('detail', {}).get('responseElements', {}).get('instancesSet', {}).get('items', [])
              if not items:
                  return {'statusCode': 400, 'body': json.dumps({'message': 'No instances found in event'})}

              results = []
              for item in items:
                  instance_id = item['instanceId']
                  try:
                      hostnames, name_tag = get_instance_info(instance_id)

                      if TARGET_NAME_PREFIX and (not name_tag or not name_tag.startswith(TARGET_NAME_PREFIX)):
                          print(f'Skipped: {instance_id} (Name={name_tag}) does not match prefix "{TARGET_NAME_PREFIX}"')
                          results.append({'instanceId': instance_id, 'status': 'skipped'})
                          continue

                      wait_for_status_checks(instance_id)

                      # ステータスチェック完了後、PublicDnsNameが割り当てられている可能性があるため再取得
                      hostnames, _ = get_instance_info(instance_id)

                      computer_id = get_computer_id_by_hostnames(hostnames)
                      if computer_id is None:
                          print(f'Computer not found in V1ES(SWP): hostnames={hostnames}')
                          results.append({'instanceId': instance_id, 'status': 'error', 'message': f'Computer not found: {hostnames}'})
                          continue

                      task = run_recommendation_scan(computer_id)
                      task_id = task.get('ID')
                      print(f'Recommendation scan triggered: taskId={task_id}, computerId={computer_id}')

                      delete_scheduled_task(task_id)
                      print(f'Scheduled task deleted: taskId={task_id}')

                      results.append({'instanceId': instance_id, 'status': 'success', 'computerId': computer_id})
                  except Exception as e:
                      print(f'Error processing {instance_id}: {e}')
                      results.append({'instanceId': instance_id, 'status': 'error', 'message': str(e)})

              return {'statusCode': 200, 'body': json.dumps({'results': results})}

  # EventBridge Rule (CloudTrail RunInstances)
  Ec2RunInstancesRule:
    Type: AWS::Events::Rule
    Properties:
      Name: v1es-ec2-runinstances-trigger
      EventPattern:
        source:
          - aws.ec2
        detail-type:
          - AWS API Call via CloudTrail
        detail:
          eventSource:
            - ec2.amazonaws.com
          eventName:
            - RunInstances
      Targets:
        - Arn: !GetAtt ScanFunction.Arn
          Id: ScanFunctionTarget

  # Permission for EventBridge to invoke Lambda
  ScanFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref ScanFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt Ec2RunInstancesRule.Arn

Outputs:
  ScanFunctionArn:
    Value: !GetAtt ScanFunction.Arn

動作確認

EC2インスタンスを前回の記事のとおりリストアし、推奨設定スキャンが実行されることを確認します。スキャンの結果はV1ES(SWP)コンソールの「コンピュータ」画面から確認できます。各保護モジュールの「推奨設定」タブで、前回の推奨設定の検索日時が更新されていれば成功です。

20260417shima04

まとめ

V1ES(SWP)のAPIとAWS Lambda、EventBridgeを組み合わせて、推奨設定スキャンをEC2新規構築時に自動実行する仕組みを構築しました。前回のポリシー自動割り当てと合わせることで、構築後の設定作業を手動操作なしで完了できます。

なお、実運用ではエージェントのセルフプロテクションを無効化してローカルコマンドで実行する方式の方がシンプルなケースもあります。詳しくは次の記事を参照してください。

https://dev.classmethod.jp/articles/202603-v1es-swp-selfprotection-01/

本記事がどなたかのお役に立てれば幸いです。

この記事をシェアする

関連記事