How to Receive Messages from SQS with Python and Boto3

2020-07-20 15:28:00 | #programming #python #sysadmin #aws #sqs

Tested On

  • Linux Ubuntu 20.04
  • Windows 10
  • macOS Catalina

In this tutorial, we're going to learn how to set up a Python AWS Lambda worker that can be automatically invoked to consume messages from an SQS queue.

By designing our program to utilize AWS Lambda, AWS can scale it across multiple workers to consume messages at a healthy interval.

Completing the Prerequisites

It is important that you complete the prerequisites above, as this article depends on a custom logger.py module, and localstack. A localstack environment runs the AWS cloud stack, locally, which gives you the ability to run and debug your code against mock AWS services, without having to sign up for AWS.

As a side note, if you'd like to learn about how to send messages to an AWS SQS queue with Python, you can read this article, as well.

How to Set Up the Python SQS Message Receiver Project Files

How to Create Python Project Files with Windows 10 PowerShell 2.0+

cd ~
New-Item -ItemType "directory" -Path ".\python-sqs-receiver"
cd python-sqs-receiver
New-Item -ItemType "file" -Path . -Name "main.py"
virtualenv venv
.\venv\Scripts\activate

To verify that the virtual environment is active, make sure (venv) is in the PowerShell command prompt. For example, (venv) PS C:\Users\username\python-sqs-receiver>

How to Create Python Project Files with Linux Ubuntu 14.04+ or macOS

cd ~
mkdir python-sqs-receiver
cd python-sqs-receiver
virtualenv -p python3 venv
source venv/bin/activate
touch main.py

To verify that the virtual environment is active, make sure (venv) is in the terminal command prompt. This will create the following files and folders, and activate the virtual environment.

▾ python-sqs-receiver/
  ▸ venv/
  logger.py
  main.py

Before you continue, double check to make sure you've imported the logger.py file from this article, which will allow our Python Lambda to log to CloudWatch logs.

Spinning Up a Local AWS Cloud Stack with LocalStack

Creating a Localstack SQS Queue

With localstack running, create an SQS queue, if you have not done so, already:

aws sqs create-queue --queue-name MyQueue --endpoint-url http://0.0.0.0:4566 --region us-east-1 --profile localstack

If you get the error, The config profile (localstack) could not be found, it's because you haven't added the localstack profile to your ~/.aws/credentials file. Complete the localstack guide and return to this tutorial.

Listing Existing LocalStack SQS Queues

You will then be able to list your SQS queues with the following command:

aws sqs list-queues --endpoint-url http://0.0.0.0:4566 --region us-east-1 --profile localstack
{
  "QueueUrls": [
      "http://localhost:4566/000000000000/MyQueue"
  ]
}

Sending a Message to SQS

Now, that you have an existing SQS queue, you'll want to send messages to it that can be consumed, later. First create the following attributes.json file, in the same directory.

{
  "myattr": {
    "DataType": "String",
    "StringValue": "Some Value"
  },
  "myattr2": {
    "DataType": "Binary",
    "BinaryValue": "Another Value"
  },
  "myattr3": {
    "DataType": "Number",
    "StringValue": "123"
  }
}

You can then send as many messages as needed, with the following command, which demonstrates how to send body content and attributes, which are fully customizeable. If everything was set up correctly, you'll receive the MD5 metadata for your message.

aws sqs send-message \
  --queue-url=http://0.0.0.0:4566/queue/MyQueue \
  --message-body 'Content' \
  --message-attributes=file://attributes.json \
  --endpoint-url=http://0.0.0.0:4566 \
  --region us-east-1 \
  --profile localstack
{
    "MD5OfMessageBody": "f15c1cae7882448b3fb0404682e17e61",
    "MD5OfMessageAttributes": "485f25af5d01da7b500440fa9459ac92",
    "MessageId": "5e94426b-6701-b141-1c9c-a79defd0fc88"
}

Full Code Example for a Manual Python SQS Message Receiver

Now, we're ready to start consuming messages. Add the following code to your main.py file. Note: this is a very simple example, for tutorial purposes. It is not meant to be used in production, but to teach you the basic concept. A production-ready version is provided towards the end of this tutorial.

Filename: main.py

