Inventory Management Lambda - sastry-25/unboard GitHub Wiki
import json
API CONSTS
PATH_INVENTORY = "/inventory-management/inventory"
PATH_ITEM = "/inventory-management/inventory/items"
PATH_ITEM_ID = "/inventory-management/inventory/items/{id}"
def lambda_handler(event, context):
inventory = [
{
"id": 1,
"name": "Strategy Master",
"description": "Perfect for strategic thinkers! Build your empire and outwit your opponents.",
"price": 9.99,
"players": "2-4 players",
"time": "60-90 min",
"age": "12+",
"quantity": 15
},
{
"id": 2,
"name": "Family Fun Night",
"description": "Easy to learn, fun for all ages. The perfect game for family gatherings.",
"price": 14.99,
"players": "3-6 players",
"time": "30-45 min",
"age": "8+",
"quantity": 20
},
{
"id": 3,
"name": "Quick Draw Deluxe",
"description": "Fast-paced drawing and guessing game. Creativity meets competition!",
"price": 19.99,
"players": "4-8 players",
"time": "20-30 min",
"age": "10+",
"quantity": 12
},
{
"id": 4,
"name": "Mystery Manor",
"description": "Solve the mystery before time runs out. Clues, suspects, and surprises!",
"price": 24.99,
"players": "2-6 players",
"time": "45-60 min",
"age": "14+",
"quantity": 25
},
{
"id": 5,
"name": "Adventure Quest",
"description": "Epic cooperative adventure. Work together to save the kingdom!",
"price": 29.99,
"players": "1-5 players",
"time": "90-120 min",
"age": "12+",
"quantity": 18
}
]
try:
path = event["requestContext"]["resourcePath"]
method = event["requestContext"]["httpMethod"]
query_params = event.get("queryStringParameters")
path_params = event.get("pathParameters")
if path == PATH_INVENTORY and method == "GET":
return {
'statusCode': 200,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps(inventory)
}
elif path == PATH_ITEM and method == "GET":
if query_params is None or "name" not in query_params:
return {
'statusCode': 400,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Missing query parameter: name"})
}
item_name = query_params["name"]
for item in inventory:
if item["name"].lower() == item_name.lower():
return {
'statusCode': 200,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps(item)
}
return {
'statusCode': 404,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Item not found"})
}
elif path == PATH_ITEM_ID and method == "GET":
if path_params is None or "id" not in path_params:
return {
'statusCode': 400,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Missing path parameter: id"})
}
item_id = int(path_params["id"])
for item in inventory:
if item["id"] == item_id:
return {
'statusCode': 200,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps(item)
}
return {
'statusCode': 404,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Item not found"})
}
return {
"statusCode": 400,
"headers": {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
"body": json.dumps({"error": "Request not recognized", "path": path, "method": method})
}
except Exception as e:
print(f"ERROR: {str(e)}")
return {
'statusCode': 500,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Internal server error", "message": str(e)})
}
With DB connection:
import json
import pymysql
import pymysql.cursors
DB_HOST = "" # host address
DB_USER = "admin"
DB_PASSWORD = "" # password
DB_SCHEMA = "unboard"
SQL_GET_INVENTORY = "SELECT * FROM ITEM"
SQL_GET_ITEM = "SELECT * FROM ITEM WHERE name = %s"
SQL_GET_ITEM_ID = "SELECT * FROM ITEM WHERE id = %s"
PATH_INVENTORY = "/inventory-management/inventory"
PATH_ITEM = "/inventory-management/inventory/items"
PATH_ITEM_ID = "/inventory-management/inventory/items/{id}"
def lambda_handler(event, context):
try:
conn = pymysql.connect(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD,
db=DB_SCHEMA,
cursorclass=pymysql.cursors.DictCursor
)
path = event["requestContext"]["resourcePath"]
method = event["requestContext"]["httpMethod"]
query_params = event.get("queryStringParameters")
path_params = event.get("pathParameters")
if path == PATH_INVENTORY and method == "GET":
with conn.cursor() as cursor:
cursor.execute(SQL_GET_INVENTORY)
inventory = cursor.fetchall()
return {
'statusCode': 200,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps(inventory)
}
elif path == PATH_ITEM and method == "GET":
if query_params is None or "name" not in query_params:
return {
'statusCode': 400,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Missing query parameter: name"})
}
item_name = query_params["name"]
with conn.cursor() as cursor:
cursor.execute(SQL_GET_ITEM, (item_name))
item = cursor.fetchone()
if item:
return {
'statusCode': 200,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps(item)
}
return {
'statusCode': 404,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Item not found"})
}
elif path == PATH_ITEM_ID and method == "GET":
if path_params is None or "id" not in path_params:
return {
'statusCode': 400,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Missing path parameter: id"})
}
item_id = int(path_params["id"])
with conn.cursor() as cursor:
cursor.execute(SQL_GET_ITEM_ID, (item_id))
item = cursor.fetchone()
if item:
return {
'statusCode': 200,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps(item)
}
return {
'statusCode': 404,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Item not found"})
}
return {
"statusCode": 400,
"headers": {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
"body": json.dumps({"error": "Request not recognized", "path": path, "method": method})
}
except Exception as e:
print(f"ERROR: {str(e)}")
return {
'statusCode': 500,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Internal server error", "message": str(e)})
}
finally:
if conn and conn.open:
conn.close()
After adding shipping and payment lambdas
import pymysql
import pymysql.cursors
import os
DB_HOST = os.environ["DB_HOST"]
DB_USER = os.environ["DB_USER"]
DB_PASSWORD = os.environ["DB_PASSWORD"]
DB_SCHEMA = os.environ["DB_SCHEMA"]
SQL_GET_INVENTORY = "SELECT * FROM ITEM"
SQL_GET_ITEM = "SELECT * FROM ITEM WHERE name = %s"
SQL_GET_ITEM_ID = "SELECT * FROM ITEM WHERE id = %s"
PATH_INVENTORY = "/inventory-management/inventory"
PATH_ITEM = "/inventory-management/inventory/items"
PATH_ITEM_ID = "/inventory-management/inventory/items/{id}"
def lambda_handler(event, context):
conn = None
try:
conn = pymysql.connect(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD,
db=DB_SCHEMA,
cursorclass=pymysql.cursors.DictCursor
)
path = event["requestContext"]["resourcePath"]
method = event["requestContext"]["httpMethod"]
query_params = event.get("queryStringParameters")
path_params = event.get("pathParameters")
if path == PATH_INVENTORY and method == "GET":
with conn.cursor() as cursor:
cursor.execute(SQL_GET_INVENTORY)
inventory = cursor.fetchall()
return {
'statusCode': 200,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps(inventory)
}
elif path == PATH_ITEM and method == "GET":
if query_params is None or "name" not in query_params:
return {
'statusCode': 400,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Missing query parameter: name"})
}
item_name = query_params["name"]
with conn.cursor() as cursor:
cursor.execute(SQL_GET_ITEM, (item_name))
item = cursor.fetchone()
if item:
return {
'statusCode': 200,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps(item)
}
return {
'statusCode': 404,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Item not found"})
}
elif path == PATH_ITEM_ID and method == "GET":
if path_params is None or "id" not in path_params:
return {
'statusCode': 400,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Missing path parameter: id"})
}
item_id = int(path_params["id"])
with conn.cursor() as cursor:
cursor.execute(SQL_GET_ITEM_ID, (item_id))
item = cursor.fetchone()
if item:
return {
'statusCode': 200,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps(item)
}
return {
'statusCode': 404,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Item not found"})
}
return {
"statusCode": 400,
"headers": {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
"body": json.dumps({"error": "Request not recognized", "path": path, "method": method})
}
except Exception as e:
print(f"ERROR: {str(e)}")
return {
'statusCode': 500,
'headers': {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Access-Control-Allow-Methods': 'GET,OPTIONS'
},
'body': json.dumps({"error": "Internal server error", "message": str(e)})
}
finally:
if conn and conn.open:
conn.close()