2️⃣ DynamoDB Stream을 이용해 Github Event 저장하기
✅ 실습 전 us-west-2 리전이 맞는지 확인해주세요!
DynamoDB 생성
DynamoDB를 검색하고 메인 페이지에서 Create table을 클릭하면 아래와 같은 화면이 나옵니다. 여기서 테이블 이름을 'GitHubEvent'로 설정하시고, primary key는 'id'로 설정하시고 오른쪽 하단의 Create Table 버튼을 클릭합니다.
이름을 반드시 GitHubEvent 로 설정해주세요. 아래 제공되는 WebhookToDB 람다 코드에 dynamoDB 테이블명이 GitHubEvent로 설정되어 있기 때문에 테이블명을 다르게 설정하고 (ex. GithubEvent) 아래 코드를 copy & paste 하면 에러가 발생합니다!
이제 람다 함수에서 DynamoDB에 데이터를 쓸 수 있도록 execution role을 제공해야 합니다. 그러기 위해서는 IAM으로 가서 설정을 해야 합니다. 다시 람다로 돌아가서 Configuration -> Permissions로 들어가 Role name을 클릭합니다.
IAM 페이지로 들어가게 되고 여기서 Add permissions를 클릭하고 Create inline policy를 클릭합니다.
JSON을 클릭 후 Policy editior에 아래의 코드를 복사해줍니다.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["dynamodb:PutItem"],
"Resource": "arn:aws:dynamodb:*:*:table/*"
}
]
}
Next를 누르시면 아래와 같은 화면이 나옵니다. Policy name은 DB_Write_Only로 설정 후 Create policy를 눌러줍니다.
다시 람다로 돌아가서 코드 적는 창에 아래의 코드를 복사하여 붙여넣고 Deploy를 눌러줍니다.
import json
import boto3
from datetime import datetime
def lambda_handler(event, context):
print("Received event:", json.dumps(event))
try:
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('GitHubEvent')
# webhook 페이로드 파싱
body = json.loads(event['body'])
event_type = event['headers'].get('x-github-event')
created_timestamp = int(datetime.fromisoformat(datetime.utcnow().isoformat().replace("Z", "+00:00")).timestamp())
# 이벤트 타입에 따라 고유 ID 설정
if event_type == 'pull_request':
item_id = f"pr_{body['pull_request']['id']}"
elif event_type == 'issues':
item_id = f"issue_{body['issue']['id']}"
else:
raise ValueError(f"Unsupported event type: {event_type}")
# 기존 항목이 있는지 확인
try:
existing_item = table.get_item(Key={'id': item_id})
except:
existing_item = {'Item': None}
# 공통 항목 설정
print("body:",body)
base_item = {
'id': item_id,
'event_type': event_type,
'action': body['action'],
'last_updated_at': datetime.utcnow().isoformat(),
'repository': body['repository']['full_name'],
'sender': body['sender']['login'],
'created_timestamp':created_timestamp
}
# 기존 항목이 있으면 생성 시간 유지
if 'Item' in existing_item and existing_item['Item']:
base_item['created_at'] = existing_item['Item']['created_at']
else:
base_item['created_at'] = datetime.utcnow().isoformat()
# PR 이벤트 처리
if event_type == 'pull_request':
pr = body['pull_request']
item = {
**base_item,
'number': pr['number'],
'title': pr['title'],
'url': pr['html_url'],
'assignee': pr['user']['login'],
'organization': body.get('organization', {}).get('login', None),
'diff_url': pr['diff_url']
}
# Issue 이벤트 처리
elif event_type == 'issues':
issue = body['issue']
item = {
**base_item,
'number': issue['number'],
'title': issue['title'],
'url': issue['html_url'],
'assignee' : issue['assignee']['login'] if issue['assignee'] else None,
}
# DynamoDB에 저장
response = table.put_item(Item=item)
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Successfully processed GitHub webhook',
'event_type': event_type,
'id': item['id']
})
}
except Exception as e:
print(f"Error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({
'message': 'Error processing webhook',
'error': str(e)
})
}
이제 GitHub에 이벤트가 생겼을 때 Lambda가 DynamoDB에 데이터를 제대로 저장하는지 확인하겠습니다. 다시 GitHub으로 돌아가서 Issue를 생성해봅니다. 아무거나 적어보셔도 상관없습니다. DB에 잘 저장되는지 확인을 하기 위함이라 기억할 수 있는 걸로 적어주세요.
DynamoDB로 돌아가서 왼쪽 사이드바에서 Explore items를 클릭하고 방금 만든 테이블을 선택합니다. 방금 만든 이슈가 테이블에 item으로 들어가 있는 것을 확인할 수 있습니다.
이제 DynamoDB의 Streams를 활용하여 데이터 변경사항을 실시간으로 감지하고 Lambda 함수를 트리거하는 기능을 추가할 것입니다. 먼저 새로운 람다를 생성합니다.
이름을 StreamConsumer로 짓고 아까와 동일한 세팅인 Python 3.13을 선택합니다.
DynamoDB Stream을 읽고 처리하기 위해서는 아까처럼 Lambda의 excution role을 설정해야 합니다. 람다의 메인페이지에서 Configuration -> Permissions에 들어가 Role name을 눌러줍니다.
아까처럼 Add permissions를 클릭후 Create inline policy를 눌러줍니다.
여기서 DynamoDB Streams과 이후에 사용할 SNS, Bedrock의 policy 또한 함께 추가하겠습니다.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["bedrock:InvokeModel"],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": ["sns:Publish"],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"
],
"Resource": "arn:aws:dynamodb:*:*:table/*/stream/*"
}
]
}
policy name은 'getStream'으로 설정하고 Create policy를 눌러줍니다.
이후 DynamoDB에 들어가서 Tables -> GitHubEvent -> Exports and streams를 클릭합니다.
하단으로 스크롤하면 DynamoDB의 stream을 켤 수 있는 부분이 있습니다. Turn on 버튼을 눌러줍니다.
그러면 아래와 같은 화면으로 이동하게 됩니다. New image를 클릭하고 Turn on stream 버튼을 눌러줍니다.
다시 메인 페이지로 돌아갈텐데 하단으로 스크롤하시면 DynamoDB stream이 Turn on이 된 걸 확인할 수 있을 것입니다. 그리고 아래의 Create trigger를 눌러 줍니다.
Stream을 받을 람다 함수를 선택합니다. 아까 만든 StreamConsumer를 눌러주시면 됩니다.
다시 아까 Trigger 부분으로 돌아가면 잘 생성된 걸 확인할 수 있습니다.
StreamConsumer Lambda 함수가 잘 작동되는지 확인하기 위해 람다 메인 페이지로 다시 돌아갑니다. DynamoDB가 트리거로 추가된 것을 확인할 수 있습니다. 추가로 진짜 잘 작동되는지 확인하기 위해 아까와 동일하게 print("Event: " , json.dumps(event))
를 코드에 적어줍시다. 그리고 Deploy 버튼을 눌러줍니다.
다음으로 GitHub으로 돌아가서 이슈를 생성 후 StreamConsumer 람다 함수가 트리거 되는지 확인합니다.
CloudWatch로 들어가 Log groups를 확인하면 아래의 이미지처럼 StreamConsumer 람다 함수가 있을 것입니다. 클릭 합니다.
로그가 하나 생긴 것을 확인할 수 있습니다. 클릭합니다.
StreamConsumer 람다 함수에 추가해둔 코드가 로그에 남은 것을 확인할 수 있습니다.
클릭하여 자세하게 보면 issue title로 기입했던 것이 로그에 남아 있는 걸 확인할 수 있습니다.