import os
import boto3

try:
    sqs_client = boto3.client(
        'sqs',
        region_name=os.environ['AWS_REGION'],
        endpoint_url=os.environ['SQS_ENDPOINT'],
        use_ssl=os.environ['USE_SSL'] == '1',
        verify=False,
        aws_access_key_id=os.environ['ACCESS_KEY'],
        aws_secret_access_key=os.environ['SECRET_KEY'])
except Exception as e:
    print(e)

queue_url = sqs_client.get_queue_url(QueueName=os.environ['SQS_QUEUE_NAME'])['QueueUrl']

try:
    # Receive message from SQS queue
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        AttributeNames=[
            'SentTimestamp'
        ],
        MaxNumberOfMessages=1,
        MessageAttributeNames=[
            'All'
        ],
        VisibilityTimeout=0,
        WaitTimeSeconds=0
    )

    message = response['Messages'][0]
    receipt_handle = message['ReceiptHandle']

    # Delete received message from queue
    sqs_client.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle
    )
    print('Received message: %s' % message)

except Exception as e:
    print(e)

Explanation of the main.py Code

Lines 1-5: imports all the required modules.

Lines 7-17: Instantiates the sqs_client which connects to an SQS endpoint, based on the environment variables that you set.

Lines 21-47: Receives the number of messages indicated in line 28, extracts the content and receipt handle, then proceeds to delete the message from the queue.

How to Run the Program While Pointing to LocalStack

Activate your virtual machine by running source venv/bin/activate from inside the project root folder. Then run AWS_REGION=us-east-1 SQS_ENDPOINT=http://0.0.0.0:4566 USE_SSL=0 ACCESS_KEY=foo SECRET_KEY=bar SQS_QUEUE_NAME=MyQueue python main.py to execute the program.

If you receive a ModuleNotFoundError similar to the one below, this just means you need to pip install a few dependencies in order to run the program.

Traceback (most recent call last):
    File "main.py", line 2, in 
      import boto3
  ModuleNotFoundError: No module named 'boto3'

Installing Python Module Dependencies

So if you see the above error about boto3, run pip install boto3. When you try to run python main.py again, it will complain about another missing module so pip install that as well. By the end, you should have run the following commands:

source venv/bin/activate
pip install boto3

After installing the dependencies, run AWS_REGION=us-east-1 SQS_ENDPOINT=http://0.0.0.0:4566 USE_SSL=0 ACCESS_KEY=foo SECRET_KEY=bar SQS_QUEUE_NAME=MyQueue python main.py again.

How to Run the Program While Pointing to a Production SQS

Setting Up an API User with Limited Permissions in the IAM Console

Because we're going to be using the AWS CLI to set up and communicate with SQS, you'll want to set up an api user with limited permissions. If you're not sure how to do this, read our tutorial about Making Your AWS Account More Secure By Restricting Access Key Permissions. Do not use your root user or any key pair with full permissions, as it's a huge security risk. Once you've created the api user, attach the following policies:

  • AmazonSQSFullAccess

Adding the API User's Access Keys to a Local API Profile

Once you've attached the above policy, add your API user's access key ID and secret access key to an [api] profile in your ~/.aws/credentials file.

[default]
aws_access_key_id = foo
aws_secret_access_key = bar

[localstack]
aws_access_key_id = foo
aws_secret_access_key = bar

[api]
aws_access_key_id = API_USER_ACCESS_KEY_GOES_HERE
aws_secret_access_key = API_USER_SECRET_KEY_GOES_HERE

Listing Existing Production SQS Queues

You will then be able to list your SQS queues with the following command. Make sure to change the region in both the --endpoint-url and --region if your production services are running in a different region than us-east-1.

aws sqs list-queues --endpoint-url https://sqs.us-east-1.amazonaws.com --region us-east-1 --profile api
{
  "QueueUrls": [
      "https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue"
  ]
}

Creating a Production SQS Queue

If you have not yet created any SQS queues in production, the list above will be empty. Create a new SQS queue with your preferred --queue-name.

aws sqs create-queue --queue-name MyQueue --endpoint-url https://sqs.us-east-1.amazonaws.com --region us-east-1 --profile api

Verifying the Production SQS Queue Was Created

