HTTP server - chung-leong/zigar GitHub Wiki

In this example we're going to build a simple multi-threaded HTTP server that runs within Node. The code makes use of the Zig 0.12.0 API. You'll need that version's compiler.

As usual, we begin by creating a basic app skeleton:

# Create the project directory and enter it
mkdir http
cd http
# Generate package.json with default settings
npm init -y
# Install node-zigar
npm install node-zigar
# src holds the JavaScript code, zig holds the Zig code
mkdir src zig

We save the following code to the zig sub-directory as server.zig:

const std = @import("std");

/// Options handed to `std.net.Address.listen()` when the listening socket is
/// created.
const listen_options: std.net.Address.ListenOptions = .{
    .reuse_port = true,
    .reuse_address = true,
};

/// Listens on `ip`:`port`, answers a single HTTP request with "Hello world",
/// then returns.
///
/// Returns an error if `ip` cannot be parsed, the socket cannot be put into
/// listening mode, or accepting the connection / reading the request head /
/// sending the response fails.
pub fn startServer(ip: []const u8, port: u16) !void {
    const address = try std.net.Address.resolveIp(ip, port);
    var server = try address.listen(listen_options);
    // Close the listening socket on every exit path.
    defer server.deinit();
    const connection = try server.accept();
    // Close the client socket once the response was sent (or on error).
    defer connection.stream.close();
    var read_buffer: [4096]u8 = undefined;
    var http = std.http.Server.init(connection, &read_buffer);
    var request = try http.receiveHead();
    try request.respond("Hello world", .{});
}

startServer() basically creates a TCP/IP socket, places it in listening mode, then awaits an incoming connection. When one shows up, std.http.Server is used to handle it. When we receive a request from the client via receiveHead(), we send "Hello world" and exit.

We call our Zig function from index.js:

import { startServer } from '../zig/server.zig';

// Listen on localhost, port 8080
startServer('127.0.0.1', 8080);

Before we can run the app, we need to enable ESM and the node-zigar loader in package.json:

  "type": "module",
  "scripts": {
    "start": "node --loader=node-zigar --no-warnings src/index.js"
  },

Now we're ready:

npm run start

It'll take a moment for the Zig code to be compiled. When the compilation indicator goes away, open a browser and go to http://127.0.0.1:8080. You should see the server's response:

Browser

After sending the response, the server will terminate, since we didn't use any loops in our code. Obviously, that's not how a real server behaves. Our next step is to expand on this rudimentary code. We want to run it in a loop in an independent thread.

We're going to create a struct called ServerThread. It has the following fields:

const std = @import("std");

