How to Send Messages to SQS with Python, Flask, and Boto3

2020-07-20 15:28:00 | #programming #python #sysadmin

Tested On

  • Linux Ubuntu 20.04
  • Windows 10
  • macOS Catalina

In this tutorial, we're going to learn how to set up an API that forwards messages to an SQS queue.

By designing our API to leverage AWS Lambda, it will automatically scale across regions to keep up with requests. And by utilizing an SQS queue, we can deploy workers to consume messages without overloading the database.

Completing the Prerequisites

It is important that you complete the prerequisites above, as this article depends on a custom logger.py module, and localstack.

How to Set Up the Python SQS Message Sender Project Files

How to Create Python Project Files with Windows 10 PowerShell 2.0+

cd ~
New-Item -ItemType "directory" -Path ".\python-sqs-sender"
cd python-sqs-sender
New-Item -ItemType "file" -Path . -Name "main.py"
virtualenv venv
.\venv\Scripts\activate

To verify that the virtual environment is active, make sure (venv) is in the PowerShell command prompt. For example, (venv) PS C:\Users\username\python-sqs-sender>

How to Create Python Project Files with Linux Ubuntu 14.04+ or macOS

cd ~
mkdir python-sqs-sender
cd python-sqs-sender
virtualenv -p python3 venv
source venv/bin/activate
touch main.py

To verify that the virtual environment is active, make sure (venv) is in the terminal command prompt.

This will create the following files and folders, and activate the virtual environment.

▾ python-sqs-sender/
  ▸ venv/
  logger.py
  main.py

Before you continue, double check to make sure you've imported the logger.py file from this article, which will allow our Python Lambda to log to CloudWatch logs.

Full Code Example for the Python SQS Message Sender

Filename: main.py

import os
import boto3
import awsgi
from flask import Flask, request, jsonify
from flask_cors import CORS
import logger
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

app = Flask(__name__)
CORS(app, resources={r'/*': {'origins': list(os.environ['ORIGINS'].split(','))}}, supports_credentials=True)

try:
    sqs_client = boto3.client(
        'sqs',
        region_name=os.environ['AWS_REGION'],
        endpoint_url=os.environ['SQS_ENDPOINT'],
        use_ssl=os.environ['USE_SSL'] == '1',
        verify=False,
        aws_access_key_id=os.environ['ACCESS_KEY'],
        aws_secret_access_key=os.environ['SECRET_KEY'])
except Exception as e:
    logger.ex(e)


def json_status(status_code, message):
    res = jsonify({
        'isBase64Encoded': False,
        'statusCode': status_code,
        'body': message
    })
    res.status = '%s %s' % (status_code, message)
    res.message = message
    return res


@app.route('/', methods=['POST'])
def send_message():
    json_data = request.get_json()
    message = json_data.get('message')
    some_attribute = json_data.get('some_attribute')
    another_attribute = json_data.get('another_attribute')

    if message is None:
        return json_status('400', 'Message is required')

    if some_attribute is None:
        return json_status('400', 'Some attribute is required')

    if another_attribute is None:
        return json_status('400', 'Another attribute is required')

    try:
        queue_url = sqs_client.get_queue_url(QueueName=os.environ['SQS_QUEUE_NAME'])['QueueUrl']
        sqs_client.send_message(
            QueueUrl=queue_url,
            DelaySeconds=10,
            MessageBody=message,
            MessageAttributes={
                'some_attribute': {
                    'DataType': 'String',
                    'StringValue': some_attribute
                },
                'another_attribute': {
                    'DataType': 'String',
                    'StringValue': another_attribute
                }
            }
        )
        logger.ex('Message sent')
        return json_status(200, 'Message sent')
    except Exception as e:
        logger.ex(e)
        return json_status(500, str(e))


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5100, debug=True)


def lambda_handler(event, context):
    return awsgi.response(app, event, context)

Explanation of the main.py Code

Lines 1-8: imports all the required modules.

Line 10: instantiates our Flask app and line 11 indicates which domains we will allow requests from through CORS.

Lines 13-23: instantiates our SQS client. If you're not familiar with environment variables, they get explained later on in this tutorial.

Lines 26-34: formats a json response that is compatible with AWS Lambda, ensuring that our reponses make it back through API Gateway and into the browser.

