Triggering Lambda from SQS
비동기적인 메시지에 대한 완결성을 보장해줘야한다
됐는지 안 됐는지 결과를 알려줘야한다
이게 Message Queue Service
람다 함수를 통해 메시지 큐에 있는 걸 처리하고

send_message.py는

이거랑 비슷할듯 확인해보기!!

MQ에 변화가 생기면 람다함수를 호출

람다함수는 다이나모디비에 저장
람다 함수는 다양한 aws서비스와 연결되는데
그 때 각 서비스의 권한이 잘 주어져야 각자가 해야할일을 할 수 있다
그래서 람다와 관련된 랩에서는 role설정이 꼭 나온다

#1 정책 생성
(람다함수 먼저 만들고 정책 생성해도 됨)

UI가 자꾸 바뀐다 헷갈리구로.
위에서 정책 생성 누르면 이게 시각적 편집기로 원하는 서비스를 선택해서 할 수도 있게 바뀜
근데 랩에서는 Json 파일로 제공됐기 때문에 JSON 탭 누르고 복붙 한다

{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:*:*:*" }, { "Action": [ "dynamodb:*", "sqs:*" ], "Effect": "Allow", "Resource": "*" } ] } |
이름 기입하고 정책 생성 버튼 눌러준다

#2 role 만들기


태그 넘어가

#3 함수 생성
람다 서비스로 들어가서 함수 생성 클릭


함수 코드 작성하기

# 함수 코드 import json import os import boto3
QUEUE_NAME = os.environ['QUEUE_NAME'] MAX_QUEUE_MESSAGES = os.environ['MAX_QUEUE_MESSAGES'] DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
sqs = boto3.resource('sqs') dynamodb = boto3.resource('dynamodb')
def lambda_handler(event, context):
# Receive messages from SQS queue queue = sqs.get_queue_by_name(QueueName=QUEUE_NAME)
print("ApproximateNumberOfMessages:", queue.attributes.get('ApproximateNumberOfMessages'))
for message in queue.receive_messages( MaxNumberOfMessages=int(MAX_QUEUE_MESSAGES)):
print(message)
# Write message to DynamoDB table = dynamodb.Table(DYNAMODB_TABLE)
response = table.put_item( Item={ 'MessageId': message.message_id, 'Body': message.body, 'Timestamp': datetime.now().isoformat() } ) print("Wrote message to DynamoDB:", json.dumps(response))
# Delete SQS message message.delete() print("Deleted message:", message.message_id) |
함수 코드 설명

메세지를 가져오고
출력하고
다이나모디비에 넣어준다
다이나모 디비 들어가서 확인해보면 항목에 아직은 아무것도 없는 것을 확인


#4 Lambda 함수 환경변수 편집

들어가서 스크롤 내리면




* 여기서 잠깐 - 우리가 뭐하고 있어 *
1. 첫 번째로는 람다함수 돌리려고 테스트 해보고
2. 두번 째는 CLI에서 call 해서 실행해봤고
3. 지금 할게 여러가지 케이스로 람다함수 실행될 수 있는데 실제는 특정 조건이 되었을 때 얘를 호출할 수 있도록 한다
→ 트리거 추가
#5 트리거 추가



#6 메시지 생성
Ec2에서 send_message.py를 통해

#6-1 인스턴스 퍼블릭 주소를 통해 SSH 접속
#!/usr/bin/env python3.7 # -*- coding: utf-8 -*- import argparse import logging import sys from time import sleep import boto3 from faker import Faker
# 이런 파라미터에 따라 작업 parser = argparse.ArgumentParser() parser.add_argument("--queue-name", "-q", required=True, help="SQS queue name") parser.add_argument("--interval", "-i", required=True, help="timer interval", type=float) parser.add_argument("--message", "-m", help="message to send") parser.add_argument("--log", "-l", default="INFO", help="logging level") args = parser.parse_args()
if args.log: logging.basicConfig( format='[%(levelname)s] %(message)s', level=args.log)
else: parser.print_help(sys.stderr)
sqs = boto3.client('sqs')
response = sqs.get_queue_url(QueueName=args.queue_name)
queue_url = response['QueueUrl'] # (1)
logging.info(queue_url)
while True: # (3) message = args.message if not args.message: fake = Faker() # (2) message = fake.text()
logging.info('Sending message: ' + message)
response = sqs.send_message( QueueUrl=queue_url, MessageBody=message)
logging.info('MessageId: ' + response['MessageId']) sleep(args.interval) |
# 코드 설명
(1) queue_url = response['QueueUrl']
여기에 아래 대기열 Messages의 URL이 담긴다


(2) fake = Faker()
while문 안에서 계속해서 돌면서 난수 메시지 생성

#6-2 send_message.py 실행
./send_message.py -q Messages(큐이름) -i(interval) 0.1(초)
엔터치면
의미없는 데이터가 메시지 아이디와 메시지가 메시지 큐에 들어간다

#7 CloudWatch에서 로그 스트림이 어떻게 쌓이는지 확인
- 브라우저로 전환하고 AWS Management Console에서 CloudWatch 서비스로 이동합니다.
- 왼쪽 사이드 바에서 로그 를 클릭 합니다.
- 로그 그룹의 이름을 클릭하여 엽니 다.
- 로그 스트림 링크 중 하나를 클릭하고 Lamda 함수가 트리거되었는지 확인합니다.

#8 다이나모 디비에서 결과 확인

이렇게 추가 되고 있는 것을 볼 수 있다