/// State of one server thread.
const ServerThread = struct {
    /// IP address and port number to listen on.
    address: std.net.Address,
    /// Options passed to listen().
    listen_options: std.net.Address.ListenOptions,
    /// Handle of the thread itself; null until spawn() has created it.
    thread: ?std.Thread = null,
    /// Listening socket; read by stop() to shut the server down.
    server: ?*std.net.Server = null,
    /// Connection currently being handled; read by stop().
    connection: ?*std.net.Server.Connection = null,
    /// Last error seen by the thread.
    last_error: ?(std.net.Address.ListenError || std.net.Server.AcceptError) = null,

address holds the IP address and port number. listen_options holds options for listen(). thread is the thread itself. server and connection are needed for shutting down the server. last_error is as the name implies--the last error seen by the thread.

The init function for ServerThread is entirely mundane:

    /// Builds a ServerThread for `address`. All remaining fields keep their
    /// defaults; no thread is spawned here.
    pub fn init(address: std.net.Address, listen_options: std.net.Address.ListenOptions) @This() {
        const instance: @This() = .{
            .listen_options = listen_options,
            .address = address,
        };
        return instance;
    }

spawn() creates the actual thread:

    /// Spawns the thread and blocks until it has either started listening or
    /// failed to do so.
    ///
    /// Returns the error recorded in `last_error` when the thread reports a
    /// failure; in that case the thread has already been joined and
    /// `self.thread` is reset to null, so the caller may free `self`.
    pub fn spawn(self: *@This()) !void {
        var futex_val = std.atomic.Value(u32).init(0);
        const thread = try std.Thread.spawn(.{}, run, .{ self, &futex_val });
        self.thread = thread;
        // Futex.wait() may return spuriously. Re-check the value so we never
        // leave this frame (and invalidate &futex_val) before run() has
        // stored 1 into it.
        while (futex_val.load(.acquire) == 0) {
            std.Thread.Futex.wait(&futex_val, 0);
        }
        if (self.last_error) |err| {
            // run() exits after recording an error but still touches `self`
            // on its way out; wait for it before handing control back.
            thread.join();
            self.thread = null;
            return err;
        }
    }

It uses std.Thread.Futex.wait to wait for the thread to either start listening or fail.

The run function, which runs in the newly created thread, receives the pointer to futex_val and uses it to tell the main thread whether it has begun listening for requests or not:

    /// Thread entry point. Starts listening, reports the outcome to spawn()
    /// through `futex_ptr`, then accepts and handles connections until
    /// accept() fails.
    ///
    /// `futex_ptr` points into spawn()'s stack frame and must not be used
    /// after the wake() call below.
    fn run(self: *@This(), futex_ptr: *std.atomic.Value(u32)) void {
        var listen_result = self.address.listen(self.listen_options);
        if (listen_result) |*server| {
            self.server = server;
        } else |err| {
            self.last_error = err;
        }
        futex_ptr.store(1, .release);
        std.Thread.Futex.wake(futex_ptr, 1);
        // `const`: the pointer itself is never reassigned.
        const server = self.server orelse return;
        defer {
            // Unpublish the socket before closing it so stop() is unlikely to
            // see a closed handle. NOTE(review): stop() reads self.server
            // without synchronization, so a narrow race remains.
            self.server = null;
            server.deinit();
        }
        while (true) {
            var connection = server.accept() catch |err| {
                // SocketNotListening is what we get after stop() shut the
                // socket down; it is not recorded as an error.
                self.last_error = switch (err) {
                    std.net.Server.AcceptError.SocketNotListening => null,
                    else => err,
                };
                break;
            };
            self.handleConnection(&connection);
        }
    }

stop() shuts down any active connection as well as the listening server socket so that the thread would stop waiting and exit from any loop it's in:

    /// Shuts down the active client connection (if any) and the listening
    /// socket (if any). Shutdown errors are deliberately ignored.
    pub fn stop(self: *@This()) void {
        if (self.connection) |active| {
            const client_handle = active.stream.handle;
            std.posix.shutdown(client_handle, .both) catch {};
        }
        if (self.server) |listener| {
            const listen_handle = listener.stream.handle;
            std.posix.shutdown(listen_handle, .both) catch {};
        }
    }

join() is used during server shutdown, so that main thread can safely free allocated memory:

    /// Waits for the thread to finish. Does nothing when no thread handle is
    /// stored.
    pub fn join(self: *@This()) void {
        const handle = self.thread orelse return;
        handle.join();
    }

handleConnection() handles a client connection:

    /// Serves requests arriving on `connection` until reading a request head
    /// fails, then closes the connection.
    ///
    /// While the connection is being served it is published in
    /// `self.connection` so that stop() can shut it down.
    fn handleConnection(self: *@This(), connection: *std.net.Server.Connection) void {
        var read_buffer: [4096]u8 = undefined;
        var http = std.http.Server.init(connection.*, &read_buffer);
        self.connection = connection;
        defer {
            // Unpublish before closing so stop() is unlikely to see a closed
            // handle, then release the socket; without the close every
            // connection leaks a file descriptor.
            // NOTE(review): stop() reads self.connection without
            // synchronization, so a narrow race remains.
            self.connection = null;
            connection.stream.close();
        }
        while (true) {
            var request = http.receiveHead() catch break;
            self.handleRequest(&request) catch |err| {
                std.debug.print("{any}\n", .{err});
            };
        }
    }

While handleRequest() handles individual requests:

    /// Answers every request with the fixed body "Hello world".
    fn handleRequest(_: *@This(), request: *std.http.Server.Request) !void {
        return request.respond("Hello world", .{});
    }
};

As we don't want the JavaScript side to know anything about how the server is implemented, we'll define an opaque type as well as a pointer to it:

/// Opaque type that hides the server implementation from JavaScript.
const ServerOpaque = opaque {};
/// Pointer type returned to JavaScript; it carries ServerThread's alignment
/// so it can be cast back with @ptrCast.
const ServerOpaquePointer = *align(@alignOf(ServerThread)) ServerOpaque;

