[Update] การใช้ Lambda Function เพื่อแปลงข้อมูลใน Firehose

[Update] การใช้ Lambda Function เพื่อแปลงข้อมูลใน Firehose

แนะนำการใช้งาน AWS Lambda ร่วมกับ Amazon Data Firehose เพื่อประมวลผลและแปลงข้อมูลแบบเรียลไทม์ โดยยกตัวอย่างการแปลงข้อมูล Timestamp ให้เป็นรูปแบบที่กำหนดก่อนส่งไปยังปลายทาง เช่น Amazon S3 หรือ Amazon Redshift เพิ่มความยืดหยุ่นและประสิทธิภาพในการจัดการข้อมูล

สวัสดีครับ POP จากบริษัท Classmethod (Thailand) ครับ

ครั้งนี้จะมาแนะนำการใช้งาน AWS Lambda ร่วมกับ Amazon Data Firehose เพื่อประมวลผลและแปลงข้อมูลแบบเรียลไทม์ โดยการแปลงข้อมูล Timestamp ให้เป็นรูปแบบที่กำหนดก่อนส่งไปยัง Amazon S3

แผนภาพโครงสร้าง

diagram-ec2_firehose_lambda_s3

สิ่งที่ต้องมี

สร้าง S3 Bucket ใน Amazon S3

สร้าง S3 Bucket ไว้เป็นพื้นที่สำหรับจัดเก็บข้อมูลเพื่อนำมาแสดงผลใน Firehose โดยจะใช้ชื่อตามด้านล่างนี้
・General purpose buckets: tinnakorn-test-s3

ดูตัวอย่างได้ที่ลิงก์ด้านล่างนี้
https://dev.classmethod.jp/articles/how-to-use-s3-from-ec2-and-using-aws-cli-s3/#toc--buckets--amazon-s3

สร้าง Firehose stream ใน Amazon Data Firehose

ค้นหาและเลือก Amazon Data Firehose
service_amazon_data_firehose-search-202502

คลิก Firehose streams จากเมนูด้านซ้าย แล้วคลิก Create Firehose stream ด้านขวา
service_amazon_data_firehose-menu_and_button-202502

เมื่อเข้ามาหน้าจอ Create Firehose stream จะตั้งค่าตามนี้
Choose source and destination
・Source: Direct PUT
・Destination: Amazon S3

Firehose stream name
Firehose stream name: tinnakorn-test-firehose
create_firehose_stream-202502-1

เลือก S3 bucket ที่สร้างก่อนหน้านี้
Destination settings
・S3 bucket: tinnakorn-test-s3
create_firehose_stream-202502-2

แล้วคลิก ▶ Buffer hints, compression, file extension and encryption เพื่อขยายหน้าต่าง แล้วตั้งค่า Buffer interval เป็น 60 เพื่อกำหนดให้มีการอัปเดตทุกๆ 60 วินาที
เมื่อตั้งค่าเสร็จแล้ว คลิก Create Firehose stream ที่ด้านล่างสุด
create_firehose_stream-202502-3

สร้าง Lambda Function

เข้ามาที่หน้าจอ AWS Lambda แล้วดำเนินการดังนี้

คลิก Functions จากเมนูด้านซ้าย แล้วคลิก Create function ด้านขวา
service_lambda_function-menu_and_button-202502

เมื่อเข้ามาหน้าจอ Create function จะตั้งค่าตามนี้
Create function
Author from scratch

Basic information
・Function name: tinnakorn-test-lambda
・Runtime: Python 3.13
・Architecture: x86_64 (Default)

เมื่อตั้งค่าเสร็จแล้ว คลิก Create function ที่ด้านล่างสุด
create_lambda_functions-202502-1

เมื่อสร้างเสร็จแล้วจะแสดงหน้าจอแบบนี้
create_lambda_functions-202502-2

สร้างฟังก์ชันแปลงข้อมูลวันที่ (Timestamp) ใน Lambda Function

ต่อไปจะตั้งค่าและแปลงรูปแบบวันที่ (Timestamp) ใน Lambda Function

ก่อนอื่นให้คัดลอก Code ด้านล่างนี้เตรียมไว้

อ้างอิง: Code transform date

