# Little Guy: Perception Node
To implement the new architecture, with Neo4j for persistence, Kafka for streaming on the edge, and a cost-effective streaming alternative on AWS, here is the complete solution:
---
## Overall Architecture

**Onboard Edge (Kafka-based):**
- Use Apache Kafka to stream YOLO, MIDS, and ORB-SLAM3 data.
- Kafka runs locally on edge devices or lightweight servers (a minimal local-broker sketch follows the workflow list below).

**AWS Cloud (cost-effective streaming alternative):**
- Replace Kafka with Amazon Simple Queue Service (SQS) or Amazon Kinesis Data Streams for lightweight streaming on AWS.
- SQS is the more cost-effective option for event-driven workflows compared to running Kafka in the cloud.

**Storage:**
- Use Neo4j to store spatial models and the relationships between data.

**Workflow:**
1. **Streaming data:** Kafka on the edge sends YOLO, MIDS, and ORB-SLAM3 data toward the AWS cloud, where it is received via SQS or Kinesis.
2. **Processing:** AWS Lambda functions process the data; AWS Step Functions orchestrate tasks.
3. **Storage:** Spatial models are saved to Neo4j hosted on AWS.
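The bullets above assume a broker is already running on the edge device. A minimal, hypothetical sketch for standing one up locally is shown below; the `bitnami/kafka` image and its KRaft settings are assumptions, not part of the original design, and any local broker works.

```yaml
# Hypothetical single-node Kafka broker (KRaft mode, no ZooKeeper) for an edge device.
version: "3"
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
```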
---
## Complete Architecture Implementation

### Code for Kafka Producer and Consumer (Edge Devices)

**Kafka Producer (Edge Device):**

```go
package main
import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

type InputData struct {
    YOLOObstacles []YOLOObstacle `json:"yolo_obstacles"`
    MIDSDepth     [][]float64    `json:"mids_depth"`
    ORBMapPoints  []MapPoint     `json:"orb_map_points"`
}

type YOLOObstacle struct {
    Type        string `json:"type"`
    BoundingBox [4]int `json:"bounding_box"`
}

type MapPoint struct {
    X float64 `json:"x"`
    Y float64 `json:"y"`
    Z float64 `json:"z"`
}

func main() {
    // Kafka producer configuration
    topic := "spatial-data"
    brokerAddress := "localhost:9092" // Replace with your Kafka broker address
    writer := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{brokerAddress},
        Topic:    topic,
        Balancer: &kafka.LeastBytes{},
    })
    defer writer.Close()

    // Example data to send
    input := InputData{
        YOLOObstacles: []YOLOObstacle{
            {Type: "car", BoundingBox: [4]int{10, 20, 30, 40}},
        },
        MIDSDepth: [][]float64{
            {1.0, 2.0, 3.0},
        },
        ORBMapPoints: []MapPoint{
            {X: 1.0, Y: 1.0, Z: 1.0},
        },
    }

    data, err := json.Marshal(input)
    if err != nil {
        log.Fatalf("Failed to serialize data: %v", err)
    }

    // Send data to Kafka (WriteMessages requires a context, not nil)
    err = writer.WriteMessages(context.Background(), kafka.Message{
        Key:   []byte("key"),
        Value: data,
    })
    if err != nil {
        log.Fatalf("Failed to send message: %v", err)
    }
    fmt.Println("Data sent to Kafka")
}
```

**Kafka Consumer (Edge Device):**

```go
package main
import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

type InputData struct {
    YOLOObstacles []YOLOObstacle `json:"yolo_obstacles"`
    MIDSDepth     [][]float64    `json:"mids_depth"`
    ORBMapPoints  []MapPoint     `json:"orb_map_points"` // must match the producer's JSON tag
}

type YOLOObstacle struct {
    Type        string `json:"type"`
    BoundingBox [4]int `json:"bounding_box"`
}

type MapPoint struct {
    X float64 `json:"x"`
    Y float64 `json:"y"`
    Z float64 `json:"z"`
}

func main() {
    // Kafka consumer configuration
    topic := "spatial-data"
    brokerAddress := "localhost:9092" // Replace with your Kafka broker address
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{brokerAddress},
        Topic:    topic,
        GroupID:  "spatial-data-consumer-group",
        MinBytes: 10e3, // 10KB
        MaxBytes: 10e6, // 10MB
    })
    defer reader.Close()

    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatalf("Failed to read message: %v", err)
        }
        var input InputData
        err = json.Unmarshal(msg.Value, &input)
        if err != nil {
            log.Printf("Failed to unmarshal message: %v", err)
            continue
        }
        // Process the incoming data
        fmt.Printf("Received data: %+v\n", input)
        // Send the data to the cloud (e.g., via REST API or AWS SQS)
    }
}
```

---

## AWS Implementation: Lightweight Streaming with SQS

For the AWS implementation, we replace Kafka with Amazon SQS, which is cost-effective and serverless. SQS lets edge devices push data to the cloud, and AWS Lambda processes the messages.

### Step 1: Create an SQS Queue

Create an SQS queue using the AWS Management Console or the CLI:

```bash
aws sqs create-queue --queue-name SpatialDataQueue
```

### Step 2: Edge Device Sends Data to SQS

Modify the Kafka producer to send data to SQS instead of Kafka:

```go
package main
import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
)

type InputData struct {
    YOLOObstacles []YOLOObstacle `json:"yolo_obstacles"`
    MIDSDepth     [][]float64    `json:"mids_depth"`
    ORBMapPoints  []MapPoint     `json:"orb_map_points"`
}

type YOLOObstacle struct {
    Type        string `json:"type"`
    BoundingBox [4]int `json:"bounding_box"`
}

type MapPoint struct {
    X float64 `json:"x"`
    Y float64 `json:"y"`
    Z float64 `json:"z"`
}

func main() {
    // AWS SQS configuration. The region and account ID were elided in the
    // original; fill in the placeholders for your environment.
    queueURL := "https://sqs.<region>.amazonaws.com/<account-id>/SpatialDataQueue"
    sess := session.Must(session.NewSession())
    svc := sqs.New(sess)
    // Example data to send
    input := InputData{
        YOLOObstacles: []YOLOObstacle{
            {Type: "car", BoundingBox: [4]int{10, 20, 30, 40}},
        },
        MIDSDepth: [][]float64{
            {1.0, 2.0, 3.0},
        },
        ORBMapPoints: []MapPoint{
            {X: 1.0, Y: 1.0, Z: 1.0},
        },
    }
    data, err := json.Marshal(input)
    if err != nil {
        log.Fatalf("Failed to serialize data: %v", err)
    }

    // Send data to SQS
    _, err = svc.SendMessage(&sqs.SendMessageInput{
        QueueUrl:    aws.String(queueURL),
        MessageBody: aws.String(string(data)),
    })
    if err != nil {
        log.Fatalf("Failed to send message to SQS: %v", err)
    }
    fmt.Println("Data sent to SQS")
}
```

This implementation pushes the serialized data to the AWS SQS queue, which triggers AWS Lambda for processing.
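The edge consumer shown earlier stops at a comment ("send the data to the cloud"). A minimal sketch of that bridging step, reusing the SQS call from this section, might look like the following; the `sqs-forwarder` group ID and the URL placeholders are assumptions.

```go
package main

import (
    "context"
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
    "github.com/segmentio/kafka-go"
)

func main() {
    // Read from the local edge broker and forward each message body to SQS.
    queueURL := "https://sqs.<region>.amazonaws.com/<account-id>/SpatialDataQueue"
    svc := sqs.New(session.Must(session.NewSession()))

    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "spatial-data",
        GroupID: "sqs-forwarder",
    })
    defer reader.Close()

    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Fatalf("Failed to read message: %v", err)
        }
        // Forward the raw JSON payload unchanged; Lambda unmarshals it later.
        _, err = svc.SendMessage(&sqs.SendMessageInput{
            QueueUrl:    aws.String(queueURL),
            MessageBody: aws.String(string(msg.Value)),
        })
        if err != nil {
            log.Printf("Failed to forward message to SQS: %v", err)
        }
    }
}
```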
## AWS Lambda for Processing SQS Messages

The Lambda function retrieves messages from the SQS queue, processes the data, and saves the spatial model to Neo4j.

**Lambda Function Code:**

```go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
type InputData struct {
    YOLOObstacles []YOLOObstacle `json:"yolo_obstacles"`
    MIDSDepth     [][]float64    `json:"mids_depth"`
    ORBMapPoints  []MapPoint     `json:"orb_map_points"`
}

type YOLOObstacle struct {
    Type        string `json:"type"`
    BoundingBox [4]int `json:"bounding_box"`
}

type MapPoint struct {
    X float64 `json:"x"`
    Y float64 `json:"y"`
    Z float64 `json:"z"`
}

// Neo4j configuration, read from Lambda environment variables
var (
    neo4jUri      = os.Getenv("NEO4J_URI")
    neo4jUsername = os.Getenv("NEO4J_USER")
    neo4jPassword = os.Getenv("NEO4J_PASS")
)

func handler(ctx context.Context, sqsEvent events.SQSEvent) error {
    // Initialize the Neo4j driver
    driver, err := neo4j.NewDriver(neo4jUri, neo4j.BasicAuth(neo4jUsername, neo4jPassword, ""))
    if err != nil {
        return fmt.Errorf("failed to connect to Neo4j: %v", err)
    }
    defer driver.Close()
    // Process each SQS message
    for _, message := range sqsEvent.Records {
        var input InputData
        err := json.Unmarshal([]byte(message.Body), &input)
        if err != nil {
            log.Printf("Failed to unmarshal message: %v", err)
            continue
        }
        err = saveToNeo4j(driver, input)
        if err != nil {
            log.Printf("Failed to save to Neo4j: %v", err)
            continue
        }
        log.Printf("Successfully processed message: %s", message.MessageId)
    }
    return nil
}
func saveToNeo4j(driver neo4j.Driver, input InputData) error {
    session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
    defer session.Close()

    _, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
        // Create a node for the spatial model
        _, err := tx.Run(
            `CREATE (m:SpatialModel {model_id: $model_id, timestamp: timestamp()})
             RETURN m`,
            map[string]interface{}{
                "model_id": fmt.Sprintf("model-%d", time.Now().UnixNano()),
            },
        )
        if err != nil {
            return nil, err
        }
        // Process YOLO obstacles and create corresponding nodes
        for _, obstacle := range input.YOLOObstacles {
            _, err := tx.Run(
                `CREATE (o:YOLOObstacle {type: $type, bounding_box: $bounding_box})`,
                map[string]interface{}{
                    "type":         obstacle.Type,
                    "bounding_box": obstacle.BoundingBox[:], // pass as a slice; the driver does not accept fixed-size arrays
                },
            )
            if err != nil {
                return nil, err
            }
        }

        // Process ORB-SLAM3 map points and create corresponding nodes
        for _, point := range input.ORBMapPoints {
            _, err := tx.Run(
                `CREATE (p:MapPoint {x: $x, y: $y, z: $z})`,
                map[string]interface{}{
                    "x": point.X,
                    "y": point.Y,
                    "z": point.Z,
                },
            )
            if err != nil {
                return nil, err
            }
        }

        // Process MIDS depth data and create corresponding nodes
        for y, row := range input.MIDSDepth {
            for x, depth := range row {
                _, err := tx.Run(
                    `CREATE (d:MIDSDepth {x: $x, y: $y, depth: $depth})`,
                    map[string]interface{}{
                        "x":     x,
                        "y":     y,
                        "depth": depth,
                    },
                )
                if err != nil {
                    return nil, err
                }
            }
        }
        return nil, nil
    })
    if err != nil {
        return fmt.Errorf("failed to execute Neo4j transaction: %v", err)
    }
    return nil
}

// main wires the handler into the Lambda runtime; without it the
// aws-lambda-go import is unused and the function cannot start.
func main() {
    lambda.Start(handler)
}
```
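Before the queue trigger is exercised end to end, the handler can be smoke-tested with a hand-written SQS event whose body mirrors the producer's example payload. The event shape below follows the standard Lambda SQS event; the `messageId` value is arbitrary.

```json
{
  "Records": [
    {
      "messageId": "test-1",
      "body": "{\"yolo_obstacles\":[{\"type\":\"car\",\"bounding_box\":[10,20,30,40]}],\"mids_depth\":[[1.0,2.0,3.0]],\"orb_map_points\":[{\"x\":1.0,\"y\":1.0,\"z\":1.0}]}"
    }
  ]
}
```

Saved as `event.json`, this can be sent to the deployed function with `aws lambda invoke --function-name ProcessSpatialDataLambda --payload file://event.json out.json` (AWS CLI v2 additionally needs `--cli-binary-format raw-in-base64-out`).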
---

## Deployment Automation

For deployment, use AWS CloudFormation or Terraform to set up the infrastructure, including the SQS queue, the Lambda function, and the Neo4j database hosted on AWS.

**CloudFormation Template**

Below is a CloudFormation template that sets up the SQS queue, the Lambda function, and the necessary IAM permissions:

```yaml
Resources:
  SpatialDataQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: SpatialDataQueue
  ProcessSpatialDataLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: ProcessSpatialDataLambda
      Handler: main
      Runtime: go1.x
      Code:
        S3Bucket: your-code-bucket
        S3Key: function.zip
      Role: !GetAtt LambdaExecutionRole.Arn
      Environment:
        Variables:
          NEO4J_URI: bolt://<neo4j-host>:7687  # host placeholder was elided in the original
          NEO4J_USER: neo4j
          NEO4J_PASS: your-password
      MemorySize: 512
      Timeout: 120
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: LambdaSpatialDataRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: LambdaSQSPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  # DeleteMessage and GetQueueAttributes are required by the
                  # SQS event source mapping in addition to ReceiveMessage.
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                Resource: !GetAtt SpatialDataQueue.Arn
        - PolicyName: LambdaBasicExecution
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: "arn:aws:logs:*:*:*"
  LambdaEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 10
      EventSourceArn: !GetAtt SpatialDataQueue.Arn
      FunctionName: !GetAtt ProcessSpatialDataLambda.Arn
      Enabled: true

Outputs:
  SQSQueueURL:
    Value: !Ref SpatialDataQueue
    Description: URL of the SQS queue
  LambdaFunctionName:
    Value: !Ref ProcessSpatialDataLambda
    Description: Name of the Lambda function
```
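To go from this template to a running stack, one plausible sequence is sketched below. The bucket name `your-code-bucket` comes from the template; the stack and file names are assumptions.

```bash
# Cross-compile the Go handler for the go1.x Lambda runtime and package it.
GOOS=linux GOARCH=amd64 go build -o main .
zip function.zip main

# Upload the package to the bucket referenced in the template.
aws s3 cp function.zip s3://your-code-bucket/function.zip

# Create or update the stack (stack name is illustrative).
# CAPABILITY_NAMED_IAM is required because the template names an IAM role.
aws cloudformation deploy \
  --template-file template.yaml \
  --stack-name spatial-data-stack \
  --capabilities CAPABILITY_NAMED_IAM
```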
---
## Neo4j Deployment on AWS

**Option 1: Deploy Neo4j Using the AWS Marketplace**
- Use the Neo4j AMI from the AWS Marketplace for a managed Neo4j instance.
- Configure the instance to be reachable by your Lambda function (ensure security groups allow access on the Bolt port, 7687).

**Option 2: Deploy Neo4j in a Docker Container**

If you want more control over Neo4j, deploy it in a Docker container on an EC2 instance or on AWS ECS. Here's an example Docker configuration:

```bash
docker run \
  --name neo4j \
  -p 7474:7474 -p 7687:7687 \
  -e NEO4J_AUTH=neo4j/your-password \
  neo4j:latest
```
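Once the container is running, a short Go check can confirm that networking and credentials are right before wiring up Lambda. This sketch uses the same v5 driver as the Lambda code; `<neo4j-host>` is a placeholder to fill in.

```go
package main

import (
    "log"

    "github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
    // Replace the host placeholder with the EC2/ECS address of the Neo4j container.
    driver, err := neo4j.NewDriver("bolt://<neo4j-host>:7687",
        neo4j.BasicAuth("neo4j", "your-password", ""))
    if err != nil {
        log.Fatalf("Failed to create driver: %v", err)
    }
    defer driver.Close()

    // VerifyConnectivity opens a connection and fails fast on bad auth or networking.
    if err := driver.VerifyConnectivity(); err != nil {
        log.Fatalf("Neo4j unreachable: %v", err)
    }
    log.Println("Connected to Neo4j")
}
```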
---

## Workflow Summary

**Edge (Onboard Kafka):**
- The Kafka producer sends YOLO, MIDS, and ORB-SLAM3 data to the edge Kafka broker.
- The Kafka consumer on the edge pushes data to the AWS SQS queue.

**AWS Cloud (SQS and Lambda):**
- The SQS queue receives data from the edge.
- The Lambda function processes the data and saves it to Neo4j.

**Neo4j (Graph Database):**
- Stores spatial models, YOLO obstacles, ORB-SLAM3 map points, and MIDS depth readings as nodes.
- Relationships can be added between nodes (e.g., `OBSERVES`, `PART_OF`) for advanced querying, as sketched below.
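The Lambda function above creates `SpatialModel`, `YOLOObstacle`, `MapPoint`, and `MIDSDepth` nodes but no edges between them. As a sketch of the relationship step mentioned above, the Cypher below links obstacles and map points to a model; the matching pattern (attaching all currently existing nodes) is illustrative only.

```cypher
// Illustrative sketch: attach obstacle and map-point nodes to a spatial model.
// $model_id matches the SpatialModel created by the Lambda function.
MATCH (m:SpatialModel {model_id: $model_id})
MATCH (o:YOLOObstacle)
MERGE (m)-[:OBSERVES]->(o);

MATCH (m:SpatialModel {model_id: $model_id})
MATCH (p:MapPoint)
MERGE (p)-[:PART_OF]->(m);
```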
---
## Cost Optimization

**Onboard Edge:**
- Use Kafka locally to avoid paying for a managed service like Amazon MSK.
- Reduce the volume of data sent to the cloud by preprocessing on the edge and sending only essential information (see the filtering sketch after this list).

**AWS Cloud:**
- Replace Kafka with SQS or Amazon Kinesis Data Streams for lightweight, serverless streaming.
- Use AWS Lambda for processing to avoid maintaining servers.
- Host Neo4j on a single EC2 instance, or use the Neo4j AuraDB free tier (if applicable) for cost-effective graph database hosting.
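What counts as "essential information" is application-specific. As one hypothetical example, reusing the `InputData` types from the producer above, the device could drop uninteresting obstacle classes and downsample the depth grid before publishing; the allowlist and stride are illustrative.

```go
// filterInput is a hypothetical edge-side preprocessor: it keeps only
// obstacle types the cloud cares about and downsamples the depth grid
// by the given stride, shrinking the payload before it leaves the device.
// Example: input = filterInput(input, map[string]bool{"car": true}, 4)
func filterInput(in InputData, keep map[string]bool, stride int) InputData {
    out := InputData{ORBMapPoints: in.ORBMapPoints}
    for _, o := range in.YOLOObstacles {
        if keep[o.Type] {
            out.YOLOObstacles = append(out.YOLOObstacles, o)
        }
    }
    for y := 0; y < len(in.MIDSDepth); y += stride {
        row := in.MIDSDepth[y]
        var sampled []float64
        for x := 0; x < len(row); x += stride {
            sampled = append(sampled, row[x])
        }
        out.MIDSDepth = append(out.MIDSDepth, sampled)
    }
    return out
}
```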
---
## Benefits of the Architecture

1. **Scalability:** Kafka on the edge handles high-throughput data streaming, and SQS scales automatically in the cloud.
2. **Cost-effectiveness:** Lightweight AWS services (SQS, Lambda) reduce costs compared to running Kafka on AWS.
3. **Advanced spatial queries:** Neo4j enables complex spatial queries and relationships.
4. **Event-driven workflow:** AWS Step Functions orchestrate tasks with fault tolerance and retries (a minimal state machine sketch follows this list).
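Step Functions are named as the orchestration layer but never defined on this page. A minimal Amazon States Language sketch that invokes the processing Lambda with retries might look like the following; the state name, retry values, and error matcher are assumptions.

```json
{
  "Comment": "Hypothetical orchestration for spatial data processing",
  "StartAt": "ProcessSpatialData",
  "States": {
    "ProcessSpatialData": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "ProcessSpatialDataLambda",
        "Payload.$": "$"
      },
      "Retry": [
        {
          "ErrorEquals": ["States.TaskFailed"],
          "IntervalSeconds": 5,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "End": true
    }
  }
}
```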
This complete solution ensures low costs, modularity, and high scalability.