Placing server options inside a struct makes sense too, since their number is bound to grow:

/// Options accepted by startServer().
const ServerOptions = struct {
    /// IP address to listen on, in text form (passed to resolveIp()).
    ip: []const u8,
    /// Port number; 80 when omitted.
    port: u16 = 80,
};

startServer() now looks like this:

// Allocator used for the server's own allocations (see startServer/stopServer).
var gpa = std.heap.GeneralPurposeAllocator(.{}){};

/// Allocates a ServerThread, starts it, and returns it as an opaque pointer.
///
/// Fails if the address cannot be parsed, the allocation fails, or the
/// thread cannot be spawned / cannot start listening. The allocation is
/// released on failure.
pub fn startServer(options: ServerOptions) !ServerOpaquePointer {
    const address = try std.net.Address.resolveIp(options.ip, options.port);
    const allocator = gpa.allocator();
    const server_thread = try allocator.create(ServerThread);
    errdefer allocator.destroy(server_thread);
    server_thread.* = ServerThread.init(address, .{ .reuse_address = true });
    try server_thread.spawn();
    return @ptrCast(server_thread);
}

Our Zig code is using its own allocator instead of one provided by Zigar, since we want the data to stay in the same place and not get moved around by JavaScript's garbage collector.

We're also going to add stopServer(), which tells the server thread to shut down and then deallocates the memory allocated for it:

/// Stops the server thread behind `opaque_ptr`, waits for it to exit, and
/// frees the memory that startServer() allocated for it.
pub fn stopServer(opaque_ptr: ServerOpaquePointer) void {
    // Must be the same allocator startServer() used for the allocation.
    const allocator = gpa.allocator();
    const thread: *ServerThread = @ptrCast(opaque_ptr);
    thread.stop();
    thread.join();
    allocator.destroy(thread);
}

Since startServer() will no longer block when we call it, we need to modify our JavaScript code to keep Node from exiting. For this purpose we'll use readline:

import readline from 'readline/promises';
import { startServer, stopServer } from '../zig/server.zig';

// Start the server; the call returns while the server keeps running
const server = startServer({ ip: '127.0.0.1', port: 8080 });
const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
// Keep prompting (and keep Node alive) until the user types "quit"
let cmd;
do {
    cmd = await rl.question('> ');
} while (cmd !== 'quit');
rl.close();
stopServer(server);

Our server will continue to serve "Hello world" to clients until we type "quit" into the terminal:

Terminal window

Now that we have our server thread working, we can make our server create a number of them:

/// A group of server threads that all listen on the same address.
const Server = struct {
    /// One entry per server thread; allocated with `allocator`.
    threads: []ServerThread,
    /// Allocator that owns `threads`.
    allocator: std.mem.Allocator,

    /// Resolves the address and prepares `options.thread_count` threads.
    /// No thread is started here; call start() for that.
    pub fn init(allocator: std.mem.Allocator, options: ServerOptions) !Server {
        const address = try std.net.Address.resolveIp(options.ip, options.port);
        const threads = try allocator.alloc(ServerThread, options.thread_count);
        errdefer allocator.free(threads);
        const listen_options = .{ .reuse_address = true };
        for (threads) |*thread| {
            thread.* = ServerThread.init(address, listen_options);
        }
        return .{ .threads = threads, .allocator = allocator };
    }

    /// Frees the thread array. Call only after stop().
    pub fn deinit(self: @This()) void {
        // This version of Server has no `storage` field, so there is nothing
        // else to release.
        self.allocator.free(self.threads);
    }

    /// Spawns every thread. If one fails, the threads already started are
    /// stopped and joined before the error is returned.
    pub fn start(self: *@This()) !void {
        errdefer self.stop();
        for (self.threads) |*thread| {
            try thread.spawn();
        }
    }

    /// Asks every thread to stop, then waits for all of them to exit.
    pub fn stop(self: *@This()) void {
        // Signal all threads first so they wind down concurrently.
        for (self.threads) |*thread| {
            thread.stop();
        }
        for (self.threads) |*thread| {
            thread.join();
        }
    }
};

We change the opaque pointer so it points to a Server struct instead. We also add thread_count to the ServerOptions struct:

