
How to Orchestrate Data Flows in a Hybrid Cloud with AWS Step Functions
This article is based on the webinar “Automate Data Flows between Legacy Systems and Cloud”. The article is more detailed and step-by-step with code. If you prefer to watch the 1-hour webinar now, click here and choose 'watch replay' at the end.
Why orchestrate processes?
Orchestrating and automating processes is part of companies' goals in their digital transformation phase. This is because many companies with more years in the market have legacy systems fulfilling essential roles for decades. Therefore, when companies seek to modernize their processes, the correct approach is to do it incrementally, with decoupled services deployed in a hybrid cloud: with cloud and on-premise components working together.
One of the Amazon Web Services services we like most at Kranio and in which we are experts, is Step Functions. It consists of a state machine, very similar to a flowchart with sequential inputs and outputs where each output depends on each input.

Each step is a Lambda function, that is, serverless code that only runs when needed. AWS provides the runtime and we do not have to manage any type of server.
Use case
A case that helps us understand how to apply Step Functions is creating sequential records in multiple tables of an on-premise DB from a cloud application, through a REST API with an event-based architecture.
This case can be summarized in a diagram like this:

Here we can see:
- A data source, such as a web form.
- Data payload: these are the data we need to register in the DB.
- CloudWatch Events: Also called Event Bridge, these are events that allow triggering AWS services, in this case, the State Machine.
- API Gateway: It is the AWS service that allows creating, publishing, maintaining, and monitoring REST, HTTP, or WebSocket APIs.
- A relational database.
Advantages
The advantages of orchestrating on-premise from the cloud are:
- Reuse of existing components without leaving them aside
- The solution is decoupled, so each action to be performed has its own development, facilitating maintenance, error identification, etc.,
- If business requirements change, we know what happened and what needs to be modified or between which steps a new state should be added.
- In case changes are required, the impact on on-premise systems is mitigated since orchestration is in the cloud.
- With serverless alternatives, it is not necessary to manage servers or their operating systems.
- They are low-cost solutions. If you want to learn more, check the prices for using Lambda, API Gateway, SNS, and CloudWatch Events.
And now what?
You already know the theory about orchestrating a data flow. Now we will show you the considerations and steps you must take into account to put it into practice.
Development
The resources to use are:
- An Amazon Web Services account and AWS CLI configured as in this link
- Python +3.7
- The Serverless framework (learn here how to set it up)
- The Python Boto3 library
How to start
Since it is an orchestration, we will need to identify the sequential steps we want to orchestrate. And since orchestration is for automation, the flow must also start automatically.
For this, we will base ourselves on the use case presented above, and we will assume that the DB we write to corresponds to one of the components of a company's CRM, that is, one of the technologies used to manage the customer base.
We will create an event-based solution, starting the flow with the reception of a message originating from some source (such as a web form).
After the event is received, its content (payload) must be sent via POST to an endpoint to enter the database. This DB can be cloud or on-premise and the endpoint must have a backend that can perform limited operations on the DB.
To facilitate the deployment of what must be developed, the Serverless framework is used, which allows us to develop and deploy.
The project will be divided into 3 parts:
| Name | Description |
|---|---|
| API Gateway | An API in API Gateway that will be responsible for creating records in the DB |
| Infrastructure | Here we will simulate an on-premise DB and create an Event Bus in Event Bridge to initialize the flow |
| Orchestration | The Step Functions code |
Then these projects are deployed in the order infrastructure >> step-functions >> api-gateway.
It can be the same directory, where we dedicate 3 folders. The structure would be as follows:
├──api-gateway
│ ├── scripts-database
│ │ ├── db.py
│ │ └── script.py
│ ├── libs
│ │ └── api_responses.py
│ ├── serverless.yml
│ └── service
│ ├── create_benefit_back.py
│ ├── create_client_back.py
│ └──create_partner_back.py
├──infrastructure
│ └── serverless.yml
└── step-functions
├── functions
│ ├── catch_errors.py
│ ├── create_benefit.py
│ ├── create_client.py
│ ├── create_partner.py
│ └── receive_lead.py
├── serverless.yml
└── services
└──crm_service.py
Talk is cheap. Show me the code.
And with this famous phrase by Linus Torvalds, we will see the essential code of the project we are creating. You can see the details here.
Backend
The previous endpoints are useless if they do not have a backend. To link each endpoint with a backend, Lambda functions must be created that write to the database the parameters the endpoint receives. Once the Lambda functions are created, we enter their ARN in the “uri” parameter inside “x-amazon-apigateway-integration“.
A key aspect of Lambda functions is that they consist of a main method called handler that receives 2 parameters: message and context. Message is the input payload, and Context contains data about the function invocation and execution itself. All Lambda functions must receive an input and generate an output. You can learn more here.
The functions for each endpoint are very similar and only vary in the data the function needs to write to the corresponding table.
Function: createClient
Role: creates a record in the CLIENTS table of our DB
def handler(message, context):
try:
msg = json.loads(message["body"])
data = return_tuple(
msg["name"],
msg["lastname"],
msg["rut"],
msg["mail"],
msg["phone"]
)
conn = connect()
res = create_record(conn, INSERT_CLIENT, data)
return response_success(res["success"], res["message"])
except Exception as e:
print("Exception: ", e)Function: createPartner
Role: creates a record in the PARTNER table of our DB
def handler(message, context):
try:
crm_service = CRMService(message)
crm_service.create_partner()
return message
except Exception as e:
print(e)
return eFunction: createBenefit
Role: creates a record in the BENEFIT table of our DB
def handler(message, context):
try:
crm_service = CRMService(message)
r = crm_service.create_benefit()
return r
except Exception as e:
print(e)
return eIaaC - Infrastructure as Code
In the serverless.yml code we declare all the resources we are defining. For deployment, AWS CLI must be properly configured and then execute the command
$ sls deploy -s {stage} -r {my AWS region}This generates a CloudFormation stack that groups all the resources you declared. Learn more here.
In the Serverless.yml files you will see values like these:
${file(${self:provider.stage}.yml):db_pass}
These correspond to references to strings in other yml documents within the same path, pointing to a specific value. You can learn more about this way of working here.
API Gateway
For the REST API we will set up an API Gateway with a Serverless project.
The goal of the API is to receive requests from Step Functions, registering data in the database.
API Gateway will allow us to expose endpoints to which methods can be applied. In this project, we will only create POST methods.
We will show you the essentials of the project and you can see the details here.
OpenAPI Specification
An alternative to declare the API, its resources, and methods is to do it with OpenAPI. To learn more about OpenAPI, read this article we made about it.
This file is read by the API Gateway service and generates the API.
Important: if we want to create an API Gateway, it is necessary to add to OpenAPI an extension with information that only AWS can interpret. For example: the create_client endpoint that we call via POST receives a request body that a specific backend must process. That backend is a Lambda. The relationship between the endpoint and the Lambda function is declared in this extension. You can learn more about this here.
openapi: "3.0.1"
info:
title: "testapi"
version: "2021-01-21T15:44:04Z"
servers:
- url: "https://{api-id}.execute-api.{YOUR-REGION-HERE}.amazonaws.com/{basePath}"
variables:
basePath:
default: "/dev"
paths:
/create_client:
post:
responses:
200:
description: "200 response"
content:
application/json:
schema:
$ref: "#/components/schemas/ApiResponseBody"
requestBody:
description: req body to create client
content:
application/json:
schema:
$ref: "#/components/schemas/CreateClientRequestBody"
example:
name: alice
lastname: cooper
rut: 11111111-1
phone: 54545454
mail: acooper@alice.com
security:
- api_key: []
x-amazon-apigateway-integration:
uri: arn:aws:apigateway:{YOUR-REGION-HERE}:lambda:path/2015-03-31/functions/arn:aws:lambda:{YOUR-REGION-HERE}:{YOUR-ACCOUNT-ID-HERE}:function:${stageVariables.CreateClientBackLambdaFunction}/invocations
responses:
default:
statusCode: 200
credentials: arn:aws:iam::{YOUR-ACCOUNT-ID-HERE}:role/${stageVariables.ApiGatewayStepFunctionLambdaRole}
httpMethod: POST
passthroughBehavior: "when_no_match"
type: aws_proxy
/create_partner:
post:
responses:
200:
description: "200 response"
content:
application/json:
schema:
$ref: "#/components/schemas/ApiResponseBody"
requestBody:
description: req body to create partner
content:
application/json:
schema:
$ref: "#/components/schemas/CreatePartnerRequestBody"
example:
rut: 11111111-1
store: ESTACION_CENTRAL
security:
- api_key: []
x-amazon-apigateway-integration:
uri: arn:aws:apigateway:{YOUR-REGION-HERE}:lambda:path/2015-03-31/functions/arn:aws:lambda:{YOUR-REGION-HERE}:{YOUR-ACCOUNT-ID-HERE}:function:${stageVariables.CreatePartnerBackLambdaFunction}/invocations
responses:
default:
statusCode: 200
credentials: arn:aws:iam::{YOUR-ACCOUNT-ID-HERE}:role/${stageVariables.ApiGatewayStepFunctionLambdaRole}
httpMethod: POST
passthroughBehavior: "when_no_match"
type: aws_proxy
/create_benefit:
post:
responses:
200:
description: "200 response"
content:
application/json:
schema:
$ref: "#/components/schemas/ApiResponseBody"
requestBody:
description: req body to create benefit
content:
application/json:
schema:
$ref: "#/components/schemas/CreateBenefitRequestBody"
example:
rut: 11111111-1
wantsBenefit: true
security:
- api_key: []
x-amazon-apigateway-integration:
uri: arn:aws:apigateway:{YOUR-REGION-HERE}:lambda:path/2015-03-31/functions/arn:aws:lambda:{YOUR-REGION-HERE}:{YOUR-ACCOUNT-ID-HERE}:function:${stageVariables.CreateBenefitBackLambdaFunction}/invocations
responses:
default:
statusCode: 200
credentials: arn:aws:iam::{YOUR-ACCOUNT-ID-HERE}:role/${stageVariables.ApiGatewayStepFunctionLambdaRole}
httpMethod: POST
passthroughBehavior: "when_no_match"
type: aws_proxy