Make sure this new queue was added by running the aws sqs list-queues --endpoint-url https://sqs.us-east-1.amazonaws.com --region us-east-1 --profile api command again, and copy its QueueUrl, which should be in the following format: https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue

Once you have the QueueUrl, you'll be able to do things like delete the queue with the aws sqs delete-queue --queue-url https://sqs.us-east-1.amazonaws.com/000000000000/MyQueue --endpoint-url https://sqs.us-east-1.amazonaws.com --region us-east-1 --profile api command. But don't do anything like that now.

Running the Code

Activate your virtual machine by running source venv/bin/activate from inside the project root folder. Then run AWS_REGION=us-east-1 SQS_ENDPOINT=https://sqs.us-east-1.amazonaws.com USE_SSL=1 ACCESS_KEY=API_USER_ACCESS_KEY_GOES_HERE SECRET_KEY=API_USER_SECRET_KEY_GOES_HERE SQS_QUEUE_NAME=MyQueue python main.py to execute the program.

Full Code Example for an Event-Based Python SQS Message Receiver

This example is more suited for production. It's formatted to be AWS Lambda-compatible and can be set up to execute lambda_handler on an SQS event trigger, which would fire as messages entered the queue. Or an EventBridge trigger that runs at a specified time interval. Polling, basically.

Filename: main.py

import logger


def lambda_handler(event, context):
    for record in event['Records']:
        body = record['body']
        attrs = record['messageAttributes']
        some_attribute = attrs['myattr']['stringValue']
        another_attribute = attrs['myattr2']['stringValue']

        # Do something with the message content...
        # ...like save it to a database
        logger.info(body, attrs, some_attribute, another_attribute)
        logger.info('Success')

Explanation of the main.py Code

Lines 1-2: imports all the required modules.

In lines 4-14 is the logic for our lambda_handler. This function automatically gets invoked by the trigger we set up in production. Replace lines 11-14 with your own method of processing the message. You can store the message into a database, for example. Whatever you do, I recommend wrapping your logic in a try except block, logging the error to Cloudwatch with logger.ex() and then using raise to cause the error to bubble up.

If the process was successful, the message will be discarded from the SQS queue as it is no longer needed. But if an error occurs, the exception that bubbled up will force the SQS message to remain in the queue for future attempts. Make sure to keep an eye on your logs or set an alarm if a Lambda error occurs so that you can read the logs and address the issue. Otherwise, your lambda will continue trying to process the message.

There are various ways to handle unconsummable messages in the SQS queue, that are beyond the scope of this article. But we recommend analyzing how the message was created, first, to ensure that all the attributes and values are what this consumer expects, and update the code, if necessary. If there was a mistake in how the message was created, you have the option to purge the old message so that the SQS trigger doesn't constantly invoke this function.

When you're finished running API commands, downgrade the api user's SQS policy to AWSLambdaSQSQueueExecutionRole, which ensures that your flask service only has the necessary permissions to receive and delete messages, and not make modifications to any SQS buckets.

Conclusion

That's the end of this tutorial. We hope you found it helpful. Make sure to check out our other tutorials, as well.

If you're interested in programs that carry out your computer tasks for you, take our Automation the Easy Way with Python course. This course teaches CSV and Excel file generation, API requests, website scraping, email delivery, task scheduling, and browser click, mouse, and keyboard automation. Automate your daily tasks, free up time, and get ahead, today.

Want To See More Exercises?

View Exercises View Courses

Comments

You must log in to comment. Don't have an account? Sign up for free.

Subscribe to comments for this post

Want To Receive More Free Content?

Would you like to receive free resources, tailored to help you reach your IT goals? Get started now, by leaving your email address below. We promise not to spam. You can also sign up for a free account and follow us on and engage with the community. You may opt out at any time.



Tell Us About Your Project









Contact Us

Do you have a specific IT problem that needs solving or just have a general IT question? Use the contact form to get in touch with us and an IT professional will be with you, momentarily.

Hire Us

We offer web development, enterprise software development, QA & testing, google analytics, domains and hosting, databases, security, IT consulting, and other IT-related services.

Free IT Tutorials

Head over to our tutorials section to learn all about working with various IT solutions.

Contact