Lines 37-56: defines a route we can send requests to that will invoke the Lambda function.

Lines 59-60: allows us to run our Flask app locally on port 5100 and lines 63-64 are necessary for AWS Lambda to invoke the service through Flask routes.

How to Run the Program

Activate your virtual machine by running source venv/bin/activate from inside the project root folder. Then run python main.py to execute the program.

You should receive a ModuleNotFoundError similar to the following. This just means you need to pip install a few dependencies in order to run the program.

Traceback (most recent call last):
    File "main.py", line 2, in 
      import boto3
  ModuleNotFoundError: No module named 'boto3'

Installing Python Module Dependencies

So if you see the above error about boto3, run pip install boto3. When you try to run python main.py again, it will complain about another missing module so pip install that as well. By the end, you should have run the following commands:

source venv/bin/activate
pip install boto3
pip install aws-wsgi
pip install flask
pip install flask_cors

Now, when you run python main.py, you should see another error:

Traceback (most recent call last):
  File "main.py", line 11, in 
    CORS(app, resources={r'/*': {'origins': list(os.environ['ORIGINS'].split(','))}}, supports_credentials=True)
  File "/usr/lib/python3.8/os.py", line 675, in __getitem__
    raise KeyError(key) from None
KeyError: 'ORIGINS'

Setting Environment Variables

While it can be more convenient to add environment variables in our ~/.bashrc file, we're going to set them in the command we use to run the service, making it easier to debug and resolve issues. Run the following command:

ORIGINS=https://somewebsite.com,https://somewebsite2.com AWS_REGION=us-east-1 SQS_ENDPOINT=http://0.0.0.0:4566 USE_SSL=0 ACCESS_KEY=foo SECRET_KEY=bar SQS_QUEUE_NAME=Queue python main.py

ORIGINS is not so important for testing purposes, because we'll be using curl to send requests. But when we launch to production, we'll be able to pass any number of comma-separated domains that we want to allow requests from. Run the following command, for now:

curl -ivvv localhost:5100 -H "Content-Type: application/json" -d '{"message": "My message", "some_attribute": "Something", "another_attribute": "Another thing"}'

If everything was set up properly, you should have received the following output:

{
  "body": "Message sent", 
  "isBase64Encoded": false, 
  "statusCode": 200
}

To verify that the SQS queue received the message from our curl request, run the following command:

aws sqs receive-message --queue-url localhost:4566/queue/Queue --endpoint-url http://0.0.0.0:4566 --region us-east-1 --profile localstack
{
    "Messages": [
        {
            "MessageId": "5caac084-9950-5050-6a57-a7c1890d7e0b",
            "ReceiptHandle": "dygasgldmlswerjwymbkcnrsdwnbimzjtvztyxtxiueeqrirxobfrtreapczopidasykadqasvmuuhetozdofpwokydsmerzrnmbghdmyhlcaeabkgmnyzwifnkndtzohebctpxxjzoeylmijjnpxidpqecoecgiwbdiyekvzddmscgfjwtwjzidq",
            "MD5OfBody": "be59f66a07d05b2c52a1387d1f62d753",
            "Body": "My message",
            "Attributes": {
                "SenderId": "AIDAIT2UOQQY3AUEKVGXU",
                "SentTimestamp": "1595518929857",
                "ApproximateReceiveCount": "1",
                "ApproximateFirstReceiveTimestamp": "1595525992436"
            }
        }
    ]
}

This concludes our tutorial. Future articles will cover how to deploy this Lambda function to both localstack and production environments.

Want To See More Exercises?

View Exercises

Comments

You must log in to comment. Don't have an account? Sign up for free.

Subscribe to comments for this post

Want To Receive More Free Content?

Would you like to receive free resources, tailored to help you reach your IT goals? Get started now, by leaving your email address below. We promise not to spam. You can also sign up for a free account and follow us on and engage with the community. You may opt out at any time.



Tell Us About Your Project









Contact Us

Do you have a specific IT problem that needs solving or just have a general IT question? Use the contact form to get in touch with us and an IT professional will be with you, momentarily.

Hire Us

We offer web development, enterprise software development, QA & testing, google analytics, domains and hosting, databases, security, IT consulting, and other IT-related services.

Free IT Tutorials

Head over to our tutorials section to learn all about working with various IT solutions.

Contact