# Pointy Language Tutorial
Pointy Language is a powerful domain-specific language (DSL) designed for creating event-based workflows and pipeline orchestration. This tutorial will guide you from basic concepts to advanced features across all versions.
- Quick Start
- Getting Started (v1.0.0)
- Advanced Features (v1.1.0)
- Data Types & Variables (v1.1.1)
- Meta Events (v1.2.0)
- Namespace System
- Practical Guide
- Reference
- Next Steps
Install Volnux:
```bash
pip install volnux
volnux startproject my_proj
cd my_proj
volnux startworkflow my_workflow
```

Update my_workflow.pty:

```
@retries = $env.MAX_RETRIES ?? 3
FetchData[retry_attempts=$retries] ->
ProcessData ->
SaveResults
```

Run it:

```bash
volnux run my_workflow
```

Now let's learn the syntax in detail...
Pointy Language allows you to define workflows using an intuitive arrow-based syntax. Think of it as a visual way to connect tasks together, similar to drawing a flowchart but in code.
Let's start with the simplest possible workflow:
```
ReceiveOrder
```

This represents a single event called ReceiveOrder. An event is a unit of work - a task that needs to be executed.
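Events themselves are implemented in Python, which is covered at the end of this tutorial. As a taste, here is a minimal sketch of what a `ReceiveOrder` event might look like; the body is hypothetical, but the `Event` subclass and the `(success_flag, result)` return convention are the ones shown in the Next Steps section:

```python
from volnux import Event

class ReceiveOrder(Event):
    def process(self, *args, **kwargs):
        # Hypothetical body: accept an incoming order payload.
        order = {"id": 1, "items": ["book"]}
        # Return (success_flag, result): True routes to the success path.
        return True, order
```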
Connect events with the arrow operator -> to execute them in sequence:
```
ReceiveOrder -> ValidateInventory -> ProcessPayment -> FulfillOrder
```

What happens here:

- `ReceiveOrder` executes first
- Only if successful, `ValidateInventory` runs
- Then `ProcessPayment`
- Finally, `FulfillOrder`

If any step fails, the pipeline stops.
Use the parallel operator || to run events at the same time:
```
FetchUserData || FetchPreferences || FetchHistory
```

All three events start simultaneously and run in parallel.
Pass data between events using the pipe operator |->:
```
FetchUserData |-> ProcessUserData |-> SaveToDatabase
```

The output of FetchUserData becomes the input for ProcessUserData, and so on.
Handle different outcomes with descriptors (numeric codes):
```
ValidatePayment (
    0 -> LogError -> NotifyCustomer,
    1 -> ProcessOrder
)
```

- Descriptor 0: Failure path (error occurred)
- Descriptor 1: Success path (validation passed)
- Descriptors 2-9: Custom conditions you define
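On the Python side, an event selects a branch through its return value; custom descriptors can also be routed with `self.goto()`, which appears again in the error-handling section. A minimal sketch, assuming the Event API shown later in this tutorial (the payment logic itself is illustrative):

```python
from volnux import Event

class ValidatePayment(Event):
    def process(self, *args, **kwargs):
        payment = kwargs.get("payment", {})  # hypothetical input
        if payment.get("amount", 0) <= 0:
            # False routes execution down descriptor 0 (failure path).
            return False, "invalid amount"
        # True routes execution down descriptor 1 (success path).
        return True, payment
```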
Add retry logic with the * operator:
```
ConnectToDatabase * 3
```

This will retry up to 3 times if the connection fails.
You can also combine retry with conditional branching:
```
ConnectToDatabase * 3 (
    0 -> UseCache -> NotifyAdmin, # After all 3 retries failed
    1 -> ExecuteQuery # Succeeded within 3 tries
)
```

Build complex workflows by combining operators:
```
SubmitApplication ->
ValidateInformation * 2 (
    0 -> RequestCorrections |-> NotifyApplicant,
    1 -> PerformCreditCheck (
        0 -> RejectApplication,
        1 -> ApproveApplication
    )
)
```

Add comments to document your workflow:
```
# Validate the order before processing
ValidateOrder -> ProcessOrder # This step charges the customer
```

Directives are special configuration commands that control how the Pointy parser interprets and executes your workflow. Think of them as "settings" for your pipeline that you declare at the very beginning of your script.
Directives always:

- Start with the `@` symbol
- Must appear before any event expressions
- Affect the entire workflow globally
The basic syntax for directives is:
```
@directive_name:value
```

Examples:

```
@mode: "DAG"
@recursive-depth:3000
```

The mode directive tells Pointy how to interpret the structure of your workflow.
Syntax:

```
@mode: "DAG" # or
@mode: "CFG"
```

You can also use more intuitive aliases:

```
@mode:"strict" # Alias for DAG
@mode:"flexible" # Alias for CFG
```

**DAG Mode**
DAG stands for "Directed Acyclic Graph" - a fancy way of saying "no event reuse allowed."
@mode: "DAG"
FetchData -> ProcessData -> SaveData -> SendNotificationWhat DAG mode prevents:
@mode: "DAG"
A -> B (
0 -> C,
1 -> A # ERROR! Cannot reuse event A
)CFG Mode (Default)
CFG stands for "Control Flow Graph" - this mode allows event reuse within the workflow.
@mode: "CFG"
InitCounter -> ProcessItem (
0 -> Finalize, # Exit condition
1 -> TransformItem -> IncCounter -> ProcessItem # Reuse ProcessItem
)Important: Event reuse does NOT create infinite loops. Each execution is discrete:
@mode: "CFG"
# Retry pattern: FetchData can be reused
FetchData -> ValidateData (
0 -> LogError -> FetchData, # Try again
1 -> ProcessData
)
# Executes: FetchData → ValidateData → (fail) → LogError → FetchData → (success) → ProcessData
# This is NOT an infinite loop - it executes each event once per pathChoosing Between DAG and CFG:
| Scenario | Recommended Mode | Why |
|---|---|---|
| Data pipeline | DAG | Linear flow, no event reuse needed |
| ETL process | DAG | Sequential extract, transform, load |
| Polling with retries | CFG | Needs to reuse events until success |
| Batch processor with pagination | CFG | Loop through pages |
| Approval workflow | DAG | Linear approval chain |
| Game loop | CFG | Continuous state transitions |
This directive controls how deep your workflow can nest before the parser gives up.
NOTE: This directive will be deprecated: the runtime algorithm is now iterative and no longer requires recursion.
Syntax:

```
@recursive-depth:number
```

Default: 1000 (implementation-specific)
What Is Recursion Depth?
Think of it as how many "levels deep" your workflow can go. Each time you nest conditionals or have events call other events, you go one level deeper.
Example: Why You Might Need More Depth
@mode: "DAG"
@recursive-depth:3000
Level1 -> Level2 (
0 -> ErrorPath1 -> ErrorPath2 -> ErrorPath3 -> ErrorPath4,
1 -> Level3 (
0 -> FallbackA -> FallbackB -> FallbackC,
1 -> Level4 (
0 -> Recovery1 -> Recovery2 -> Recovery3,
1 -> Level5 -> Level6 (
0 -> AltPath1 -> AltPath2,
1 -> Level7 -> Level8 -> Level9 -> Level10
)
)
)
)Production Workflow Example:
@mode: "DAG"
@recursive-depth:2500
# Production pipeline with extensive error handling
local::FetchOrders * 3
-> pypi::ValidateOrders (
0 -> local::LogValidationError -> github::NotifyTeam,
1 -> pypi::EnrichOrderData || local::CalculateTaxes
|-> github::GenerateInvoice (
0 -> local::RetryInvoiceGeneration * 2,
1 -> pypi::SendToCustomer -> local::UpdateDatabase
)
)# WRONG!
LoadData -> ProcessData
@mode: "DAG" # ERROR: Directive must come first!✅ Fix:
@mode: "DAG"
LoadData -> ProcessData@mode: "DAG"
# This will cause an error!
StartLoop -> ProcessItem (
0 -> EndLoop,
1 -> DoWork -> StartLoop # ERROR: Event reuse in DAG mode!
)✅ Fix:
@mode: "CFG" # Use CFG mode for event reuse
StartLoop -> ProcessItem (
0 -> EndLoop,
1 -> DoWork -> StartLoop # OK in CFG mode
)@mode: "DAG"
@recursive-depth:2000
@mode:"CFG" # WARNING: Duplicate directive!What happens: The last value wins (CFG), but you'll get a warning.
✅ Fix: Specify each directive only once.
@mode: "GRAPH" # ERROR: Invalid mode value!✅ Fix: Use only "DAG", "CFG", "strict", or "flexible".
Attributes configure how events execute. They're specified in square brackets:
```
ProcessData[retry_attempts=3, executor="ThreadPoolExecutor"]
```

**1. Executor Selection**

Choose how events run:

```
# Run in a thread pool
A[executor="ThreadPoolExecutor"]

# Run in a separate process
B[executor="ProcessExecutor"]

# Run via XML-RPC
C[executor="XMLRPCExecutor"]
```

**2. Retry Configuration**

```
UnreliableService[retry_attempts=5]
```

**3. Executor Configuration**

Pass configuration to the executor:

```
RemoteService[
    executor="XMLRPCExecutor",
    executor_config="{'host':'localhost', 'port':8090}"
]
```

**4. Result Evaluation Strategy**

Control success/failure evaluation:

```
ProcessBatch[result_evaluation_strategy="ALL_MUST_SUCCEED"]
```

Options:

- `ALL_MUST_SUCCEED` (default)
- `ANY_MUST_SUCCEED`
- `MAJORITY_MUST_SUCCEED`
- `NO_FAILURES_ALLOWED`
Group events to share attributes using curly braces {}:
```
{LoadData -> ProcessData -> SaveData}[
    retry_attempts=3,
    executor="ThreadPoolExecutor"
]
```

All three events inherit retry_attempts=3 and use ThreadPoolExecutor.
What Does Grouping Do?
Grouping creates a sub-engine with shared execution context:

```
# These share a ThreadPool instance (sub-engine)
{A -> B -> C}[executor="ThreadPoolExecutor"]

# These each get their own executor instance
A[executor="ThreadPoolExecutor"] ->
B[executor="ThreadPoolExecutor"] ->
C[executor="ThreadPoolExecutor"]
```

Events connected by -> within {} share the same execution context:

```
# These run in the same context
{A -> B -> C}[executor="ThreadPoolExecutor"]

# These run in separate contexts
A -> B -> C
```

Combine sequential and parallel execution:
```
A -> B || C -> D
```

Execution flow:

- `A` executes
- `B` and `C` execute in parallel
- `D` waits for both `B` and `C` to complete
- `D` executes
Different chains can run in parallel:
```
{A -> B}[retry_attempts=3] || {C -> D}[retry_attempts=1]
```

```
Initialize ->
{DataProcessing -> Validation}[retry_attempts=3] ||
{LogSetup -> MonitoringSetup}[executor="XMLRPCExecutor"] ->
Finalize[retry_attempts=1]
```

Integers:

```
@count = 42
@negative = -17
```

Floats:

```
@pi = 3.14159
@rate = 2.5
```

Booleans:

```
@enabled = true
@debug_mode = false
```

Strings:

```
@service_name = "UserAuthentication"
@environment = "production"
```

Lists:

```
@numbers = [1, 2, 3, 4, 5]
@names = ["Alice", "Bob", "Charlie"]
@mixed = [1, "text", true, 3.14]
```

Objects:

```
@config = {
    "host": "localhost",
    "port": 8080,
    "ssl": true
}
@user = {
    "name": "John",
    "age": 30,
    "active": true
}
```

Nested objects:

```
@database_config = {
    "primary": {
        "host": "db1.example.com",
        "port": 5432
    },
    "replica": {
        "host": "db2.example.com",
        "port": 5432
    },
    "pool_size": 10
}
```

Declare variables with @:
```
@retries = 4
@timeout = 30.5
@debug = true
@service = "DataProcessor"
```

Reference variables with $:

```
@max_retries = 3
@delay = 1.5
FetchData[retry_attempts=$max_retries, delay=$delay]
```

Copy values between variables:

```
@original = 10
@copy = $original
@backup = $copy
```

Evaluation Order:
Variables are evaluated in declaration order. Later variables can reference earlier ones, but circular references are not allowed:
```
@a = 5
@b = $a + 10 # Error: Cannot perform arithmetic
@c = $b # OK: References earlier variable

# ❌ Circular reference - Not allowed
@x = $y
@y = $x
```

Access environment variables using the $env namespace:
```
@api_key = $env.API_KEY
@max_retries = $env.MAX_RETRIES
@debug_mode = $env.DEBUG
@database_url = $env.DATABASE_URL
```

Environment variables are resolved at workflow execution time by the Volnux runtime.
Example:
```
@timeout = $env.SERVICE_TIMEOUT
@retries = $env.MAX_RETRIES
CallExternalService[
    timeout=$timeout,
    retry_attempts=$retries
]
```

Provide defaults for potentially undefined variables using ??:
```
# If TIMEOUT is undefined, use 30
@timeout = $env.TIMEOUT ?? 30

# Chain multiple fallbacks
@host = $env.PRIMARY_HOST ?? $env.SECONDARY_HOST ?? "localhost"

# With other variables
@retries = $env.MAX_RETRIES ?? $default_retries ?? 3
```

Practical Example:
```
@api_timeout = $env.API_TIMEOUT ?? 30.0
@max_retries = $env.MAX_RETRIES ?? 3
@enable_cache = $env.ENABLE_CACHE ?? true
@log_level = $env.LOG_LEVEL ?? "INFO"
CallExternalAPI[
    timeout=$api_timeout,
    retry_attempts=$max_retries,
    use_cache=$enable_cache,
    log_level=$log_level
]
```

Conditional assignment based on comparisons:
```
# Basic ternary
@retries = $env.ENABLE_RETRIES == "true" ? 5 : 0

# With null check
@timeout = $env.TIMEOUT == NULL ? 30 : $env.TIMEOUT

# Comparing numbers
@batch_size = $env.LOAD_LEVEL == "high" ? 100 : 10
```

More Examples:
```
@environment = $env.DEPLOY_ENV ?? "development"
@workers = $environment == "production" ? 16 : 4
@cache_enabled = $environment == "production" ? true : false
ProcessData[
    worker_count=$workers,
    enable_cache=$cache_enabled
]
```

Available operators for ternary expressions:
- `==` - Equal to
- `!=` - Not equal to
- `>` - Greater than
- `<` - Less than
- `>=` - Greater than or equal
- `<=` - Less than or equal
Examples:
```
@cpu_count = $env.CPU_COUNT ?? 4
@workers = $cpu_count > 8 ? 16 : 4
@use_cache = $env.CACHE_SIZE >= 1000 ? true : false
@priority = $env.QUEUE_SIZE < 100 ? "low" : "high"
```

Pointy is designed for workflow orchestration, not computation. Complex logic belongs in events.
What You CANNOT Do:
```
# ❌ Cannot do arithmetic on variables
@total = $count + 10 # Not supported
@result = $a * $b # Not supported

# ❌ Cannot call functions
@uppercase = uppercase($name) # Not supported
@length = len($list) # Not supported

# ❌ Cannot do complex boolean logic
@result = $a > 5 && $b < 10 # Not supported
@flag = !$enabled # Not supported

# ❌ Cannot concatenate strings
@full_name = $first + " " + $last # Not supported
```

What You CAN Do:
```
# ✅ Simple comparisons
@is_production = $env.ENV == "production" ? true : false

# ✅ Null coalescing with multiple fallbacks
@value = $env.VAR1 ?? $env.VAR2 ?? $default ?? 0

# ✅ Nested ternary (use sparingly)
@level = $env.ENV == "prod" ? "high" : ($env.ENV == "staging" ? "medium" : "low")
```

Rationale: Pointy focuses on workflow orchestration. For complex data transformations or calculations, implement them in your event code.
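Since arithmetic is deliberately unsupported in the DSL, a calculation like `$count + 10` belongs inside an event. A minimal sketch, assuming the Event API shown in the Next Steps section (the event name and input shape are illustrative):

```python
from volnux import Event

class ComputeTotal(Event):
    def process(self, *args, **kwargs):
        # Arithmetic that Pointy does not support lives in
        # ordinary Python inside the event.
        count = kwargs.get("count", 0)  # hypothetical input
        total = count + 10
        return True, total
```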
Variables have two scopes: global and local.
Variables declared outside event attributes are global and accessible everywhere:
```
# Global variables
@global_retries = 3
@global_timeout = 30
@global_executor = "ThreadPoolExecutor"

# Accessible in all events
FetchData[retry_attempts=$global_retries, timeout=$global_timeout]
ProcessData[retry_attempts=$global_retries, executor=$global_executor]
SaveData[timeout=$global_timeout]
```

Attributes defined within [] are local to that event and cannot be referenced elsewhere:
```
@global_retries = 3
FetchData[
    retry_attempts=$global_retries,
    buffer_size=1024 # Local to FetchData
]
ProcessData[
    retry_attempts=$global_retries,
    buffer_size=2048, # Different local value, no conflict
    chunk_size=512 # Local to ProcessData
]

# ❌ Cannot reference local attributes
SaveData[size=$buffer_size] # Error: buffer_size is not a global variable
```

All variables in Pointy are read-only (immutable):
```
@counter = 0

# ❌ Events cannot modify variables
IncrementCounter # Cannot change @counter from within event

# ❌ Cannot reassign variables
@counter = $counter + 1 # Not supported
```

Rationale: Immutability prevents race conditions and makes workflows predictable and easier to debug.
Configuration with Fallbacks:
```
@api_timeout = $env.API_TIMEOUT ?? 30.0
@max_retries = $env.MAX_RETRIES ?? 3
@enable_cache = $env.ENABLE_CACHE ?? true
CallExternalAPI[
    timeout=$api_timeout,
    retry_attempts=$max_retries,
    use_cache=$enable_cache
]
```

Environment-Specific Behaviour:
```
@environment = $env.DEPLOY_ENV ?? "development"
@batch_size = $environment == "production" ? 1000 : 10
@log_level = $environment == "production" ? "WARNING" : "DEBUG"
@workers = $environment == "production" ? 16 : 4
ProcessData[
    batch_size=$batch_size,
    log_level=$log_level,
    worker_count=$workers
]
```

Safe Null Handling:
```
# ❌ Avoid this - will fail if undefined
@timeout = $env.TIMEOUT

# ✅ Do this - safe with default
@timeout = $env.TIMEOUT ?? 30

# ✅ Or this - explicit null check
@timeout = $env.TIMEOUT == NULL ? 30 : $env.TIMEOUT
```

Complex Configuration:
```
@env_name = $env.ENVIRONMENT ?? "dev"
@is_prod = $env_name == "production" ? true : false
@is_staging = $env_name == "staging" ? true : false
@db_pool_size = $is_prod ? 50 : ($is_staging ? 20 : 5)
@cache_ttl = $is_prod ? 3600 : 300
@rate_limit = $is_prod ? 1000 : 100
{
    ConnectDatabase[pool_size=$db_pool_size] ->
    SetupCache[ttl=$cache_ttl] ->
    ConfigureRateLimit[limit=$rate_limit]
}[executor="ThreadPoolExecutor"]
```

```
@batch_size = 50
@retry_count = 3
@executor_type = "ThreadPoolExecutor"
{
    LoadData ->
    ProcessData ->
    SaveData
}[
    batch_size=$batch_size,
    retry_attempts=$retry_count,
    executor=$executor_type
]
```

Valid names:
- `retries`
- `TIMEOUT_VALUE`
- `_global_config`
- `server1_url`
- `isActive`

Invalid names:

- `1variable` (starts with a number)
- `my-variable` (contains hyphen)
- `config.timeout` (contains dot)
Meta Events are powerful abstractions for dynamic, data-driven workflows. They allow you to process collections of data with familiar functional programming patterns.
A Meta Event wraps a Template Event and executes it multiple times based on input data. Think of it as a loop or map operation, but declarative.
Syntax:
```
MODE<TemplateEvent>[attributes]
```

Key Concept: Meta Events are mini-orchestrators that follow a monadic design:

```
Collection → [Meta Event + Template] → Collection → [Next Meta Event]
```
Each Meta Event:
- Takes a collection as input
- Applies a Template Event to items in the collection
- Outputs a (potentially transformed) collection
- Passes it to the next stage
This is why nested Meta Events are not allowed - it would be semantically ambiguous:
```
# ❌ Invalid - What would this even mean?
MAP<FILTER<ProcessItem>> # Filter what? The item or items within?

# ✅ Valid - Clear sequential composition
MAP<Transform> |-> FILTER<Validate>
```

Execute a Template Event for each item in a collection:
```
@items = [1, 2, 3, 4, 5]
LoadItems[items=$items] |-> MAP<ProcessItem> |-> SaveResults
```

What happens:

- `ProcessItem` runs 5 times (once per item)
- Each execution gets one item as input
- Results are collected into a list
- The output list is passed to `SaveResults`
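A MAP template receives one item per execution and returns the transformed item. A minimal sketch, assuming the Event API from the Next Steps section; exactly how the current item is delivered to `process` is an assumption here:

```python
from volnux import Event

class ProcessItem(Event):
    def process(self, item, *args, **kwargs):
        # Assumption: the current item arrives as the first argument.
        # Transform one item; MAP collects each result into a list.
        return True, item * 2
```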
With attributes:

```
MAP<ProcessItem>[
    batch_size=10,
    concurrent=true,
    retry_attempts=3
]
```

Keep only items that match a condition:
```
@users = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 17},
    {"name": "Charlie", "age": 30}
]
LoadUsers |-> FILTER<IsAdult> |-> ProcessAdults
```

What happens:

- `IsAdult` runs for each user
- Returns descriptor `1` (include) or `0` (exclude)
- Output contains only users where `IsAdult` returned `1`
- Filtered list is passed to `ProcessAdults`
Combine all items into a single result:
```
@numbers = [1, 2, 3, 4, 5]
REDUCE<Sum>[initial_value=0, collection=$numbers] -> DisplayTotal
```

What happens:

- Start with `initial_value` (0)
- Process first item: 0 + 1 = 1
- Process second item: 1 + 2 = 3
- Process third item: 3 + 3 = 6
- Continue until final result: 15
- Single result (15) is passed to `DisplayTotal`
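A REDUCE template combines an accumulator with the current item. The tutorial doesn't show exactly how Volnux hands both values to the template, so the signature below is an assumption; the accumulator-plus-item shape is the part that matters:

```python
from volnux import Event

class Sum(Event):
    def process(self, accumulator, item, *args, **kwargs):
        # Assumption: REDUCE passes (accumulator, current_item).
        # The returned result becomes the accumulator for the next item.
        return True, accumulator + item
```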
Execute for side effects without changing the data:
```
@users = [{"id": 1}, {"id": 2}, {"id": 3}]
LoadUsers[collection=$users] |-> FOREACH<SendNotification> |-> LogCompletion
```

What happens:

- `SendNotification` runs for each user
- Results are ignored
- Original users list passes through unchanged
- Useful for logging, notifications, updates
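A FOREACH template performs its side effect and can return anything; the Meta Event discards the result and forwards the original collection. A sketch under the same item-passing assumption as above (printing stands in for a real notification call):

```python
from volnux import Event

class SendNotification(Event):
    def process(self, user, *args, **kwargs):
        # Side effect only; FOREACH ignores this result and
        # passes the original collection through unchanged.
        print(f"notifying user {user['id']}")
        return True, user
```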
Transform items and flatten nested results:
```
@categories = ["electronics", "books", "clothing"]
LoadCategories[collection=$categories] |->
FLATMAP<FetchProductsInCategory> |->
ProcessAllProducts
```

What happens:

- `FetchProductsInCategory` returns a list for each category
- All lists are flattened into a single list
- Example: `[[prod1, prod2], [prod3], [prod4, prod5]]` → `[prod1, prod2, prod3, prod4, prod5]`
- Flattened list is passed to `ProcessAllProducts`
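A FLATMAP template returns a list per input item, and the Meta Event concatenates those lists. A minimal sketch under the same item-passing assumption as the MAP example; the lookup table is hypothetical:

```python
from volnux import Event

class FetchProductsInCategory(Event):
    def process(self, category, *args, **kwargs):
        # Hypothetical lookup: return a list of products per category.
        catalog = {"books": ["b1", "b2"], "electronics": ["e1"]}
        # FLATMAP flattens all returned lists into one collection.
        return True, catalog.get(category, [])
```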
Broadcast the same input to N instances:
```
@request = {"data": "payload"}
PrepareRequest[request=$request] |->
FANOUT<SendToReplica>[count=3, concurrent=true] ->
AggregateResponses
```

What happens:

- Same request sent to 3 replica instances
- All execute in parallel (if `concurrent=true`)
- Results collected from all instances
- All results passed to `AggregateResponses`
Example: Send to 3 different replicas:
```
# Sends to:
# - replica instance 1
# - replica instance 2
# - replica instance 3
FANOUT<SendToReplica>[count=3]
```
There are three ways to pass collections to Meta Events:
Option 1: Pipe from previous event

```
LoadUsers |-> MAP<ProcessUser> |-> SaveResults
```

Option 2: Explicit collection attribute

```
@users = [{"id": 1}, {"id": 2}]
MAP<ProcessUser>[collection=$users]
```

Option 3: Both (collection attribute overrides piped input)

```
@explicit_users = [{"id": 99}]
LoadUsers |-> MAP<ProcessUser>[collection=$explicit_users]
# Uses $explicit_users, not output from LoadUsers
```

Precedence: Explicit collection attribute takes priority over piped input.
Meta Events support two levels of attributes:
Meta-Level:

- `batch_size`: Items per batch
- `concurrent`: Execute tasks in parallel (default: true)
- `concurrency_mode`: Mode of concurrent execution (thread, process)
- `max_concurrency`: Max parallel tasks
- `max_workers`: Maximum parallel executions (default: 4)
- `count`: Number of instances (FANOUT only)
- `initial_value`: Starting value (REDUCE only)
- `collection`: Collection of items to process
- `partial_success`: Allow partial failures (default: false)

Template-Level:

- `retry_attempts`: Retry count per instance
- `executor`: Executor type
- `stop_condition`: The condition under which the workflow should stop
- `bypass_event_checks`: If the skip condition is met, skip the event (default: false)
- `result_evaluation_strategy`: The strategy to use for task result evaluation
Example:
```
MAP<ProcessItem>[
    batch_size=10,                # Meta-level
    concurrent=true,              # Meta-level
    retry_attempts=3,             # Template-level (inherited by each ProcessItem)
    executor="ThreadPoolExecutor" # Template-level (inherited by each ProcessItem)
]
```
Combine multiple Meta Events in sequence:
```
LoadData |->
MAP<Parse> |->
FILTER<Validate> |->
REDUCE<Aggregate> ->
PublishResults
```

Flow:

- `LoadData` returns a collection
- `MAP<Parse>` transforms each item
- `FILTER<Validate>` keeps only valid items
- `REDUCE<Aggregate>` combines into a single result
- `PublishResults` receives the final result
Control how failures are handled:
```
MAP<ProcessItem>[partial_success=true, retry_attempts=3] (
    0 -> HandleErrors, # Some or all items failed
    1 -> SaveResults # All items succeeded
)
```

Options:

- `partial_success=false` (default): Fail if any item fails
- `partial_success=true`: Succeed with partial results, even if some items fail
Example with detailed error handling:
```
@items = [1, 2, 3, 4, 5]
LoadItems[items=$items] |->
MAP<ProcessItem>[
    concurrent=true,
    partial_success=true,
    retry_attempts=2
] (
    0 -> LogFailures -> NotifyAdmin,
    1 -> ValidateResults -> SaveToDatabase
)
```
No Nested Meta Events:
```
# ❌ INVALID - Cannot nest Meta Events
MAP<FILTER<ProcessItem>>

# ✅ VALID - Use sequential composition
MAP<Transform> |-> FILTER<Validate>
```

Reason: Meta Events follow a monadic design where each Meta Event:
- Takes a collection
- Applies a template to items
- Returns a collection
Nesting would create ambiguity: should the inner Meta Event operate on the whole item or items within the item?
```
@batch_size = 50
@retry_count = 3

# ETL Pipeline
FetchRawData |->
MAP<ParseRecord>[
    batch_size=$batch_size,
    concurrent=true,
    retry_attempts=$retry_count
] (
    0 -> LogParseErrors -> NotifyDataTeam,
    1 -> FILTER<ValidateSchema>[concurrent=true] |->
        MAP<TransformToTargetFormat>[concurrent=true] |->
        REDUCE<BatchInsert>[batch_size=100] (
            0 -> RollbackChanges -> AlertAdmin,
            1 -> CommitTransaction -> SendSuccessNotification
        )
)
```

Differentiate events from different sources using the namespace syntax:

```
namespace::EventName
```

**local** - Local project (default)

```
local::ValidateInput -> local::ProcessLocally
```

**pypi** - Python Package Index

```
pypi::LoadData -> pypi::ProcessData
```

**github** - GitHub repositories

```
github::FetchConfig -> github::ApplyConfig
```

**Custom registries** - Your organisation's event registry
```
# Load from PyPI package
pypi::LoadData -> pypi::ProcessData

# Load from GitHub repo
github::FetchConfig -> github::ApplyConfig

# Load from local project (default)
local::ValidateInput -> ProcessLocally # 'local::' is implicit

# Mixed sources
pypi::FetchUserData || github::FetchPreferences || local::FetchHistory
```

Clarity:
```
# ✅ Good - Explicit sources
pypi::StandardProcessor -> local::CustomValidator -> github::SharedUtility

# ❌ Risky - Ambiguous sources
StandardProcessor -> CustomValidator -> SharedUtility
```

Reusability:
- Pull standard events from PyPI packages
- Share team events via GitHub
- Override with local implementations when needed
Example:
```
@retries = $env.MAX_RETRIES ?? 3

# Use standard validation from PyPI
pypi::ValidateEmail[retry_attempts=$retries] ->

# Use custom business logic locally
local::ApplyBusinessRules ->

# Use shared utility from GitHub
github::SendNotification
```

Volnux provides a queryset-like abstraction for all data flowing through Pointy workflows, whether it's success data or error data.
Similar to Django's ORM querysets, Volnux querysets provide:
- Filtering: Select subsets of data
- Mapping: Transform items
- Aggregation: Combine data
- Lazy evaluation: Operations are optimized
```
FetchUsers |->        # Returns UserQuerySet
FilterActive |->      # Event filters the queryset
MAP<Transform> |->    # Meta Event works on queryset
SaveResults
```
Errors are also querysets, enabling powerful error handling:
```
ProcessBatch (
    0 |-> FilterRetryable |-> # Filter error queryset
        MAP<RetryItem>,       # Retry filtered errors
    1 -> Continue
)
```
```python
from datetime import datetime, timedelta

from volnux import Event

class FilterCriticalErrors(Event):
    def process(self, *args, **kwargs):
        # Filter errors by type
        critical = self.previous_result.filter(content__severity="critical")
        # Chain filters
        recent_critical = (
            critical
            .filter(timestamp__gte=datetime.now() - timedelta(hours=1))
        )
        return True, recent_critical
```

| Method | Example | Description |
|---|---|---|
| `filter()` | `qs.filter(type="TimeoutError")` | Keep matching items |
| `first()` | `qs.first()` | Get first item |
| `all()` | `qs.all()` | Get all items |
```
MAP<ProcessItem>[partial_success=true] (
    0 |-> ClassifyErrors (
        0 |-> HandleTimeout -> RetryWithBackoff,
        1 |-> HandleAuth -> RefreshToken -> Retry,
        2 |-> HandleRateLimit -> WaitAndRetry,
        3 |-> HandleFatal -> LogAndFail
    ),
    1 -> ValidateResults -> Commit
)
```
```python
from volnux import Event

class ClassifyErrors(Event):
    def process(self, *args, **kwargs):
        timeout_error_qs = self.previous_results.filter(type="TimeoutError")
        auth_error_qs = self.previous_results.filter(type="AuthError")
        if not timeout_error_qs.is_empty():
            self.goto(descriptor=0, result=list(timeout_error_qs))
        elif not auth_error_qs.is_empty():
            self.goto(descriptor=1, result=list(auth_error_qs))
        # ... etc
```

```
@max_retries = 3
MAP<ProcessOrder>[partial_success=true] (
    0 |->
        FILTER<IsTransient> |->
        FILTER<BelowRetryLimit>[max=$max_retries] |->
        MAP<IncrementRetryCount> |->
        MAP<ProcessOrder>,
    1 -> CommitSuccessful
)
```
```
MAP<ProcessBatch>[partial_success=true] (
    0 |->
        MAP<EnrichErrorContext> |->
        REDUCE<GroupByErrorType> |->
        MAP<CalculateErrorRate> |->
        FILTER<ExceedsThreshold> (
            0 -> ContinueMonitoring,
            1 -> TriggerAlert -> PageOnCall
        ),
    1 -> ProcessSuccess
)
```
```
ProcessOrders |->
MAP<ValidateOrder>[partial_success=true] (
    # Handle errors
    0 |->
        MAP<ExtractOrderId> |->
        FOREACH<MarkAsFailed> |->
        REDUCE<CountFailures> |->
        NotifyIfThresholdExceeded,
    # Handle successes
    1 |->
        MAP<ChargePayment>[partial_success=true] (
            0 |-> HandlePaymentFailures,
            1 |-> FulfillOrders
        )
)
```
1. Always use |-> when you need error details:
```
# ❌ Error details lost
ProcessData (0 -> LogError)

# ✅ Error available for inspection
ProcessData (0 |-> InspectError -> TakeAction)
```
2. Filter early, process less:
```
# ✅ Good - filter first
MAP<Process>[partial_success=true] (
    0 |-> FILTER<IsRetryable> |-> MAP<Retry>
)

# ❌ Less efficient - processes all errors
MAP<Process>[partial_success=true] (
    0 |-> MAP<AttemptRetry> # Processes non-retryable too
)
```
3. Use Meta Events for error collections:
```
# ✅ Good - declarative
ProcessBatch (
    0 |-> FILTER<Critical> |-> FOREACH<Alert>
)

# ❌ More complex - custom event logic
ProcessBatch (
    0 |-> CustomErrorHandler # Has to iterate manually
)
```
```
# Order processing with payment validation
@retry_count = 3
@timeout = 30.0

ReceiveOrder ->
ValidateOrder[timeout=$timeout] (
    0 -> LogInvalidOrder -> NotifyCustomer,
    1 -> CheckInventory (
        0 -> SendOutOfStockNotice,
        1 -> ProcessPayment[retry_attempts=$retry_count] (
            0 -> RefundIfNeeded -> NotifyPaymentFailed,
            1 -> ReserveInventory ->
                ShipOrder || GenerateInvoice || UpdateAnalytics ->
                SendConfirmationEmail
        )
    )
)
```

```
@workers = 10
@batch = 100

# ETL with parallel processing
ExtractFromDatabase |->
MAP<CleanRecord>[
    concurrent=true,
    max_concurrency=$workers,
    batch_size=$batch
] |->
FILTER<ValidateSchema> |->
MAP<EnrichWithExternalData>[concurrent=true] |->
REDUCE<AggregateByCategory> ->
LoadToDataWarehouse (
    0 -> RollbackTransaction -> AlertDataTeam,
    1 -> CommitAndNotify
)
```

```
@service_timeout = 5.0

# User registration across services
ValidateRegistration ->
{
    CreateUserAccount[timeout=$service_timeout] ||
    SetupUserProfile[timeout=$service_timeout] ||
    InitializePreferences[timeout=$service_timeout]
}[
    executor="ThreadPoolExecutor",
    retry_attempts=2
] (
    0 -> RollbackAll -> SendErrorEmail,
    1 -> GenerateWelcomeEmail ||
        SendWelcomeSMS ||
        CreateOnboardingTask ->
        ScheduleFollowUp
)
```

```
@recipients = [
    {"email": "[email protected]", "name": "Alice"},
    {"email": "[email protected]", "name": "Bob"},
    {"email": "[email protected]", "name": "Charlie"}
]

LoadRecipients[recipients=$recipients] |->
FILTER<HasValidEmail>[concurrent=true] |->
MAP<PersonalizeMessage>[concurrent=true] |->
FOREACH<SendEmail>[
    concurrent=true,
    continue_on_error=true,
    max_concurrency=5
] |->
LogResults
```

```
# Fetch from different sources and aggregate
{
    FetchFromDatabase |-> MAP<ParseDBRecord>
} ||
{
    FetchFromAPI * 3 |-> MAP<ParseAPIResponse>
} ||
{
    FetchFromCache |-> FILTER<IsFresh>
} |->
REDUCE<MergeDataSources> ->
ValidateCompleteness (
    0 -> FetchMissingData -> RetryAggregation,
    1 -> CacheResults -> ServeToClient
)
```

```
@train_split = 0.8
@epochs = 10

LoadDataset |->
FILTER<RemoveOutliers> |->
MAP<NormalizeFeatures>[concurrent=true] ->
SplitTrainTest[split_ratio=$train_split] ->
{
    TrainModel[epochs=$epochs] ->
    ValidateModel (
        0 -> TuneHyperparameters -> TrainModel,
        1 -> SaveModel
    )
} ||
{
    GenerateReport ->
    VisualizeResults
} ->
DeployModel
```

```
@approval_timeout = 86400 # 24 hours
SubmitRequest ->
AutoValidate (
    0 -> RejectImmediately,
    1 -> RequiresApproval (
        0 -> SendToSupervisor[timeout=$approval_timeout] (
            0 -> EscalateToManager,
            1 -> ApproveRequest,
            3 -> RequestMoreInfo |-> NotifySubmitter
        ),
        1 -> AutoApprove
    )
) ->
ExecuteApprovedAction ||
LogApproval ||
NotifyStakeholders
```

**Use descriptive event names**

```
# ✅ Good
ValidateCustomerCreditCard -> ChargeCustomerAccount

# ❌ Bad
Validate -> Charge
```

**Handle errors explicitly**

```
# ✅ Good - Explicit error handling
ProcessPayment (
    0 -> LogPaymentError -> NotifyCustomer,
    1 -> FulfillOrder
)

# ❌ Bad - No error handling
ProcessPayment -> FulfillOrder
```

**Make workflows configurable**

```
# ✅ Good - Configurable
@retries = $env.MAX_RETRIES ?? 3
@timeout = $env.TIMEOUT ?? 30.0
ProcessData[retry_attempts=$retries, timeout=$timeout]

# ❌ Bad - Hardcoded
ProcessData[retry_attempts=3, timeout=30.0]
```

**Group shared attributes**

```
# ✅ Good - Grouped context
{LoadData -> ProcessData -> SaveData}[
    executor="ThreadPoolExecutor",
    retry_attempts=2
]

# ❌ Bad - Repeated attributes
LoadData[executor="ThreadPoolExecutor", retry_attempts=2] ->
ProcessData[executor="ThreadPoolExecutor", retry_attempts=2] ->
SaveData[executor="ThreadPoolExecutor", retry_attempts=2]
```

**Comment complex workflows**

```
# Validate user input before processing
ValidateInput ->

# Process in parallel for better performance
ProcessTypeA || ProcessTypeB || ProcessTypeC ->

# Aggregate results and commit transaction
AggregateResults -> CommitTransaction
```

**Use FOREACH for side effects**

```
# ✅ Good - Use FOREACH for side effects
LoadUsers |-> FOREACH<SendNotification> |-> ContinueFlow

# ❌ Bad - Use MAP when results aren't needed
LoadUsers |-> MAP<SendNotification> |-> ContinueFlow
```

**Be explicit about namespaces**

```
# ✅ Good - Explicit sources
pypi::StandardProcessor -> local::CustomValidator -> github::SharedUtility

# ❌ Risky - Ambiguous sources
StandardProcessor -> CustomValidator -> SharedUtility
```

**Provide defaults for environment variables**

```
# ✅ Good - Safe with defaults
@timeout = $env.TIMEOUT ?? 30
@retries = $env.MAX_RETRIES ?? 3

# ❌ Risky - May fail if undefined
@timeout = $env.TIMEOUT
@retries = $env.MAX_RETRIES
```

**Circuit Breaker**

```
CallExternalService[
    circuit_breaker_threshold=0.5,
    circuit_breaker_window=100
] (
    0 -> UseCache -> NotifyCircuitOpen,
    1 -> ProcessResponse
)
```

**Fallback Chain**

```
FetchFromPrimary (
    0 -> FetchFromBackup (
        0 -> UseStaleCache,
        1 -> ContinueFlow
    ),
    1 -> ContinueFlow
)
```

**Fan-out / Fan-in**

```
SplitRequest ->
ServiceA || ServiceB || ServiceC ->
AggregateResponses
```

**Saga (Compensating Transactions)**

```
Step1 (
    0 -> CompensateStep1,
    1 -> Step2 (
        0 -> CompensateStep2 -> CompensateStep1,
        1 -> Step3 (
            0 -> CompensateStep3 -> CompensateStep2 -> CompensateStep1,
            1 -> Complete
        )
    )
)
```

**Retry with Backoff**

```
@base_delay = 1.0
CallUnreliableService * 3 [
    retry_delay=$base_delay,
    backoff_strategy="exponential"
] (
    0 -> NotifyFailure,
    1 -> ProcessResponse
)
```

Test individual events before building complex pipelines:
```
# Test event in isolation
ProcessData

# Then add complexity
LoadData -> ProcessData

# Then add error handling
LoadData -> ProcessData (
    0 -> HandleError,
    1 -> Continue
)
```

**Tune batch sizes for the environment**

```
# During development
MAP<ProcessItem>[batch_size=1, concurrent=false]

# In production
MAP<ProcessItem>[batch_size=100, concurrent=true]
```

**Use meaningful variable names**

```
# ✅ Good
@api_timeout_seconds = 30
@max_retry_attempts = 3

# ❌ Bad
@t = 30
@r = 3
```

**Organize workflows into logical steps**

```
# Step 1: Validate incoming data
ValidateInput ->

# Step 2: Process in parallel (CPU-bound)
{TransformA || TransformB || TransformC}[executor="ProcessExecutor"] ->

# Step 3: Aggregate and persist
AggregateResults -> SaveToDatabase
```

```
@counter = 0
IncrementCounter # Cannot modify @counter from event
```

Why: Variables are immutable configuration, not state.
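If you need counter-like state, carry it through event results rather than variables; data piped with |-> is how state moves between events. A minimal sketch, assuming the Event API from the Next Steps section (the payload shape is illustrative):

```python
from volnux import Event

class IncrementCounter(Event):
    def process(self, *args, **kwargs):
        # State travels in the result piped via |->, not in @variables.
        state = kwargs.get("state", {"counter": 0})  # hypothetical input
        state["counter"] += 1
        return True, state
```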
@mode: "CFG"
A -> B -> A # Executes A→B→A once, not infinite loopWhy: Each execution is discrete; no automatic cycling.
MAP<FILTER<Process>> # InvalidWhy: Meta Events are monadic; nesting is semantically unclear.
Fix: Chain them:
MAP<Transform> |-> FILTER<Validate>
@timeout = $env.TIMEOUT # May fail if TIMEOUT is undefinedFix: Use null coalescing:
@timeout = $env.TIMEOUT ?? 30# May overwhelm resources
MAP<HeavyTask>[concurrent=true, max_concurrency=1000]Fix: Set reasonable limits:
MAP<HeavyTask>[concurrent=true, max_concurrency=10]| Operator | Name | Example | Description |
|---|---|---|---|
| `->` | Sequential | `A -> B` | Execute B after A |
| `\|->` | Pipe Result | `A \|-> B` | Pass A's output as B's input |
| `\|\|` | Parallel | `A \|\| B` | Run A and B in parallel |
| `*` | Retry | `A * 3` | Retry A up to 3 times |
| `()` | Conditional | `A (0 -> B, 1 -> C)` | Branch based on descriptor |
| `{}` | Grouping | `{A -> B}` | Group events (sub-engine) |
| `[]` | Attributes | `A[retries=3]` | Configure event |
| `??` | Null Coalescing | `$env.VAR ?? 5` | Provide default value |
| `? :` | Ternary | `$x == 5 ? A : B` | Conditional assignment |
| Mode | Input | Output | Use Case |
|---|---|---|---|
| `MAP` | Collection | Collection | Transform each item |
| `FILTER` | Collection | Subset | Keep matching items |
| `REDUCE` | Collection | Single value | Aggregate items |
| `FOREACH` | Collection | Original | Side effects only |
| `FLATMAP` | Collection | Flattened | Transform and flatten |
| `FANOUT` | Any | Collection | Broadcast to N instances |
| Code | Meaning |
|---|---|
| `0` | Failure/Error |
| `1` | Success |
| `2-9` | Custom conditions (defined by your events) |
| Operator | Meaning |
|---|---|
| `==` | Equal to |
| `!=` | Not equal to |
| `>` | Greater than |
| `<` | Less than |
| `>=` | Greater than or equal |
| `<=` | Less than or equal |
| Directive | Values | Default | Description |
|---|---|---|---|
| `@mode` | `"DAG"`, `"CFG"`, `"strict"`, `"flexible"` | `"CFG"` | Control event reuse |
| `@recursive-depth` | Number | 1000 | Max nesting depth (deprecated) |
| Attribute | Type | Example | Description |
|---|---|---|---|
| `executor` | String | `"ThreadPoolExecutor"` | Execution engine |
| `retry_attempts` | Integer | `3` | Number of retries |
| `result_evaluation_strategy` | String | `"ALL_MUST_SUCCEED"` | Success criteria |
| `executor_config` | String | `"{'host':'localhost'}"` | Executor configuration |
Meta-Level:
- `batch_size` - Items per batch
- `concurrent` - Enable parallel execution
- `max_concurrency` - Max parallel tasks
- `count` - Number of instances (FANOUT)
- `initial_value` - Starting value (REDUCE)
- `collection` - Input collection
- `partial_success` - Allow partial failures

Template-Level:

- `retry_attempts` - Retry count per instance
- `executor` - Executor type
| Namespace | Example | Description |
|---|---|---|
| `local::` | `local::ProcessData` | Local project (default) |
| `pypi::` | `pypi::ValidateEmail` | Python Package Index |
| `github::` | `github::SendNotification` | GitHub repositories |
| Syntax | Example | Description |
|---|---|---|
| `@var = value` | `@retries = 3` | Declare variable |
| `$var` | `$retries` | Reference variable |
| `$env.VAR` | `$env.TIMEOUT` | Access environment variable |
| `$var ?? default` | `$env.TIMEOUT ?? 30` | Null coalescing |
| `condition ? A : B` | `$x == 5 ? 10 : 20` | Ternary expression |
This tutorial covers the Pointy Language syntax. To build and deploy complete workflows, you'll need to:
- Learn the Volnux Runtime - Refer to the Volnux Runtime Documentation
- Define Events - Implement your event logic in Python:

```python
from volnux import Event

class FetchData(Event):
    def process(self, *args, **kwargs):
        # Your event logic here
        return True, result

# Or with a function
from volnux.decorators import event

@event(name="FetchData")
def fetch_data(self, *args, **kwargs):
    # your event logic
    return True, result
```

- Configure Runtime - Set up `config.py` for your deployment
- Test and Deploy - Use Volnux CLI tools
Learn to implement events that integrate with Pointy workflows:
- Input/output type definitions
- Error handling and descriptors
- Integration with external systems
- Testing individual events
- Workflow Versioning - Handle breaking changes in deployed workflows
- Performance Optimization - Tune batch sizes and concurrency
- Monitoring and Alerting - Leverage OpenTelemetry integration
- Custom Executors - Build specialized execution engines
- Event Registry - Publish and share events via namespaces
Congratulations! You now have a comprehensive understanding of Pointy Language, from basic syntax to advanced Meta Events. You've learned:
✅ Core operators (->, |->, ||, *)
✅ Conditional branching with descriptors
✅ Variables and environment configuration
✅ Advanced directives (DAG/CFG modes)
✅ Meta Events for data-driven workflows
✅ Namespace system for event reuse
✅ Best practices and common patterns
Next Steps:
- Try the Quick Start example
- Build a simple workflow from the examples
- Read the Volnux Runtime documentation
- Join the community
Happy orchestrating! 🎯
Pointy Language is part of the Volnux workflow orchestration framework.
Version: 1.2.0 | Last Updated: December 2025