How to Receive Messages from SQS with Python, Flask, and Boto3

2020-07-20 15:28:00 | #programming #python #sysadmin

Tested On

  • Linux Ubuntu 20.04
  • Windows 10
  • macOS Catalina

In this tutorial, we're going to learn how to set up a Python AWS Lambda worker that can be automatically invoked to consume messages from an SQS queue.

By designing our program to utilize AWS Lambda, AWS can scale it across multiple workers to consume messages at a healthy interval.

Completing the Prerequisites

It is important that you complete the prerequisites above, as this article depends on a custom logger.py module, custom database.py module, and localstack. Optionally, if you'd like to learn about how to send messages to an AWS SQS queue with Python, you can read this article, as well.

How to Set Up the Python SQS Message Sender Project Files

How to Create Python Project Files with Windows 10 PowerShell 2.0+

cd ~
New-Item -ItemType "directory" -Path ".\python-sqs-receiver"
cd python-sqs-receiver
New-Item -ItemType "file" -Path . -Name "main.py"
virtualenv venv
.\venv\Scripts\activate

To verify that the virtual environment is active, make sure (venv) is in the PowerShell command prompt. For example, (venv) PS C:\Users\username\python-sqs-receiver>

How to Create Python Project Files with Linux Ubuntu 14.04+ or macOS

cd ~
mkdir python-sqs-receiver
cd python-sqs-receiver
virtualenv -p python3 venv
source venv/bin/activate
touch main.py

To verify that the virtual environment is active, make sure (venv) is in the terminal command prompt.

This will create the following files and folders, and activate the virtual environment.

▾ python-sqs-receiver/
  ▸ venv/
  database.py
  logger.py
  main.py

Before you continue, double check to make sure you've imported the logger.py file from this article, which will allow our Python Lambda to log to CloudWatch logs and the database.py file from to save records to a Postgresql database. Or substitute with your preferred database logic.

Full Code Example for the Python SQS Message Receiver

Filename: main.py

import os
import time
import uuid
import datetime
import logger
from database import PsqlClient

db = PsqlClient(
    database=os.environ['POSTGRES_DB'],
    user=os.environ['POSTGRES_USER'],
    password=os.environ['POSTGRES_PASSWORD'],
    host=os.environ['POSTGRES_HOST'],
    port=int(os.environ['POSTGRES_PORT'])
)


def lambda_handler(event, context):
    for record in event['Records']:
        body = record['body']
        attrs = record['messageAttributes']
        some_attribute = attrs['some_attribute']['stringValue']
        another_attribute = attrs['another_attribute']['stringValue']

        try:
          try:
              db.execute('INSERT INTO table \
                  (id, body, some_attribute, another_attribute, created_date) \
                  VALUES (%(id)s, %(body)s, %(some_attribute)s, %(another_attribute)s, %(created_date)s)', (
                  {
                      'id': uuid.uuid4(),
                      'body': body,
                      'some_attribute': some_attribute,
                      'another_attribute': another_attribute,
                      'created_date': datetime.datetime.utcnow()
                  })
              )
              logger.info('Success')
          except Exception as e:
              logger.ex(e)
              raise

if __name__ == '__main__':
    lambda_handler({'Records': [{
        'body': 'Body content',
        'MessageAttributes': {
            'some_attribute': {
                'dataType': 'String',
                'stringValue': 'Something'
            },
            'another_attribute': {
                'dataType': 'String',
                'stringValue': 'Another thing'
            }
        }
    }]}, {})

Explanation of the main.py Code

Lines 1-6: imports all the required modules.

Lines 9-15: instantiates the Postgresql database client. You will need to substitute with your own credentials. We recommend setting sensitive credentials, such as this, in your environment variables rather than hardcoding them.

In lines 17-40 is the logic for our lambda_handler. This function automatically gets invoked by an SQS trigger we set up in production, and creates a new record in the database for every SQS message it consumes. If the process was successful, the message will be discarded from the SQS queue as it is no longer needed. If an error occurs, the exception will get logged to CloudWatch logs, and the SQS meesage will remain in the queue for future attempts. Make sure to keep an eye on your logs or set an alarm if a Lambda error occurs so that you can read the logs and address the issue.

There are various ways to handle unconsummable messages in the SQS queue, that are beyond the scope of this article. But we recommend analyzing how the message was created, first, to ensure that all the attributes and values are what this consumer expects, and update the code, if necessary. If there was a mistake in how the message was created, you have the option to purge the old message so that the SQS trigger doesn't constantly invoke this function.

Lines 19-22: demonstrates how to extract data from an SQS event record. You can substitute some_attribute and another_attribute for whatever the SQS message has them named as.

Lines 42-55: purely for testing purposes. You don't need this block of code, at all, for production. It just allows us to invoke the lambda_handler by running the command python main.py. We are purposely passing in a mock SQS message event record.

How to Run the Program

Activate your virtual machine by running source venv/bin/activate from inside the project root folder. Then run python main.py to execute the program.

You should receive a ModuleNotFoundError similar to the following. This just means you need to pip install a few dependencies in order to run the program.

Traceback (most recent call last):
    File "main.py", line 2, in 
      import boto3
  ModuleNotFoundError: No module named 'boto3'

Installing Python Module Dependencies

So if you see the above error about boto3, run pip install boto3. When you try to run python main.py again, it will complain about another missing module so pip install that as well. By the end, you should have run the following commands:

source venv/bin/activate
pip install boto3

when you are ready, run

python main.py

Hopefully, you should see 'Success' printed to the console, and a new record in the database. If you're running this in proudction, you should also see that the SQS message is no longer in the queue.

Want To See More Exercises?

View Exercises

Comments

You must log in to comment. Don't have an account? Sign up for free.

Subscribe to comments for this post

Want To Receive More Free Content?

Would you like to receive free resources, tailored to help you reach your IT goals? Get started now, by leaving your email address below. We promise not to spam. You can also sign up for a free account and follow us on and engage with the community. You may opt out at any time.



Tell Us About Your Project









Contact Us

Do you have a specific IT problem that needs solving or just have a general IT question? Use the contact form to get in touch with us and an IT professional will be with you, momentarily.

Hire Us

We offer web development, enterprise software development, QA & testing, google analytics, domains and hosting, databases, security, IT consulting, and other IT-related services.

Free IT Tutorials

Head over to our tutorials section to learn all about working with various IT solutions.

Contact