[Update] การใช้ Lambda Function เพื่อแปลงข้อมูลใน Firehose
สวัสดีครับ POP จากบริษัท Classmethod (Thailand) ครับ
ครั้งนี้จะมาแนะนำการใช้งาน AWS Lambda ร่วมกับ Amazon Data Firehose เพื่อประมวลผลและแปลงข้อมูลแบบเรียลไทม์ โดยการแปลงข้อมูล Timestamp ให้เป็นรูปแบบที่กำหนดก่อนส่งไปยัง Amazon S3
แผนภาพโครงสร้าง
สิ่งที่ต้องมี
สร้าง S3 Bucket ใน Amazon S3
สร้าง S3 Bucket ไว้เป็นพื้นที่สำหรับจัดเก็บข้อมูลเพื่อนำมาแสดงผลใน Firehose โดยจะใช้ชื่อตามด้านล่างนี้
・General purpose buckets: tinnakorn-test-s3
ดูตัวอย่างได้ที่ลิงก์ด้านล่างนี้
สร้าง Firehose stream ใน Amazon Data Firehose
ค้นหาและเลือก Amazon Data Firehose
คลิก Firehose streams
จากเมนูด้านซ้าย แล้วคลิก Create Firehose stream
ด้านขวา
เมื่อเข้ามาหน้าจอ Create Firehose stream จะตั้งค่าตามนี้
Choose source and destination
・Source: Direct PUT
・Destination: Amazon S3
Firehose stream name
Firehose stream name: tinnakorn-test-firehose
เลือก S3 bucket ที่สร้างก่อนหน้านี้
Destination settings
・S3 bucket: tinnakorn-test-s3
แล้วคลิก ▶ Buffer hints, compression, file extension and encryption
เพื่อขยายหน้าต่าง แล้วตั้งค่า Buffer interval เป็น 60
เพื่อกำหนดให้มีการอัปเดตทุกๆ 60 วินาที
เมื่อตั้งค่าเสร็จแล้ว คลิก Create Firehose stream
ที่ด้านล่างสุด
สร้าง Lambda Function
เข้ามาที่หน้าจอ AWS Lambda แล้วดำเนินการดังนี้
คลิก Functions
จากเมนูด้านซ้าย แล้วคลิก Create function
ด้านขวา
เมื่อเข้ามาหน้าจอ Create function จะตั้งค่าตามนี้
Create function
・Author from scratch
Basic information
・Function name: tinnakorn-test-lambda
・Runtime: Python 3.13
・Architecture: x86_64
(Default)
เมื่อตั้งค่าเสร็จแล้ว คลิก Create function
ที่ด้านล่างสุด
เมื่อสร้างเสร็จแล้วจะแสดงหน้าจอแบบนี้
สร้างฟังก์ชันแปลงข้อมูลวันที่ (Timestamp) ใน Lambda Function
ต่อไปจะตั้งค่าและแปลงรูปแบบวันที่ (Timestamp) ใน Lambda Function
ก่อนอื่นให้คัดลอก Code ด้านล่างนี้เตรียมไว้
อ้างอิง: Code transform date
import json
import base64
import datetime
def lambda_handler(event, context):
results = []
records = event["records"]
for record in records:
# read record
recordId = record["recordId"]
decoded_data = base64.b64decode(record["data"])
payload = json.loads(decoded_data)
print("payload: " + str(payload))
# transform record
tzinfo = datetime.timezone(datetime.timedelta(hours=7))
timestamp = datetime.datetime.fromtimestamp(payload['timestamp']).astimezone(tzinfo)
# timestamp = datetime.datetime.fromtimestamp(payload['timestamp'])
transformed_payload = {
"datetime": timestamp.strftime('%Y-%m-%d %H:%M:%S'),
"distance": payload['payloads']['distance'],
}
print("transformed_payload: " + str(transformed_payload))
# write record
decoded_data = json.dumps(transformed_payload) + '\n'
data = base64.b64encode(decoded_data.encode())
results.append({
"result": "Ok",
"recordId": recordId,
"data": data,
})
return {
"records": results
}
กล้บมาที่หน้าจอ Lambda Function ของเรา แล้วดำเนินการดังนี้
・เลือกแท็บ Code
・วาง Code ที่คัดลอกมาวางใน Code source
(หลังจากวางไปแล้ว แท็บไฟล์จะแสดงเป็น lambda_function.py
)
・คลิก Deploy (Ctrl+Shift+U)
เพื่ออัปเดต Function ใน Lambda
ต่อไปจะแก้ไขเวลา Timeout โดยคลิกแท็บ Configuration
แล้วเลือก General configuration
จากเมนูด้านซ้าย และคลิก Edit
แล้วจะตั้งค่า Timeout ดังนี้
・1
min 0
sec
・คลิก Save
ต่อไปจะทดสอบการทำงานของการแปลงข้อมูลวันที่ (Timestamp) ที่ตั้งค่ามาจนถึงตอนนี้ว่าสามารถใช้งานได้หรือไม่ โดยเลือกแท็บ Test
แล้วคัดลอก Code ด้านล่างนี้วางในช่อง JSON ด้านล่าง แล้วคลิกปุ่ม Test
ด้านขวา
※หากไม่ได้บันทึก Event name แล้วมีการ Refersh หน้าจอ Lambda Function นี้ Event JSON จะถูกคืนค่า ซึ่งเราสามารถบันทึก Event JSON ได้โดยป้อนชื่อ
Event name
และคลิกSave
แต่ครั้งนี้จะไม่บันทึกเนื่องจากเป็นแค่การสาธิต)
{
"records": []
}
ถ้า Code และการตั้งค่าถูกต้องก็จะแสดงแจ้งเตือนแบบนี้
เพียงเท่านี้ก็เสร็จสิ้นของขั้นตอนของ AWS Lambda แล้ว
สร้าง IAM Role
ในขั้นตอนนี้จะสร้าง IAM Role สำหรับใช้งานกับ EC2 โดยจะอนุญาตสิทธิ์การใช้งาน Firehose ให้กับ EC2
ดูตัวอย่างได้ที่ลิงก์ด้านล่างนี้ (วิธีการจะเหมือนกับลิงก์นี้ แต่ตอนเลือก Permissions policies ให้เลือก AmazonKinesisFirehoseFullAccess
)
ตัวอย่างการสร้าง IAM Role สำหรับ Firehose ในบทความนี้
Step 1
Select trusted entity
・Trusted entity type: AWS service
Use case
・Service or use case: EC2
・Use case: EC2
Step 2
Add permissions
・Permissions policies:
AmazonKinesisFirehoseFullAccess
(อนุญาตให้ Firehose เข้าถึง EC2 Instance)
AmazonSSMManagedInstanceCore
(อนุญาตให้เข้าถึง EC2 Instance ผ่าน SSM (AWS Systems Manager))
※หากสร้าง Key pair จะสามารถเชื่อมต่อ EC2 Instance ผ่าน SSH ได้จากช่องทางอื่นๆ เช่น เชื่อมต่อ Instance ด้วย PuTTY ดังนั้น "AmazonSSMManagedInstanceCore" อาจไม่จำเป็นต้องเพิ่มใน IAM Role
แต่ครั้งนี้ไม่ได้ใช้ Key pair จึงต้องใช้ SSM เพื่อเชื่อมต่อ Instance
Step 3
Name, review, and create
Role details
・Role name: tinnakorn-test-ec2-firehose-role
สร้าง EC2 Instance
ขั้นตอนนี้จะสร้าง EC2 Instance สำหรับใช้เชื่อมต่อกับ Firehose
ดูตัวอย่างได้ที่ลิงก์ด้านล่างนี้
ตัวอย่างการสร้าง EC2 Instance ในบทความนี้
Region:
Thailand
※Launch instances
Name and tags
・Name:tinnakorn-test-ec2
(ป้อนชื่อตามต้องการ)Application and OS Images (Amazon Machine Image)
・Amazon Machine Image (AMI):Amazon Linux 2023 AMI
Instance type
・Instance type:t2.micro
(Default)Key pair (login)
・Key pair name - required:tinnakorn-test-ec2
Proceed without a key pair (Not recommended) (ครั้งนี้จะไม่ใช้ Key pair)Network settings
Firewall (security groups)
・Security group name - required:tinnakorn-test-ec2
(ตั้งชื่อที่ต้องการ)
・Description - required:tinnakorn-test-ec2
(ป้อนตามต้องการ)
Inbound security groups rules
・คลิก Remove (ครั้งนี้จะไม่ตั้งค่า Security group rule)Configure storage
1x:8
GiBgp3
Root volume (Default)ー สามารถเพิ่ม SSM Role ได้ที่ Advanced details ー
คลิก Advanced details เพื่อขยายหน้าจอ แล้วเลือก Role ที่เราสร้างไว้ก่อนหน้านี้
IAM instance profile:tinnakorn-test-ec2-firehose-role
เตรียม EC2 Instance สำหรับ Firehose
ครั้งนี้จะรันคำสั่งใน SSM (AWS Systems Manager)
เชื่อมต่อ Instance ด้วย SSM แล้วดำเนินการรันคำสั่งตามนี้
เมื่อเชื่อมต่อเข้ามาได้แล้วให้รันคำสั่งใช้งานสิทธิ์ root ตามด้านล่างนี้
sudo su -
แล้วรันคำสั่ง Update server ให้เป็นปัจจุบันเสมอ
yum update -y
ตรวจสอบเวอร์ชัน Python
ต่อไปตรวจสอบเวอร์ชัน Python ว่าตอนนี้มีอยู่ในเซิร์ฟเวอร์หรือไม่
python3 --version
เปลี่ยน Time Zone EC2
รันคำสั่งนี้เพื่อเปลี่ยนเวลาจาก UTC ให้เป็น +07
timedatectl set-timezone Asia/Bangkok
date
ดูข้อมูลเพิ่มเติมเกี่ยวกับ Time Zone ได้ที่ลิงก์ด้านล่างนี้
วิธีตั้งค่า Time Zone ใน Amazon Linux 2 ของ EC2
ติดตั้ง PIP
รันคำสั่งติดตั้งแพ็กเกจ python3-pip
yum install python3-pip -y
รันคำสั่งตรวจสอบเวอร์ชันของ PIP ที่ติดตั้งอยู่ในระบบของเรา
pip3 --version
แล้วติดตั้ง PIP (Package Installer for Python)
*boto3 คือ AWS SDK ที่ใช้งานสำหรับ Python โดยเฉพาะทำให้ง่ายต่อการทำงานร่วมกันของโค้ดหรือโปรแกรมที่เขียนด้วย Python
pip3 install boto3
ดูรายละเอียดเพิ่มเติมได้ที่ลิงก์ด้านล่างนี้
ดูเฉพาะหัวข้อนี้: รันคำสั่งใน PuTTY
เมื่อติดตั้งคำสั่งเหล่านี้เสร็จแล้ว ดำเนินการในขั้นตอนถัดไป
การส่งข้อมูลไปยัง Firehose
สร้างโปรแกรมทดสอบการส่งข้อมูลไปยัง Firehose
รันคำสั่งย้ายไปยัง /root
(หากทำตามขั้นตอนนี้ทั้งหมดจะอยู่ใน /root อยู่แล้ว)
*หากต้องการตรวจสอบ path ปัจจุบันให้ใช้คำสั่ง pwd
cd ~/
รันคำสั่งดาวน์โหลดไฟล์ create_sample_json_to_firehose2.py
จาก GitHub Repository ลงใน EC2 Instance
wget https://raw.githubusercontent.com/classmethod-thailand/cmth_seminar/develop/iot_core_webinar/create_sample_json_to_firehose2.py
รันคำสั่ง chmod + โหมดสิทธิ์
ใช้กำหนดสิทธิ์การเข้าถึงไฟล์หรือโฟลเดอร์ โดย chmod 777 อนุญาตให้ทุกคนสามารถอ่าน, เขียน และรันไฟล์ได้
chmod 777 create_sample_json_to_firehose2.py
รันคำสั่ง vi
แก้ไขไฟล์ create_sample_json_to_firehose2.py
ที่ดาวน์โหลดเมื่อสักครู่นี้
vi create_sample_json_to_firehose2.py
แล้วแก้ไขไฟล์ดังนี้
・กดปุ่ม i
ที่แป้นพิมพ์ ให้ "create_sample_json_to_firehose2.py" 26L, 602B /変更かも
ที่อยู่ด้านล่างซ้ายเปลี่ยนเป็น --INSERT--
・แล้วแก้ไขไฟล์โดยเปลี่ยน firehose_stream_name
ให้เป็นชื่อ Firehose streams ของเรา เช่น tinnakorn-test-firehose
・แล้วกดปุ่ม Esc
ให้คำว่า -- INSERT --
หายไป และพิมพ์ :x
+ Enter เพื่อบันทึกและออกจากไฟล์
Output (Example)
import json
import datetime
import random
import math
import boto3
firehose_stream_name = "tinnakorn-test-firehose"
hour = datetime.datetime.now().hour
a1 = math.sin(hour * math.pi / 24) + 1
dist = random.lognormvariate(a1, a1 / 3)
if dist <= 0.1:
dist = 0.1
json_obj = {
"timestamp": datetime.datetime.now().timestamp(),
"payloads":{
"distance": dist,
}
}
send_data = (json.dumps(json_obj) + "\n").encode()
client = boto3.client('firehose', region_name='ap-southeast-1')
response = client.put_record(
DeliveryStreamName = firehose_stream_name,
Record = {"Data": send_data})
~
~
:x
รันสคริปต์ Python เพื่ออัปโหลดไฟล์ไปยัง Amazon S3 แล้วรอประมาณ 60 วินาทีตามที่ได้ตั้งค่า Buffer interval ไว้
python3 create_sample_json_to_firehose2.py
เมื่อผ่านไป 60 วินาทีแล้ว เข้าไปที่บริการ Amazon S3 แล้วคลิกเข้าไปที่โฟลเดอร์ด้านในสุดจะเห็นไฟล์ที่ถูกอัปโหลดเข้ามาแสดงอยู่ที่นี่ ก็ให้ติ๊ก ✅️ แล้วคลิก Download ไฟล์ลงอุปกรณ์ของเรา
แล้วเปิดไฟล์เพื่อดูข้อมูลที่อยู่ในไฟล์เหล่านี้ ก็จะเห็นข้อมูลที่เป็น "distance" และ "datetime" แบบนี้
โดยในส่วนของ Timestamp ตัวเลขเวลาจะเป็นจุดทศนิยม ซึ่งทำให้ไม่สามารถอ่านค่าได้ว่าเป็นวันที่เวลาเท่าไหร่
ดังนั้นจึงต้องใช้ Lambda Function มาเป็นตัวช่วยในการแปลงข้อมูลวันที่ (Timestamp) ในขั้นตอนถัดไป
เปิดใช้งาน Lambda function ใน Firehose
ให้เปิดใช้งาน transformation สำหรับ Lambda Function ที่สร้างเตรียมไว้ในตอนแรกเพื่อประมวลผลและแปลงข้อมูลแบบเรียลไทม์ โดยการแปลงข้อมูล Timestamp ให้เป็นรูปแบบที่กำหนดก่อนส่งไปยัง Amazon S3
กลับมาที่หน้าจอ Firehose stream ของเรา แล้วตั้งค่า transformation ดังนี้
・เลือกแท็บ Configuration
・คลิก Edit
ในหัวข้อ "Transform and convert records"
・ติ๊ก ✅️ Turn on data transformation
ในหัวข้อ "Transform source records with AWS Lambda"
・AWS Lambda function: tinnakorn-test-lambda
(เลือก Lambda Function ที่สร้างเตรียมไว้ในตอนแรก)
・คลิก Save changes
ทดสอบการทำงาน Lambda function ใน Firehose
การบวนการทำงานมีดังนี้
รันสคริปต์ Python เพื่ออัปโหลดไฟล์ไปยัง Amazon S3 อีกครั้ง แล้วรอประมาณ 60 วินาทีตามที่ได้ตั้งค่า Buffer interval ไว้
python3 create_sample_json_to_firehose2.py
แล้วเข้าไปที่บริการ Amazon S3 เพื่อดาวน์โหลดไฟล์ล่าสุดอีกครั้ง
แล้วเปิดดูก็จะพบว่าข้อมูล Timestamp จากก่อนหน้านี้ที่เป็นทศนิยมได้เปลี่ยนเป็นรูปแบบของวันที่และเวลาที่เราสามารถเข้าใจได้แล้ว
นี้คือการเปรียบเทียบผลลัพธ์ของการใช้งาน Lambda function ในการแปลงข้อมูล Timestamp ออกมาในรูปแบบปกติ
ลบ Resource
หากเราไม่ต้องการใช้งานแล้วก็ควรจะลบ Resource เพื่อประหยัดค่าใช้จ่ายโดยดูวิธีการลบในหัวข้อนี้ได้เลย
- EC2
- Instances
- Key pairs
- Security Groups
- Amazon Data Firehose
- Firehose streams
- Identity and Access Management (IAM)
- Roles
- Policies
- Amazon S3
- Buckets
- Lambda
- Functions
การ Terminate Instance, ลบ Key Pair และลบ Security Group ใน EC2
ดูวิธีการลบ Resource ใน EC2 เช่น Instance
, Key Pair
และ Security Group
ได้ที่ลิงก์ด้านล่างนี้
ลบ Firehose stream ใน Amazon Data Firehose
ดูตัวอย่างที่นี่เฉพาะหัวข้อนี้: ลบ Firehose stream ใน Amazon Data Firehose
ลบ Identity and Access Management (IAM)
ลบ Roles
และ Policies
โดยดูตัวอย่างที่ลิงก์ด้านล่างนี้
ดูตัวอย่างที่นี่เฉพาะหัวข้อนี้: ลบ Identity and Access Management (IAM)
ลบ Bucket ใน Amazon S3
ดูตัวอย่างที่นี่เฉพาะหัวข้อนี้: ลบ Bucket ใน Amazon S3
ลบ Functions ใน Lambda
เข้ามาที่หน้าจอบริการ "Lambda > Functions" ค้นหาและเลือก Functions ที่ต้องการลบ แล้วคลิก Delete
แล้วยืนยันการลบตามคำแนะนำ
ผมหวังว่าบทความนี้จะเป็นประโยชน์ให้กับผู้อ่านได้นะครับ
POP (Tinnakorn Maneewong) จากบริษัท Classmethod (Thailand) ครับ !