import json
import base64
import datetime

def lambda_handler(event, context):
    results = []
    records = event["records"]
    for record in records:
        # read record
        recordId = record["recordId"]
        decoded_data = base64.b64decode(record["data"])
        payload = json.loads(decoded_data)
        print("payload: " + str(payload))

        # transform record
        tzinfo = datetime.timezone(datetime.timedelta(hours=7))
        timestamp = datetime.datetime.fromtimestamp(payload['timestamp']).astimezone(tzinfo)
        # timestamp = datetime.datetime.fromtimestamp(payload['timestamp'])
        transformed_payload = {
            "datetime": timestamp.strftime('%Y-%m-%d %H:%M:%S'),
            "distance": payload['payloads']['distance'],
        }
        print("transformed_payload: " + str(transformed_payload))

        # write record
        decoded_data = json.dumps(transformed_payload) + '\n'
        data = base64.b64encode(decoded_data.encode())

        results.append({
            "result": "Ok",
            "recordId": recordId,
            "data": data,
        })
    return {
        "records": results
    }

กล้บมาที่หน้าจอ Lambda Function ของเรา แล้วดำเนินการดังนี้
・เลือกแท็บ Code
・วาง Code ที่คัดลอกมาวางใน Code source (หลังจากวางไปแล้ว แท็บไฟล์จะแสดงเป็น lambda_function.py)
・คลิก Deploy (Ctrl+Shift+U) เพื่ออัปเดต Function ใน Lambda
create_function_change_date_time-1

ต่อไปจะแก้ไขเวลา Timeout โดยคลิกแท็บ Configuration แล้วเลือก General configuration จากเมนูด้านซ้าย และคลิก Edit
create_function_change_date_time-2

แล้วจะตั้งค่า Timeout ดังนี้
1 min 0 sec
・คลิก Save
create_function_change_date_time-3

ต่อไปจะทดสอบการทำงานของการแปลงข้อมูลวันที่ (Timestamp) ที่ตั้งค่ามาจนถึงตอนนี้ว่าสามารถใช้งานได้หรือไม่ โดยเลือกแท็บ Test แล้วคัดลอก Code ด้านล่างนี้วางในช่อง JSON ด้านล่าง แล้วคลิกปุ่ม Test ด้านขวา

※หากไม่ได้บันทึก Event name แล้วมีการ Refersh หน้าจอ Lambda Function นี้ Event JSON จะถูกคืนค่า ซึ่งเราสามารถบันทึก Event JSON ได้โดยป้อนชื่อ Event name และคลิก Save แต่ครั้งนี้จะไม่บันทึกเนื่องจากเป็นแค่การสาธิต)

{
  "records": []
}

create_function_change_date_time-4

ถ้า Code และการตั้งค่าถูกต้องก็จะแสดงแจ้งเตือนแบบนี้
เพียงเท่านี้ก็เสร็จสิ้นของขั้นตอนของ AWS Lambda แล้ว
create_function_change_date_time-5

สร้าง IAM Role

ในขั้นตอนนี้จะสร้าง IAM Role สำหรับใช้งานกับ EC2 โดยจะอนุญาตสิทธิ์การใช้งาน Firehose ให้กับ EC2

ดูตัวอย่างได้ที่ลิงก์ด้านล่างนี้ (วิธีการจะเหมือนกับลิงก์นี้ แต่ตอนเลือก Permissions policies ให้เลือก AmazonKinesisFirehoseFullAccess)
https://dev.classmethod.jp/articles/ssm-iam-role-for-use-with-ec2-instance/

ตัวอย่างการสร้าง IAM Role สำหรับ Firehose ในบทความนี้

Step 1
Select trusted entity
・Trusted entity type: AWS service
Use case
・Service or use case: EC2
・Use case: EC2

Step 2
Add permissions
・Permissions policies:
AmazonKinesisFirehoseFullAccess (อนุญาตให้ Firehose เข้าถึง EC2 Instance)
AmazonSSMManagedInstanceCore (อนุญาตให้เข้าถึง EC2 Instance ผ่าน SSM (AWS Systems Manager))

