Receiving Messages from SQS with Python, Flask, and Boto3

2020-07-20 15:28:00 | #programming #python #sysadmin

In this tutorial, we're going to learn how to set up a Python AWS Lambda worker that can be automatically invoked to consume messages from an SQS queue.

By designing our program to utilize AWS Lambda, AWS can scale it across multiple workers to consume messages at a healthy interval.

Completing the Prerequisites

It is important that you complete the prerequisites above, as this article depends on a custom logger.py module, and localstack. Optionally, if you'd like to learn about how to send messages to an AWS SQS queue with Python, you can read this article, as well.

How to Set Up a Python Project

Run the following commands on Linux to create the project skeleton and activate the virtual environment:

cd ~
mkdir python-sqs-receiver
cd python-sqs-receiver
touch main.py
virtualenv -p python3 venv
source venv/bin/activate

If you're on a different operating system, just make sure your skeleton looks like the following example and that you're running in the virtual environment. Also, make sure you've imported the logger.py file from this article, which will allow our Python Lambda to log to CloudWatch logs and the database.py file from to save records to a Postgresql database. Or substitute with your preferred database logic.

▾ python-sqs-receiver/
  ▸ venv/
  database.py
  logger.py
  main.py

Full Code Example for the SQS Message Receiver

Filename: main.py

import os
import time
import uuid
import datetime
import logger
from database import PsqlClient

db = PsqlClient(
    database=os.environ['POSTGRES_DB'],
    user=os.environ['POSTGRES_USER'],
    password=os.environ['POSTGRES_PASSWORD'],
    host=os.environ['POSTGRES_HOST'],
    port=int(os.environ['POSTGRES_PORT'])
)


def lambda_handler(event, context):
    for record in event['Records']:
        body = record['body']
        attrs = record['messageAttributes']
        some_attribute = attrs['some_attribute']['stringValue']
        another_attribute = attrs['another_attribute']['stringValue']

        try:
          try:
              db.execute('INSERT INTO table \
                  (id, body, some_attribute, another_attribute, created_date) \
                  VALUES (%(id)s, %(body)s, %(some_attribute)s, %(another_attribute)s, %(created_date)s)', (
                  {
                      'id': uuid.uuid4(),
                      'body': body,
                      'some_attribute': some_attribute,
                      'another_attribute': another_attribute,
                      'created_date': datetime.datetime.utcnow()
                  })
              )
              logger.info('Success')
          except Exception as e:
              logger.ex(e)
              raise

if __name__ == '__main__':
    lambda_handler({'Records': [{
        'body': 'Body content',
        'MessageAttributes': {
            'some_attribute': {
                'dataType': 'String',
                'stringValue': 'Something'
            },
            'another_attribute': {
                'dataType': 'String',
                'stringValue': 'Another thing'
            }
        }
    }]}, {})

Explanation of the main.py Code

Lines 1-6: imports all the required modules.

Lines 9-15: instantiates the Postgresql database client. You will need to substitute with your own credentials. We recommend setting sensitive credentials, such as this, in your environment variables rather than hardcoding them.

In lines 17-40 is the logic for our lambda_handler. This function automatically gets invoked by an SQS trigger we set up in production, and creates a new record in the database for every SQS message it consumes. If the process was successful, the message will be discarded from the SQS queue as it is no longer needed. If an error occurs, the exception will get logged to CloudWatch logs, and the SQS meesage will remain in the queue for future attempts. Make sure to keep an eye on your logs or set an alarm if a Lambda error occurs so that you can read the logs and address the issue.

There are various ways to handle unconsummable messages in the SQS queue, that are beyond the scope of this article. But we recommend analyzing how the message was created, first, to ensure that all the attributes and values are what this consumer expects, and update the code, if necessary. If there was a mistake in how the message was created, you have the option to purge the old message so that the SQS trigger doesn't constantly invoke this function.

Lines 19-22: demonstrates how to extract data from an SQS event record. You can substitute some_attribute and another_attribute for whatever the SQS message has them named as.

Lines 42-55: purely for testing purposes. You don't need this block of code, at all, for production. It just allows us to invoke the lambda_handler by running the command python main.py. We are purposely passing in a mock SQS message event record.

How to Run the Program

Activate your virtual machine by running source venv/bin/activate from inside the project root folder. Then run python main.py to execute the program.

You should receive a ModuleNotFoundError similar to the following. This just means you need to pip install a few dependencies in order to run the program.

Traceback (most recent call last):
    File "main.py", line 2, in 
      import boto3
  ModuleNotFoundError: No module named 'boto3'

Installing Python Module Dependencies

So if you see the above error about boto3, run pip install boto3. When you try to run python main.py again, it will complain about another missing module so pip install that as well. By the end, you should have run the following commands:

source venv/bin/activate
pip install boto3

when you are ready, run

python main.py

Hopefully, you should see 'Success' printed to the console, and a new record in the database. If you're running this in proudction, you should also see that the SQS message is no longer in the queue.

Comments

You must log in to comment. Don't have an account? Sign up for free.

Subscribe to Our Newsletter

Would you like to receive free whitepapers and other IT news? Just leave your email address below. You may opt out at any time.



Tell Us About Your Project









Contact Us

Do you have a specific IT problem that needs solving or just have a general IT question? Use the contact form to get in touch with us and an IT professional will be with you, momentarily.

Hire Us

We offer web development, enterprise software development, QA & testing, google analytics, domains and hosting, databases, security, IT consulting, and other IT-related services.

Free IT Tutorials

Head over to our tutorials section to learn all about working with various IT solutions.

Contact