V1ES(SWP) APIで推奨設定スキャンを自動実行してみた
こんにちは、シマです。
過去の記事で、Trend Vision One Endpoint Security(Server & Workload Protection)(以降V1ES(SWP))を対象に、EC2リストア後のポリシー自動割り当てまでを紹介しました。
前回の記事ではローカルコマンドでの推奨設定スキャン実行時に403エラーが発生していましたが、こちらは別記事で解消しています。
前回記事の中で「推奨設定スキャンはVision One APIでできそう」と書き残していたので、今回は宿題の回収としてAPIでの実装を試します。EC2新規構築をトリガーに推奨設定スキャンを自動化したい方の参考になれば幸いです。
全体構成

処理の流れは次のとおりです。
- EC2インスタンスの新規作成(
RunInstances)をEventBridgeがCloudTrail経由で検知 - EventBridgeルールによりLambda関数が起動
- Lambda関数がNameタグを確認し、対象外であればスキップ
- ステータスチェック完了を待機
- EC2のホスト名からV1ES(SWP)上のコンピュータIDを検索
- 対象コンピュータを指定したスケジュールタスクを
runNow: trueで作成し、即時スキャン実行 - 作成したスケジュールタスクを削除(タスクの蓄積防止)
V1ES(SWP)のAPIには「対象コンピュータに直接スキャンを実行する」という単発のAPIが存在しないため、対象コンピュータを指定したスケジュールタスクを作成・即時実行する方式を採用しています。タスクが溜まり続けないように、実行後は削除します。スキャン自体は非同期で進行するため、タスクを削除してもスキャン結果は問題なく反映されます。
使用するAPIキーについて
V1ES(SWP)のAPIキーはVision Oneコンソールから「Server & Workload Protection」→「管理」→「ユーザ管理」→「APIキー」から発行します。

Vision Oneコンソールの「Administration」→「API Keys」で発行したキーはVision One API用です。V1ES(SWP)のAPIエンドポイントに使用すると認証エラー(401)になりました。