※หากสร้าง Key pair จะสามารถเชื่อมต่อ EC2 Instance ผ่าน SSH ได้จากช่องทางอื่นๆ เช่น เชื่อมต่อ Instance ด้วย PuTTY ดังนั้น "AmazonSSMManagedInstanceCore" อาจไม่จำเป็นต้องเพิ่มใน IAM Role
แต่ครั้งนี้ไม่ได้ใช้ Key pair จึงต้องใช้ SSM เพื่อเชื่อมต่อ Instance

Step 3
Name, review, and create
Role details
・Role name: tinnakorn-test-ec2-firehose-role

สร้าง EC2 Instance

ขั้นตอนนี้จะสร้าง EC2 Instance สำหรับใช้เชื่อมต่อกับ Firehose

ดูตัวอย่างได้ที่ลิงก์ด้านล่างนี้
https://dev.classmethod.jp/articles/how-to-install-amazon-linux-2023-on-ec2-in-thailand-region/

ตัวอย่างการสร้าง EC2 Instance ในบทความนี้

Region: Thailand

※Launch instances
Name and tags
・Name: tinnakorn-test-ec2 (ป้อนชื่อตามต้องการ)

Application and OS Images (Amazon Machine Image)
・Amazon Machine Image (AMI): Amazon Linux 2023 AMI

Instance type
・Instance type: t2.micro (Default)

Key pair (login)
・Key pair name - required: tinnakorn-test-ec2
Proceed without a key pair (Not recommended) (ครั้งนี้จะไม่ใช้ Key pair)

Network settings
Firewall (security groups)
・Security group name - required: tinnakorn-test-ec2 (ตั้งชื่อที่ต้องการ)
・Description - required: tinnakorn-test-ec2 (ป้อนตามต้องการ)
Inbound security groups rules
・คลิก Remove (ครั้งนี้จะไม่ตั้งค่า Security group rule)

Configure storage
1x: 8 GiB gp3 Root volume (Default)

ー สามารถเพิ่ม SSM Role ได้ที่ Advanced details ー

คลิก Advanced details เพื่อขยายหน้าจอ แล้วเลือก Role ที่เราสร้างไว้ก่อนหน้านี้
IAM instance profile: tinnakorn-test-ec2-firehose-role
iam_instance_profile-ec2_firehose_role

เตรียม EC2 Instance สำหรับ Firehose

ครั้งนี้จะรันคำสั่งใน SSM (AWS Systems Manager)
เชื่อมต่อ Instance ด้วย SSM แล้วดำเนินการรันคำสั่งตามนี้

เมื่อเชื่อมต่อเข้ามาได้แล้วให้รันคำสั่งใช้งานสิทธิ์ root ตามด้านล่างนี้

sudo su -

แล้วรันคำสั่ง Update server ให้เป็นปัจจุบันเสมอ

yum update -y

ตรวจสอบเวอร์ชัน Python

ต่อไปตรวจสอบเวอร์ชัน Python ว่าตอนนี้มีอยู่ในเซิร์ฟเวอร์หรือไม่

python3 --version

เปลี่ยน Time Zone EC2

รันคำสั่งนี้เพื่อเปลี่ยนเวลาจาก UTC ให้เป็น +07

timedatectl set-timezone Asia/Bangkok
date

ดูข้อมูลเพิ่มเติมเกี่ยวกับ Time Zone ได้ที่ลิงก์ด้านล่างนี้
วิธีตั้งค่า Time Zone ใน Amazon Linux 2 ของ EC2

ติดตั้ง PIP

รันคำสั่งติดตั้งแพ็กเกจ python3-pip

yum install python3-pip -y

รันคำสั่งตรวจสอบเวอร์ชันของ PIP ที่ติดตั้งอยู่ในระบบของเรา

pip3 --version

แล้วติดตั้ง PIP (Package Installer for Python)
*boto3 คือ AWS SDK ที่ใช้งานสำหรับ Python โดยเฉพาะทำให้ง่ายต่อการทำงานร่วมกันของโค้ดหรือโปรแกรมที่เขียนด้วย Python

pip3 install boto3

ดูรายละเอียดเพิ่มเติมได้ที่ลิงก์ด้านล่างนี้
ดูเฉพาะหัวข้อนี้: รันคำสั่งใน PuTTY