/// Pointer type returned to JavaScript; it carries Server's alignment so it
/// can be cast back with @ptrCast.
const ServerOpaquePointer = *align(@alignOf(Server)) ServerOpaque;
/// Options accepted by startServer().
const ServerOptions = struct {
    /// IP address to listen on, in text form (passed to resolveIp()).
    ip: []const u8,
    /// Port number; 80 when omitted.
    port: u16 = 80,
    /// Number of server threads to create; 1 when omitted.
    thread_count: usize = 1,
};

startServer() and stopServer() now look like this:

/// Allocates a Server, initializes it from `options`, starts its threads,
/// and returns it as an opaque pointer. Everything allocated so far is
/// released if a step fails.
pub fn startServer(options: ServerOptions) !ServerOpaquePointer {
    const alloc = gpa.allocator();
    const instance = try alloc.create(Server);
    errdefer alloc.destroy(instance);
    instance.* = try Server.init(alloc, options);
    errdefer instance.deinit();
    try instance.start();
    return @ptrCast(instance);
}

/// Stops all threads of the server behind `opaque_ptr`, releases its
/// resources, and frees the Server struct itself.
pub fn stopServer(opaque_ptr: ServerOpaquePointer) void {
    const instance: *Server = @ptrCast(opaque_ptr);
    instance.stop();
    instance.deinit();
    const alloc = gpa.allocator();
    alloc.destroy(instance);
}

In index.js, we set the thread count to 8:

// Run eight server threads on localhost, port 8080
const server = startServer({ ip: '127.0.0.1', port: 8080, thread_count: 8 });

That's it! Now we have a multi-threaded HTTP server. It's admittedly totally useless. We'll make our example a little more realistic by fetching different text from a hash table. Here's the struct that will be shared among all threads:

/// Hash table mapping URI to text, shared by the server threads and guarded
/// by a read-write lock built from two futex-backed counters.
const ServerStorage = struct {
    const AtomicU32 = std.atomic.Value(u32);
    const Map = std.hash_map.StringHashMap([]const u8);

    /// Keys and values are copies allocated with `allocator` (see put()).
    map: Map,
    allocator: std.mem.Allocator,
    /// Number of get() calls currently in progress.
    reader_count: AtomicU32 = AtomicU32.init(0),
    /// 1 while put() is in progress, 0 otherwise.
    writer_count: AtomicU32 = AtomicU32.init(0),

    /// Creates an empty storage that allocates from `allocator`.
    pub fn init(allocator: std.mem.Allocator) @This() {
        const map = Map.init(allocator);
        return .{ .allocator = allocator, .map = map };
    }

    /// Frees every stored key and value, then the map itself.
    pub fn deinit(self: *@This()) void {
        var it = self.map.iterator();
        while (it.next()) |kv| {
            self.allocator.free(kv.key_ptr.*);
            self.allocator.free(kv.value_ptr.*);
        }
        self.map.deinit();
    }

    /// Returns the text stored for `uri`, or null when there is none.
    /// Blocks while a put() is in progress.
    pub fn get(self: *@This(), uri: []const u8) ?[]const u8 {
        // Sleeps only while writer_count is 1.
        std.Thread.Futex.wait(&self.writer_count, 1);
        _ = self.reader_count.fetchAdd(1, .acquire);
        defer {
            // fetchSub() returns the value *before* the subtraction, so the
            // last reader to leave observes 1. That is the moment a writer
            // waiting in put() has to be woken.
            const previous = self.reader_count.fetchSub(1, .release);
            if (previous == 1) {
                std.Thread.Futex.wake(&self.reader_count, 1);
            }
        }
        return self.map.get(uri);
    }

    /// Stores a copy of `text` under a copy of `uri`.
    /// Blocks until all in-progress get() calls have finished.
    /// Returns an error when memory cannot be allocated.
    pub fn put(self: *@This(), uri: []const u8, text: []const u8) !void {
        // create copies
        const key = try self.allocator.dupe(u8, uri);
        errdefer self.allocator.free(key);
        const value = try self.allocator.dupe(u8, text);
        errdefer self.allocator.free(value);
        // prevent reading until operation finishes
        self.writer_count.store(1, .release);
        while (true) {
            // make sure server threads are done reading
            const reader_count = self.reader_count.load(.acquire);
            if (reader_count == 0) {
                break;
            }
            std.Thread.Futex.wait(&self.reader_count, reader_count);
        }
        defer {
            // let readers in again, whether or not the insert succeeded
            self.writer_count.store(0, .release);
            std.Thread.Futex.wake(&self.writer_count, std.math.maxInt(u32));
        }
        // NOTE(review): if `uri` is already present the map presumably keeps
        // its existing key, in which case the new key copy and the replaced
        // value are never freed -- confirm against StringHashMap.put().
        try self.map.put(key, value);
    }
};