CloudFormationテンプレート
Lambda関数、IAMロール、EventBridgeルール、Secrets Managerを一括作成します。サンプルコードは折りたたみで掲載します。
サンプルコード
AWSTemplateFormatVersion: '2010-09-09'
Description: V1ES(SWP) Recommendation Scan Automation
Parameters:
V1esApiSecretKey:
Type: String
NoEcho: true
Description: V1ES(SWP) API Secret Key
V1esApiHost:
Type: String
Default: https://workload.jp-1.cloudone.trendmicro.com/api
Description: V1ES(SWP) API Host URL
TargetNameTagPrefix:
Type: String
Default: ''
Description: Target EC2 Name tag prefix (empty = all instances)
Resources:
# Secrets Manager
V1esApiKeySecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: v1es-swp-api-key
SecretString: !Sub '{"api-secret-key": "${V1esApiSecretKey}"}'
# Lambda Role
ScanFunctionRole:
Type: AWS::IAM::Role
Properties:
RoleName: v1es-scan-function-role
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: ScanFunctionPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- ec2:DescribeInstances
- ec2:DescribeInstanceStatus
Resource: '*'
- Effect: Allow
Action: secretsmanager:GetSecretValue
Resource: !Ref V1esApiKeySecret
# Lambda Function
ScanFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: v1es-recommendation-scan
Runtime: python3.12
Handler: index.lambda_handler
Timeout: 600
Role: !GetAtt ScanFunctionRole.Arn
Environment:
Variables:
API_HOST: !Ref V1esApiHost
SECRET_NAME: !Ref V1esApiKeySecret
TARGET_NAME_PREFIX: !Ref TargetNameTagPrefix
Code:
ZipFile: |
import json
import os
import time
import boto3
import urllib3
API_HOST = os.environ['API_HOST']
SECRET_NAME = os.environ['SECRET_NAME']
TARGET_NAME_PREFIX = os.environ.get('TARGET_NAME_PREFIX', '')
http = urllib3.PoolManager()
def get_api_key():
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId=SECRET_NAME)
secret = json.loads(response['SecretString'])
return secret['api-secret-key']
def api_request(method, path, body=None):
headers = {
'api-secret-key': get_api_key(),
'api-version': 'v1',
'Content-Type': 'application/json',
}
url = f'{API_HOST}{path}'
response = http.request(method, url, headers=headers, body=json.dumps(body) if body else None)
if response.status >= 400:
raise Exception(f'API error {response.status}: {response.data.decode()}')
if response.status == 204 or not response.data:
return None
return json.loads(response.data.decode())
def get_instance_info(instance_id):
ec2 = boto3.client('ec2')
response = ec2.describe_instances(InstanceIds=[instance_id])
reservations = response['Reservations']
if not reservations:
return [], None
instance = reservations[0]['Instances'][0]
# V1ES(SWP)側はPublicDnsNameで登録されるケースとPrivateDnsNameで登録されるケースがあるため両方取得
hostnames = [h for h in [instance.get('PublicDnsName'), instance.get('PrivateDnsName')] if h]
tags = instance.get('Tags', [])
name_tag = next((t['Value'] for t in tags if t['Key'] == 'Name'), None)
return hostnames, name_tag
def wait_for_status_checks(instance_id):
ec2 = boto3.client('ec2')
waiter = ec2.get_waiter('instance_status_ok')
print(f'Waiting for status checks: {instance_id}')
waiter.wait(InstanceIds=[instance_id])
print(f'Status checks passed: {instance_id}')
def get_computer_id_by_hostnames(hostnames):
# V1ES(SWP)はPublic/Private両方のDNS名で登録される可能性があるため、
# 各ホスト名に対してワイルドカード前方一致で検索する
for hostname in hostnames:
body = {
'searchCriteria': [{
'fieldName': 'hostName',
'stringTest': 'equal',
'stringValue': f'{hostname}%',
'stringWildcards': True,
}]
}
result = api_request('POST', '/computers/search', body) or {}
computers = result.get('computers', [])
print(f'Computer search: query={hostname}%, matched={len(computers)}')
if computers:
return computers[0]['ID']
return None
def run_recommendation_scan(computer_id):
body = {
'name': f'Recommendation Scan - Lambda - computer {computer_id}',
'type': 'scan-for-recommendations',
'runNow': True,
'scheduleDetails': {
'recurrenceType': 'none',
'recurrenceCount': 1,
'onceOnlyScheduleParameters': {
'startTime': int(time.time() * 1000),
},
},
'scanForRecommendationsTaskParameters': {
'computerFilter': {
'type': 'computer',
'computerID': computer_id,
}
}
}
return api_request('POST', '/scheduledtasks', body)
def delete_scheduled_task(task_id):
api_request('DELETE', f'/scheduledtasks/{task_id}')
def lambda_handler(event, context):
items = event.get('detail', {}).get('responseElements', {}).get('instancesSet', {}).get('items', [])
if not items:
return {'statusCode': 400, 'body': json.dumps({'message': 'No instances found in event'})}
results = []
for item in items:
instance_id = item['instanceId']
try:
hostnames, name_tag = get_instance_info(instance_id)
if TARGET_NAME_PREFIX and (not name_tag or not name_tag.startswith(TARGET_NAME_PREFIX)):
print(f'Skipped: {instance_id} (Name={name_tag}) does not match prefix "{TARGET_NAME_PREFIX}"')
results.append({'instanceId': instance_id, 'status': 'skipped'})
continue
wait_for_status_checks(instance_id)
# ステータスチェック完了後、PublicDnsNameが割り当てられている可能性があるため再取得
hostnames, _ = get_instance_info(instance_id)
computer_id = get_computer_id_by_hostnames(hostnames)
if computer_id is None:
print(f'Computer not found in V1ES(SWP): hostnames={hostnames}')
results.append({'instanceId': instance_id, 'status': 'error', 'message': f'Computer not found: {hostnames}'})
continue
task = run_recommendation_scan(computer_id)
task_id = task.get('ID')
print(f'Recommendation scan triggered: taskId={task_id}, computerId={computer_id}')
delete_scheduled_task(task_id)
print(f'Scheduled task deleted: taskId={task_id}')
results.append({'instanceId': instance_id, 'status': 'success', 'computerId': computer_id})
except Exception as e:
print(f'Error processing {instance_id}: {e}')
results.append({'instanceId': instance_id, 'status': 'error', 'message': str(e)})
return {'statusCode': 200, 'body': json.dumps({'results': results})}
# EventBridge Rule (CloudTrail RunInstances)
Ec2RunInstancesRule:
Type: AWS::Events::Rule
Properties:
Name: v1es-ec2-runinstances-trigger
EventPattern:
source:
- aws.ec2
detail-type:
- AWS API Call via CloudTrail
detail:
eventSource:
- ec2.amazonaws.com
eventName:
- RunInstances
Targets:
- Arn: !GetAtt ScanFunction.Arn
Id: ScanFunctionTarget
# Permission for EventBridge to invoke Lambda
ScanFunctionPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref ScanFunction
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !GetAtt Ec2RunInstancesRule.Arn
Outputs:
ScanFunctionArn:
Value: !GetAtt ScanFunction.Arn
動作確認
EC2インスタンスを前回の記事のとおりリストアし、推奨設定スキャンが実行されることを確認します。スキャンの結果はV1ES(SWP)コンソールの「コンピュータ」画面から確認できます。各保護モジュールの「推奨設定」タブで、前回の推奨設定の検索日時が更新されていれば成功です。

まとめ
V1ES(SWP)のAPIとAWS Lambda、EventBridgeを組み合わせて、推奨設定スキャンをEC2新規構築時に自動実行する仕組みを構築しました。前回のポリシー自動割り当てと合わせることで、構築後の設定作業を手動操作なしで完了できます。
なお、実運用ではエージェントのセルフプロテクションを無効化してローカルコマンドで実行する方式の方がシンプルなケースもあります。詳しくは次の記事を参照してください。
本記事がどなたかのお役に立てれば幸いです。