เมื่อติดตั้งคำสั่งเหล่านี้เสร็จแล้ว ดำเนินการในขั้นตอนถัดไป

การส่งข้อมูลไปยัง Firehose

สร้างโปรแกรมทดสอบการส่งข้อมูลไปยัง Firehose

รันคำสั่งย้ายไปยัง /root (หากทำตามขั้นตอนนี้ทั้งหมดจะอยู่ใน /root อยู่แล้ว)
*หากต้องการตรวจสอบ path ปัจจุบันให้ใช้คำสั่ง pwd

cd ~/

รันคำสั่งดาวน์โหลดไฟล์ create_sample_json_to_firehose2.py จาก GitHub Repository ลงใน EC2 Instance

wget https://raw.githubusercontent.com/classmethod-thailand/cmth_seminar/develop/iot_core_webinar/create_sample_json_to_firehose2.py

รันคำสั่ง chmod + โหมดสิทธิ์ ใช้กำหนดสิทธิ์การเข้าถึงไฟล์หรือโฟลเดอร์ โดย chmod 777 อนุญาตให้ทุกคนสามารถอ่าน, เขียน และรันไฟล์ได้

chmod 777 create_sample_json_to_firehose2.py

รันคำสั่ง vi แก้ไขไฟล์ create_sample_json_to_firehose2.py ที่ดาวน์โหลดเมื่อสักครู่นี้

vi create_sample_json_to_firehose2.py

แล้วแก้ไขไฟล์ดังนี้
・กดปุ่ม i ที่แป้นพิมพ์ ให้ "create_sample_json_to_firehose2.py" 26L, 602B /変更かも ที่อยู่ด้านล่างซ้ายเปลี่ยนเป็น --INSERT--
・แล้วแก้ไขไฟล์โดยเปลี่ยน firehose_stream_name ให้เป็นชื่อ Firehose streams ของเรา เช่น tinnakorn-test-firehose
・แล้วกดปุ่ม Esc ให้คำว่า -- INSERT -- หายไป และพิมพ์ :x + Enter เพื่อบันทึกและออกจากไฟล์
Output (Example)

import json
import datetime
import random
import math
import boto3

firehose_stream_name = "tinnakorn-test-firehose"

hour = datetime.datetime.now().hour
a1 = math.sin(hour * math.pi / 24) + 1
dist = random.lognormvariate(a1, a1 / 3)
if dist <= 0.1:
    dist = 0.1

json_obj = {
    "timestamp": datetime.datetime.now().timestamp(),
    "payloads":{
        "distance": dist,
    }
}
send_data = (json.dumps(json_obj) + "\n").encode()

client = boto3.client('firehose', region_name='ap-southeast-1')
response = client.put_record(
    DeliveryStreamName = firehose_stream_name,
    Record = {"Data": send_data})
~
~
:x

รันสคริปต์ Python เพื่ออัปโหลดไฟล์ไปยัง Amazon S3 แล้วรอประมาณ 60 วินาทีตามที่ได้ตั้งค่า Buffer interval ไว้

python3 create_sample_json_to_firehose2.py

เมื่อผ่านไป 60 วินาทีแล้ว เข้าไปที่บริการ Amazon S3 แล้วคลิกเข้าไปที่โฟลเดอร์ด้านในสุดจะเห็นไฟล์ที่ถูกอัปโหลดเข้ามาแสดงอยู่ที่นี่ ก็ให้ติ๊ก ✅️ แล้วคลิก Download ไฟล์ลงอุปกรณ์ของเรา
enable_lambda_function_in_firehose-1

แล้วเปิดไฟล์เพื่อดูข้อมูลที่อยู่ในไฟล์เหล่านี้ ก็จะเห็นข้อมูลที่เป็น "distance" และ "datetime" แบบนี้
โดยในส่วนของ Timestamp ตัวเลขเวลาจะเป็นจุดทศนิยม ซึ่งทำให้ไม่สามารถอ่านค่าได้ว่าเป็นวันที่เวลาเท่าไหร่
ดังนั้นจึงต้องใช้ Lambda Function มาเป็นตัวช่วยในการแปลงข้อมูลวันที่ (Timestamp) ในขั้นตอนถัดไป
enable_lambda_function_in_firehose-2