The above code might look somewhat obtuse. It's basically a hash table protected by a read-write lock. We'll create one copy of it and distribute it to the server threads:

/// A group of server threads plus the storage they share.
const Server = struct {
    /// One entry per server thread; allocated with `allocator`.
    threads: []ServerThread,
    /// Allocator that owns `threads` and `storage`.
    allocator: std.mem.Allocator,
    /// Single ServerStorage instance handed to every thread.
    storage: *ServerStorage,

    /// Resolves the address, allocates the thread array and the shared
    /// storage, and points every thread at that storage. No thread is
    /// started here.
    pub fn init(allocator: std.mem.Allocator, options: ServerOptions) !Server {
        const address = try std.net.Address.resolveIp(options.ip, options.port);
        const workers = try allocator.alloc(ServerThread, options.thread_count);
        errdefer allocator.free(workers);
        const shared = try allocator.create(ServerStorage);
        shared.* = ServerStorage.init(allocator);
        for (workers) |*worker| {
            worker.* = ServerThread.init(address, .{ .reuse_address = true }, shared);
        }
        return .{ .threads = workers, .storage = shared, .allocator = allocator };
    }

    /// Releases the shared storage (contents and struct) and the thread
    /// array.
    pub fn deinit(self: @This()) void {
        const allocator = self.allocator;
        self.storage.deinit();
        allocator.destroy(self.storage);
        allocator.free(self.threads);
    }

Adding it to ServerThread:

/// State of one server thread.
const ServerThread = struct {
    /// IP address and port number to listen on.
    address: std.net.Address,
    /// Options passed to listen().
    listen_options: std.net.Address.ListenOptions,
    /// Handle of the thread itself; null until it has been spawned.
    thread: ?std.Thread = null,
    /// Listening socket; needed for shutting down the server.
    server: ?*std.net.Server = null,
    /// Connection currently being handled; needed for shutting down the server.
    connection: ?*std.net.Server.Connection = null,
    /// Last error seen by the thread.
    last_error: ?(std.net.Address.ListenError || std.net.Server.AcceptError) = null,
    /// Storage the thread fetches response text from.
    storage: *ServerStorage,
    /// Number of requests handled by this thread so far.
    request_count: u64 = 0,

    /// Builds a ServerThread for `address` that reads its content from
    /// `storage`. All remaining fields keep their defaults; no thread is
    /// spawned here.
    pub fn init(
        address: std.net.Address,
        listen_options: std.net.Address.ListenOptions,
        storage: *ServerStorage,
    ) @This() {
        const instance: @This() = .{
            .storage = storage,
            .listen_options = listen_options,
            .address = address,
        };
        return instance;
    }

We use the opportunity to add request_count as well. We increment the field in handleRequest(), which now obtains contents from ServerStorage:

    /// Counts the request, then responds with the text stored for the
    /// request target, or with "Not found" (status not_found) when the
    /// storage has no entry for it.
    fn handleRequest(self: *@This(), request: *std.http.Server.Request) !void {
        self.request_count += 1;
        const body = self.storage.get(request.head.target) orelse
            return request.respond("Not found", .{ .status = .not_found });
        return request.respond(body, .{});
    }

Then we add a function that we can call from JavaScript:

/// Stores `text` under `uri` in the storage of the server behind
/// `opaque_ptr`. Any error from the storage's put() is passed on.
pub fn storeText(opaque_ptr: ServerOpaquePointer, uri: []const u8, text: []const u8) !void {
    const instance: *Server = @ptrCast(opaque_ptr);
    try instance.storage.put(uri, text);
}

And another one for obtaining stats about the server:

/// Returns request statistics for the server behind `opaque_ptr`.
/// The per-thread array in the result is allocated with `allocator`.
pub fn getServerStats(allocator: std.mem.Allocator, opaque_ptr: ServerOpaquePointer) !ServerStats {
    const instance: *Server = @ptrCast(opaque_ptr);
    const stats = try instance.getStats(allocator);
    return stats;
}

We need to define ServerStats:

/// Statistics for the whole server.
const ServerStats = struct {
    /// Sum of the request counts of all threads.
    request_count: u64,
    /// One entry per server thread.
    threads: []const ServerThreadStats,
};
/// Statistics for a single server thread.
const ServerThreadStats = struct {
    /// Number of requests handled by the thread.
    request_count: u64,
};

And add a function to Server:

    /// Collects the stats of every thread into a slice allocated with
    /// `allocator` and adds up the total request count.
    pub fn getStats(self: *@This(), allocator: std.mem.Allocator) !ServerStats {
        const per_thread = try allocator.alloc(ServerThreadStats, self.threads.len);
        var total: u64 = 0;
        for (self.threads, per_thread) |*thread, *slot| {
            slot.* = thread.getStats();
            total += slot.request_count;
        }
        return .{ .request_count = total, .threads = per_thread };
    }

And ServerThread:

    /// Returns a snapshot of this thread's request counter.
    pub fn getStats(self: *const @This()) ServerThreadStats {
        const snapshot: ServerThreadStats = .{ .request_count = self.request_count };
        return snapshot;
    }

We enable the new features in our command-line interface:

import readline from 'readline/promises';
// server.zig lives in the zig sub-directory, next to src
import { getServerStats, startServer, stopServer, storeText } from '../zig/server.zig';

// Start eight server threads on localhost, port 8080
const server = startServer({ ip: '127.0.0.1', port: 8080, thread_count: 8 });
const { stdin: input, stdout: output } = process;
const rl = readline.createInterface({ input, output });
// Simple command loop: "add" stores text, "stats" prints counters, "quit" exits
while (true) {
    const cmd = await rl.question('> ');
    if (cmd === 'quit') {
        break;
    } else if (cmd === 'add') {
        const uri = await rl.question('Enter URI: ');
        const text = await rl.question('Enter Text: ');
        storeText(server, uri, text);
    } else if (cmd === 'stats') {
        const stats = getServerStats(server);
        console.log(stats.valueOf());
    }
}
rl.close();
stopServer(server);

When we run the server now, it'll start out with no contents at all. We have to use the add command to place some text at a particular URI:

Terminal window

After that we'll be able to access it through a browser:

Browser window

Getting stats from the server:

Terminal window

Configuring the app for deployment

Follow the same steps as described in the hello world example. First change the import statement so it loads a Zigar module instead of a Zig file:

import { getServerStats, startServer, stopServer, storeText } from '../lib/server.zigar';

Then create node-zigar.config.json, mapping the module to the correct source file, at the same time declaring the platforms you intend to support:

{
  "optimize": "ReleaseSmall",
  "sourceFiles": {
    "lib/server.zigar": "zig/server.zig"
  },
  "targets": [
    { "platform": "linux", "arch": "x64" },
    { "platform": "linux", "arch": "arm64" },
    { "platform": "linux-musl", "arch": "x64" },
    { "platform": "linux-musl", "arch": "arm64" }
  ]
}

With the config file in place, build the shared libraries using the following command:

npx node-zigar build

If you have Docker installed, run the following command to test the server in a cloud environment:

docker run --rm -it -v ./:/test -w /test --net=host node:alpine npm run start

Source code

You can find the complete source code for this example here.

Conclusion

While reading this text the question might have crossed your mind: "What's the purpose of all this?" Node.js provides a full-featured HTTP server, after all. While the performance of JavaScript code is generally comparable to that of native code (thanks to Just-in-Time compilation), its reliance on garbage collection means that the memory footprint of the server can become very high when it's under heavy load. Garbage collection also makes response time inconsistent. For this and other reasons, Node applications would generally sit behind a reverse proxy server like Nginx in a production environment. The idea of this example is that perhaps we can replace the external server with one that runs within Node itself.

The server presented here is, of course, merely a proof-of-concept. My hope is that it'll be the starting point for a full-featured module, capable of handling the needs of actual Node applications.

As always, comments are welcomed. Please make use of the discussion forum provided by GitHub.