เปิดใช้งาน Lambda function ใน Firehose

ให้เปิดใช้งาน transformation สำหรับ Lambda Function ที่สร้างเตรียมไว้ในตอนแรกเพื่อประมวลผลและแปลงข้อมูลแบบเรียลไทม์ โดยการแปลงข้อมูล Timestamp ให้เป็นรูปแบบที่กำหนดก่อนส่งไปยัง Amazon S3

กลับมาที่หน้าจอ Firehose stream ของเรา แล้วตั้งค่า transformation ดังนี้
・เลือกแท็บ Configuration
・คลิก Edit ในหัวข้อ "Transform and convert records"
enable_lambda_function_in_firehose-1

・ติ๊ก ✅️ Turn on data transformation ในหัวข้อ "Transform source records with AWS Lambda"
・AWS Lambda function: tinnakorn-test-lambda (เลือก Lambda Function ที่สร้างเตรียมไว้ในตอนแรก)
・คลิก Save changes
enable_lambda_function_in_firehose-2

ทดสอบการทำงาน Lambda function ใน Firehose

การบวนการทำงานมีดังนี้

รันสคริปต์ Python เพื่ออัปโหลดไฟล์ไปยัง Amazon S3 อีกครั้ง แล้วรอประมาณ 60 วินาทีตามที่ได้ตั้งค่า Buffer interval ไว้

python3 create_sample_json_to_firehose2.py

แล้วเข้าไปที่บริการ Amazon S3 เพื่อดาวน์โหลดไฟล์ล่าสุดอีกครั้ง

แล้วเปิดดูก็จะพบว่าข้อมูล Timestamp จากก่อนหน้านี้ที่เป็นทศนิยมได้เปลี่ยนเป็นรูปแบบของวันที่และเวลาที่เราสามารถเข้าใจได้แล้ว
enable_lambda_function_in_firehose-3

นี้คือการเปรียบเทียบผลลัพธ์ของการใช้งาน Lambda function ในการแปลงข้อมูล Timestamp ออกมาในรูปแบบปกติ

download_from_s3-2

ลบ Resource

หากเราไม่ต้องการใช้งานแล้วก็ควรจะลบ Resource เพื่อประหยัดค่าใช้จ่ายโดยดูวิธีการลบในหัวข้อนี้ได้เลย

  • EC2
    • Instances
    • Key pairs
    • Security Groups
  • Amazon Data Firehose
    • Firehose streams
  • Identity and Access Management (IAM)
    • Roles
    • Policies
  • Amazon S3
    • Buckets
  • Lambda
    • Functions

การ Terminate Instance, ลบ Key Pair และลบ Security Group ใน EC2

ดูวิธีการลบ Resource ใน EC2 เช่น Instance, Key Pair และ Security Group ได้ที่ลิงก์ด้านล่างนี้
https://dev.classmethod.jp/articles/terminate-instances-delete-key-pairs-and-delete-security-groups-in-ec2/

ลบ Firehose stream ใน Amazon Data Firehose

ดูตัวอย่างที่นี่เฉพาะหัวข้อนี้: ลบ Firehose stream ใน Amazon Data Firehose

ลบ Identity and Access Management (IAM)

ลบ Roles และ Policies โดยดูตัวอย่างที่ลิงก์ด้านล่างนี้
ดูตัวอย่างที่นี่เฉพาะหัวข้อนี้: ลบ Identity and Access Management (IAM)

ลบ Bucket ใน Amazon S3

ดูตัวอย่างที่นี่เฉพาะหัวข้อนี้: ลบ Bucket ใน Amazon S3

ลบ Functions ใน Lambda

เข้ามาที่หน้าจอบริการ "Lambda > Functions" ค้นหาและเลือก Functions ที่ต้องการลบ แล้วคลิก Delete แล้วยืนยันการลบตามคำแนะนำ
delete_lambda_function-1

ผมหวังว่าบทความนี้จะเป็นประโยชน์ให้กับผู้อ่านได้นะครับ

POP (Tinnakorn Maneewong) จากบริษัท Classmethod (Thailand) ครับ !

บทความที่เกี่ยวข้อง

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.