• ¶

    raftjs

    This annotated source is a single page containing almost all of the source code in the raftjs project.

    A bolded filename.js above a horizontal rule in the left column next to /** filename.js */ in the right-side column indicates the beginning of a new file.

    The annotated files are presented in reverse-dependency order, starting from the entry point to the raftjs application: cli/index.js. For those more interested in seeing the protocol implementation, server/index.js and server/state/*.js are the best places to start.

    References to the Raft paper are indicated with a quote containing a section sign (§) and number (e.g. “5.2”), and are italicized.

    Generally speaking, most of the annotations highlight areas of the code that closely correspond to writing in the Raft paper. Other parts of the code, such as the TCP transport, which are useful within the raftjs project but are not essential to the Raft protocol, are less heavily annotated.

  • ¶

    cli/index.js


    /** cli/index.js*/
  • ¶
    import * as yargs from 'yargs';
    import * as command from './command';
  • ¶

    Can be used by a CLI program to start a single raftjs server.

    /**
     * Parses the given CLI arguments with yargs in strict mode, requiring a
     * command and registering the `start` command.
     *
     * @param args - Arguments forwarded unchanged to the yargs `parse` call.
     */
    export function execute(args) {
        const parser = yargs
            .strict()
            .demandCommand()
            .command(command.start);
        parser.parse(args);
    }
    export default { execute };
  • ¶

    cli/command/index.js


    /** cli/command/index.js*/
  • ¶

    Re-exports raftjs server CLI commands.

    export { default as start } from './start';
  • ¶

    cli/command/start/index.js


    /** cli/command/start/index.js*/
  • ¶

    A yargs CLI command module for the raftjs server start command.

    import handler from './handler';
    import builder from './builder';
    // Command module object for yargs: the command name and description are
    // paired with the imported option builder and command handler.
    export default {
        builder,
        command: 'start',
        description: 'Start RaftJS server.',
        handler
    };
  • ¶

    cli/command/start/builder.js


    /** cli/command/start/builder.js*/
  • ¶

    A yargs builder for the raftjs server start command that validates that a --config-file argument is present and references an existing file.

    import * as fs from 'fs';
    /**
     * Registers the required `config-file` string option on a strict yargs
     * instance.
     *
     * @param yargs - The yargs instance to configure.
     * @returns The value returned by the chained `option` call.
     * @throws {Error} From the option's `coerce` hook when the path does not exist.
     */
    export default function builder(yargs) {
        // Passes an existing path through untouched and rejects a missing one.
        const ensureExists = (path) => {
            if (fs.existsSync(path)) {
                return path;
            }
            throw new Error(`Config file ${path} does not exist`);
        };
        const strictYargs = yargs.strict();
        return strictYargs.option('config-file', {
            coerce: ensureExists,
            required: true,
            type: 'string'
        });
    }
  • ¶

    cli/command/start/handler.js


    /** cli/command/start/handler.js*/
  • ¶

    A yargs command handler for the raftjs server start command that parses a --config-file and starts a raftjs server.

    import { createServer } from '../../../server';
    import { parseConfigFile } from './config-file-parser';
    /**
     * Handles the `start` command: builds a server from the parsed config file,
     * starts it, and stops it again on SIGINT or SIGTERM.
     *
     * @param argv - Parsed yargs arguments; only `argv['config-file']` is read.
     */
    export default function handler(argv) {
        const configFile = argv['config-file'];
        const server = createServer(parseConfigFile(configFile));
        server.start();
        const exit = () => {
            server.stop();
        };
        for (const signal of ['SIGINT', 'SIGTERM']) {
            process.on(signal, exit);
        }
    }
  • ¶

    cli/command/start/config-file-parser.js


    /** cli/command/start/config-file-parser.js*/
    import * as fs from 'fs';
    import * as path from 'path';
    import * as logger from '../../../logger';
    import { createEndpoint } from '../../../net';
  • ¶

    A config file parser that accepts a file path, validates the contents, and returns a configuration object to be used for creating and starting a raft server. An example config file is:

    {
      "cluster": {
        "servers": {
          "server-a": { "host": "127.0.0.1", "port": 9080 },
          "server-b": { "host": "127.0.0.1", "port": 9081 },
          "server-c": { "host": "127.0.0.1", "port": 9082 }
        }
      },
      "dataDir": "../data/server-a",
      "id": "server-a",
      "logger": {
        "level": "debug",
        "machineReadable": false
      }
    }
    /**
     * Reads a JSON config file and turns it into server options.
     *
     * The keys `id`, `cluster` and `dataDir` are required, and `id` must be one
     * of the keys of the parsed `cluster.servers`. `logger` is optional.
     *
     * @param configFile - Path of the JSON file to read.
     * @returns An object with `cluster`, `dataDir`, `id` and `logger`.
     * @throws {Error} When a required key is missing or `id` is not a cluster member.
     */
    export function parseConfigFile(configFile) {
        const rawJson = fs.readFileSync(configFile).toString();
        const config = JSON.parse(rawJson);
        // Returns the value stored under `key`, rejecting absent or falsy values.
        const requireKey = (key) => {
            if (!config[key]) {
                throw new Error(`Config is missing required key '${key}'.`);
            }
            return config[key];
        };
        const id = requireKey('id');
        const clusterConfig = requireKey('cluster');
        const cluster = parseCluster(clusterConfig);
        if (!(id in cluster.servers)) {
            throw new Error(`Config defines a server 'id' not present in 'cluster.servers'.`);
        }
        const dataDirConfig = requireKey('dataDir');
        const dataDir = parseDataDir(configFile, dataDirConfig);
        return {
            cluster,
            dataDir,
            id,
            logger: parseLogger(config['logger'])
        };
    }
  • ¶

    Parses the cluster configuration.

    /**
     * Validates the `cluster` section of the config and parses its members.
     *
     * @param config - The `cluster` section; its `servers` key is required.
     * @returns An object whose `servers` property holds the parsed members.
     * @throws {Error} When `servers` is missing.
     */
    function parseCluster(config) {
        const { servers: serversConfig } = config;
        if (!serversConfig) {
            throw new Error(`Config is missing required key 'cluster.servers'.`);
        }
        return { servers: parseClusterServers(serversConfig) };
    }
  • ¶

    Parses the cluster members.

    /**
     * Builds one endpoint per entry of `cluster.servers`, keyed by server id.
     *
     * @param config - Maps a server id to an object with `host` and `port`.
     * @returns A new object mapping each server id to the created endpoint.
     */
    function parseClusterServers(config) {
        const servers = {};
        for (const serverId of Object.keys(config)) {
            const { host, port } = config[serverId];
            servers[serverId] = createEndpoint({ host, port });
        }
        return servers;
    }
  • ¶

    Parses the absolute path to the directory where persistent state such as the current vote and term are stored.

    /**
     * Resolves the data directory to an absolute path.
     *
     * @param configFile - Path of the config file; relative data directories
     *   are resolved against the directory containing it.
     * @param dataDir - The configured data directory.
     * @returns `dataDir` unchanged when already absolute, otherwise the resolved path.
     */
    function parseDataDir(configFile, dataDir) {
        return path.isAbsolute(dataDir)
            ? dataDir
            : path.resolve(path.dirname(configFile), dataDir);
    }
  • ¶

    Parses the logger level.

    /**
     * Validates the configured log level.
     *
     * @param level - Value of `logger.level` from the config.
     * @returns The same level when it is one of the supported names.
     * @throws {Error} When `level` is not a string or not a supported name.
     */
    function parseLogLevel(level) {
        const supportedLevels = ['fatal', 'error', 'warn', 'info', 'debug', 'trace'];
        if (typeof level != 'string') {
            throw new Error("Config 'logger.level' must be a string: " + level);
        }
        if (supportedLevels.indexOf(level) === -1) {
            throw new Error("Config 'logger.level' must be one of [fatal, error, warn, info, debug, trace]: " + level);
        }
        return level;
    }
  • ¶

    Parses the logger machine-readable flag.

    /**
     * Validates the configured machine-readable flag.
     *
     * @param machineReadable - Value of `logger.machineReadable` from the config.
     * @returns The same value when it is a boolean.
     * @throws {Error} When the value is not a boolean.
     */
    function parseLogMachineReadable(machineReadable) {
        if (typeof machineReadable == 'boolean') {
            return machineReadable;
        }
        throw new Error("Config 'logger.machineReadable' must be a boolean: " + machineReadable);
    }
  • ¶

    Parses the logger configuration.

    /**
     * Builds the logger from the optional `logger` section of the config.
     *
     * When `level` is absent it defaults to 'error'; when `machineReadable` is
     * absent it defaults to false. A key that is present (not null/undefined)
     * is always validated, so invalid falsy values such as `0` or `""` are
     * rejected instead of silently falling back to the default.
     *
     * @param config - The `logger` section, or a falsy value when omitted.
     * @returns The logger created by `logger.createLogger`.
     * @throws {Error} When `config` is not an object or a present key is invalid.
     */
    function parseLogger(config) {
        if (config && typeof config !== 'object')
            throw new Error("Config 'logger' must be an object: " + config);
        const level = config && config['level'] != null
            ? parseLogLevel(config['level'])
            : 'error';
        const machineReadable = config && config['machineReadable'] != null
            ? parseLogMachineReadable(config['machineReadable'])
            : false;
        return logger.createLogger({
            level,
            machineReadable
        });
    }
  • ¶

    server/index.js


    /** server/index.js*/
  • ¶

    The central component of the raftjs project, Server, listens on a socket for RPC commands from other Server instances and transitions between follower, candidate and leader states.

    import * as path from 'path';
    import { createDurableInteger, createDurableString } from '../storage';
    import { createLog } from '../log';
    import { createLogger } from '../logger';
    import { isMessage, isRequest, isResponse } from '../message';
    import { isEndpoint } from '../net';
    import { createRpcService } from '../rpc';
    import { createTimer } from './timer';
    import { createState } from './state';
    class Server {
        /**
         * Stores the collaborators supplied through `options`. The server's own
         * endpoint is looked up in `options.cluster.servers` by `options.id`.
         *
         * @param options - Read for `cluster`, `electionTimer`, `id`, `log`,
         *   `logger`, `rpcService`, `term` and `vote`.
         */
        constructor(options) {
            this._cluster = options.cluster;
            this.electionTimer = options.electionTimer;
            this.endpoint = options.cluster.servers[options.id];
            this.id = options.id;
            this.log = options.log;
            this.logger = options.logger;
  • ¶

    noop is not a state specified by the Raft protocol. It is used here as a “null object” to ensure that Server can always call enter and exit on this._state.

            this._state = createState('noop', null);
            this.rpcService = options.rpcService;
            this._term = options.term;
            this._vote = options.vote;
        }
        /** The cluster configuration passed to the constructor. */
        get cluster() {
            return this._cluster;
        }
        /** The state object the server is currently in. */
        get state() {
            return this._state;
        }
  • ¶

    The receive method can be used to register a receiver of RPC requests from other Raft Server instances.

        /**
         * Registers a receiver with the RPC service.
         *
         * @param receiver - Passed through unchanged to `rpcService.receive`.
         * @returns Whatever `rpcService.receive` returns.
         */
        receive(receiver) {
            return this.rpcService.receive(receiver);
        }
  • ¶

    RPC requests can be sent to other Raft Server instances with the send method.

        /**
         * Sends a message to other servers.
         *
         * Call shapes handled below:
         *  - `send(message)`: addressed to every server in the cluster except this one.
         *  - `send(endpoints, message)`: `endpoints` is an array of endpoints or a
         *    single endpoint.
         *
         * Requests go straight to the RPC service; responses are sent only after
         * `updatePersistentState()` has resolved.
         *
         * NOTE(review): when the message is neither a request nor a response this
         * method returns `undefined`, and `endpoints` stays unset when `arg0` is
         * neither a message, an array nor an endpoint. Confirm that callers never
         * reach these paths.
         *
         * @returns The result of `rpcService.send` (presumably a promise; TODO confirm).
         */
        send(arg0, arg1 = null) {
            let message;
            let endpoints;
            if (isMessage(arg0)) {
                // Broadcast: every cluster member except this server.
                endpoints = Object.keys(this.cluster.servers)
                    .filter(serverId => serverId != this.id)
                    .map(serverId => this.cluster.servers[serverId]);
                message = arg0;
            }
            else {
                if (arg0 instanceof Array) {
                    endpoints = arg0;
                }
                else if (isEndpoint(arg0)) {
                    endpoints = [arg0];
                }
                message = arg1;
            }
            if (isRequest(message)) {
                return this.rpcService.send(endpoints, message);
            }
            else if (isResponse(message)) {
  • ¶

    Before responding to an RPC request, the recipient Server updates persistent state on stable storage.

    §5. “…(Updated on stable storage before responding)…”
    §5.6 “…Raft’s RPCs…require the recipient to persist…”

                return this.updatePersistentState()
                    .then(() => this.rpcService.send(endpoints, message));
            }
        }
  • ¶

    After the Server loads and initializes its term and vote, and binds to a socket for RPC calls, it transitions to the follower state:

    §5. “…When servers start up, they begin as followers…”

        /**
         * Starts the server: reads the stored term and vote, starts the RPC
         * service on this server's endpoint, then transitions to the follower
         * state.
         *
         * @returns A promise chain that ends after the final "Started" log line.
         */
        start() {
            this.logger.info(`Starting Raftjs server ${this.id}`);
            this.logger.debug('Loading persistent state');
            return Promise.all([
  • ¶

    The current term is…

    §5. “…initialized to zero on first boot…”

                this._term.read(null)
                    .then((function (value) {
                    if (value == null) {
                        this.logger.debug('Term was not found on persistent storage; setting to zero');
                        // Goes through the `term` setter, which also clears the vote.
                        this.term = 0;
                    }
                }).bind(this)),
                this._vote.read()
            ])
                .then(() => {
                this.logger.debug('Starting RPC service');
                return this.rpcService.listen(this.endpoint);
            })
                .then(() => {
                this.logger.debug('Transitioning to follower');
                this.transitionTo('follower');
            })
                .then(() => this.logger.info(`Started Raftjs server ${this.id}`));
        }
        stop() {
            this.logger.info(`Stopping Raftjs server ${this.id}`);
            this.logger.debug('Exiting current state');
            this.state.exit();
            this.logger.debug('Stopping RPC service');
            return this.rpcService.close()
                .then(() => this.logger.info(`Stopped Raftjs server ${this.id}`));
        }
  • ¶

    A Server transitions between multiple states: follower, candidate, and leader. The state design pattern is used here to facilitate separating the rules of these states into separate components while allowing those components access to Server data and methods.

        transitionTo(state) {
            const newState = typeof state == 'string' ? createState(state, this) : state;
            if (this._state.type == newState.type)
                return;
            this._state.exit();
            this._state = newState;
            this._state.enter();
        }
  • ¶

    The term is used by a Server in a cluster to determine whether it is ahead of or behind another Server in the same cluster.

    §5. “…latest term server has seen…” §5.1. “…Raft divides time into terms…”

        /** The current term, read from the in-memory value of the term store. */
        get term() {
            return this._term.value;
        }
  • ¶

    When the term is updated, it is not immediately persisted because, as the Raft paper says, the term is part of persistent state that is:

    §5. “…(Updated on stable storage before responding)…”
    to RPC requests.

        /**
         * Sets the in-memory term and clears the vote. Nothing is written to
         * storage here.
         *
         * @param newTerm - The term to adopt.
         */
        set term(newTerm) {
            this._term.value = newTerm;
  • ¶

    The Server vote is the candidate the Server voted for in the current term…

    §5. “…or null if none…” When a Server enters a new election, it has not yet voted for a candidate, so its vote is set here to null.

            this.vote = null;
        }
  • ¶

    Persistent state is data read from and written to stable storage (i.e. a disk).

    §5. “…Persistent state on all servers…”

        updatePersistentState() {
            return this.log.write()
                .then(() => this._term.write())
                .then(() => this._vote.write());
        }
  • ¶

    The Server vote is the… §5. “…candidateId that received vote in current term…”

        /** The current vote, read from the in-memory value of the vote store. */
        get vote() {
            return this._vote.value;
        }
  • ¶

    When the vote is updated, it is not immediately persisted because, as the Raft paper says, the vote is part of persistent state that is…

    §5. “…(Updated on stable storage before responding)…”
    …to RPC requests.

        /**
         * Sets the in-memory vote. Nothing is written to storage here.
         *
         * @param candidateId - Id of the candidate voted for, or null for none.
         */
        set vote(candidateId) {
            this._vote.value = candidateId;
        }
    }
  • ¶

    The createServer method produces a Server configured by the provided options.

    /**
     * Creates a Server from the given options.
     *
     * `term` and `vote` are taken from `options` when present; otherwise they
     * are created as durable values stored under `options.dataDir`. The
     * election timer, log, logger and RPC service fall back to freshly created
     * defaults when omitted.
     *
     * @param options - Server options; must contain `term` or `dataDir`, and
     *   `vote` or `dataDir`.
     * @returns The new Server.
     * @throws {Error} When neither the value nor `dataDir` is supplied.
     */
    export function createServer(options) {
        // Uses the supplied value if any, otherwise a durable one under dataDir.
        const pickDurable = (key, createDurable) => {
            if (key in options) {
                return options[key];
            }
            if ('dataDir' in options) {
                return createDurable(path.join(options.dataDir, key));
            }
            throw new Error(`Must supply either ${key} or dataDir`);
        };
        const term = pickDurable('term', (file) => createDurableInteger(file));
        const vote = pickDurable('vote', (file) => createDurableString(file));
        const { cluster, id } = options;
        return new Server({
            cluster,
            electionTimer: options.electionTimer || createTimer(),
            id,
            log: options.log || createLog(),
            logger: options.logger || createLogger(),
            rpcService: options.rpcService || createRpcService(),
            term,
            vote
        });
    }
  • ¶

    server/state/index.js


    /** server/state/index.js*/
    import { compilerError } from '../../util/compiler-error';
    import { createNoopState } from './noop';
    import { createLeaderState } from './leader';
    import { createCandidateState } from './candidate';
    import { createFollowerState } from './follower';
  • ¶

    createState is a convenience function for creating State implementations by name.

    /**
     * Creates the state implementation named by `stateType`.
     *
     * @param stateType - One of 'candidate', 'follower', 'leader' or 'noop'.
     * @param server - The server handed to the state; not used for 'noop'.
     * @returns The created state; any other `stateType` is passed to `compilerError`.
     */
    export function createState(stateType, server) {
        switch (stateType) {
            case 'candidate':
                return createCandidateState(server);
            case 'follower':
                return createFollowerState(server);
            case 'leader':
                return createLeaderState(server);
            case 'noop':
                return createNoopState();
            default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                return compilerError(stateType);
        }
    }
  • ¶

    server/state/follower.js


    /** server/state/follower.js*/
    import * as RequestVote from '../../message/request-vote';
    import { BaseState } from './base';
  • ¶

    Followers:

    §5. “…Respond to RPC requests from candidates and leaders…”

    Followers remain in that state until either:

    §5. “…an election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate…”

    class FollowerState extends BaseState {
        /**
         * @param server - The server this state acts on.
         */
        constructor(server) {
            super(server, 'follower');
            // Bound once so the same function references can be registered in
            // enter() and, for the timer listener, removed again in exit().
            this.onAppendEntriesRequest1 = this.onAppendEntriesRequest1.bind(this);
            this.onRequestVoteRequest1 = this.onRequestVoteRequest1.bind(this);
            this.onTimeout = this.onTimeout.bind(this);
        }
        enter() {
            super.enter();
            super.addRpcEventListener(this.server.rpcService
                .receive({
                procedureType: 'append-entries',
                callType: 'request',
                notify: this.onAppendEntriesRequest1
            }));
            super.addRpcEventListener(this.server.rpcService
                .receive({
                procedureType: 'request-vote',
                callType: 'request',
                notify: this.onRequestVoteRequest1
            }));
            this.server.electionTimer.on('timeout', this.onTimeout);
            this.server.logger.debug(`Starting election timer with timeout ${this.server.electionTimer.timeout}ms`);
            this.server.electionTimer.start();
        }
        /**
         * Leaves the follower state: stops the election timer, removes the
         * timeout listener, then lets the base state detach its RPC listeners.
         */
        exit() {
            this.server.electionTimer.stop();
            this.server.electionTimer.off('timeout', this.onTimeout);
            super.exit();
        }
  • ¶

    One of the conditions for a follower resetting its election timer is:

    §5. “…receiving AppendEntries RPC from current leader…”

        onAppendEntriesRequest1(endpoint, message) {
            this.server.electionTimer.reset();
        }
  • ¶
        onRequestVoteRequest1(endpoint, message) {
            const { term: currentTerm, vote: currentVote, electionTimer } = this.server,
  • ¶

    A receiver of RequestVote RPC will:

    §5. “…reply false if term < currentTerm…”

            voteGranted = message.arguments.term >= currentTerm
                && (currentVote == null || currentVote == message.arguments.candidateId);
            if (voteGranted) {
                this.server.logger.trace(`Granting vote request from server ${endpoint.toString()}`);
            }
            else {
                this.server.logger.trace(`Denying vote request from server ${endpoint.toString()}`);
            }
            this.server.send(endpoint, RequestVote.createResponse({
                term: currentTerm,
                voteGranted
            })).then(function () {
  • ¶

    One of the conditions for a follower resetting its election timer is:

    §5. “…granting vote to candidate…”

                if (voteGranted) {
                    electionTimer.reset();
                }
            });
        }
  • ¶

    When the election timeout elapses without the follower receiving either an AppendEntries RPC from the leader or granting a vote to a candidate, the follower begins an election by converting to a candidate.

    §5.1 “If a follower receives no communication, it becomes a candidate and initiates an election.”

        /** Election timer listener: converts this follower into a candidate. */
        onTimeout() {
            this.server.logger.debug('Timer elapsed; transitioning to candidate');
            this.transitionTo('candidate');
        }
    }
  • ¶
    /**
     * Creates a follower state for the given server.
     *
     * @param server - The server the state acts on.
     */
    export function createFollowerState(server) {
        return new FollowerState(server);
    }
  • ¶

    server/state/candidate.js


    /** server/state/candidate.js*/
    import * as RequestVote from '../../message/request-vote';
    import { BaseState } from './base';
  • ¶

    A candidate:

    §5.1 “…votes for itself and issues RequestVote RPCs in parallel…”

    A candidate continues in this state until either:

    §5.1 “…(a) it wins the election…”
    §5.1 “…(b) another server establishes itself as leader…”
    §5.1 “…(c) a period of time goes by with no winner…”

    class CandidateState extends BaseState {
        /**
         * @param server - The server this state acts on.
         */
        constructor(server) {
            super(server, 'candidate');
            // Bound once so the same reference can be added in enter() and
            // removed in exit().
            this.onTimeout = this.onTimeout.bind(this);
        }
  • ¶

    Upon transitioning from a follower to a candidate, a candidate immediately starts an election.

    §5. “…On conversion to candidate, start election…”

        enter() {
            super.enter();
            super.addRpcEventListener(this.server.rpcService
                .receive({
                procedureType: 'append-entries',
                callType: 'request',
                notify: this.onAppendEntriesRequest1.bind(this)
            }));
            super.addRpcEventListener(this.server.rpcService
                .receive({
                procedureType: 'request-vote',
                callType: 'response',
                notify: this.onRequestVoteResponse.bind(this)
            }));
            this.server.electionTimer.on('timeout', this.onTimeout);
            this.startElection();
        }
        /**
         * Leaves the candidate state: stops the election timer, removes the
         * timeout listener, then lets the base state detach its RPC listeners.
         */
        exit() {
            this.server.electionTimer.stop();
            this.server.electionTimer.off('timeout', this.onTimeout);
            super.exit();
        }
        incrementTerm() {
            const nextTerm = this.server.term + 1;
            this.server.logger.trace(`Incrementing term to ${nextTerm}`);
            this.server.term = nextTerm;
        }
  • ¶

    A candidate obtains a majority when it receives at least ⌊# servers / 2⌋ + 1 votes.

        isMajorityObtained() {
            const numServers = Object.keys(this.server.cluster.servers).length;
            const majority = Math.floor(numServers / 2) + 1;
            return this.serverVotes.size >= majority;
        }
  • ¶

    When a candidate receives an AppendEntries RPC request from a leader with a term greater or equal to its own, it converts to a follower.

    §5. “…If AppendEntries RPC received…”
    §5.2. “…While waiting for votes…”

        onAppendEntriesRequest1(endpoint, message) {
            if (message.arguments.term >= this.server.term) {
                this.server.logger.trace(`Received append-entries request from ${endpoint.toString}; transitioning to follower`);
                this.transitionTo('follower');
            }
        }
  • ¶
        onRequestVoteResponse(endpoint, message) {
            if (!message.results.voteGranted)
                return;
            this.tallyVote(endpoint);
        }
  • ¶

    When the election timeout elapses, a candidate starts a new election. This occurs when a candidate neither obtains a majority of votes from followers nor receives an AppendEntries RPC from the leader.

    §5. “…If election timeout elapses…”
    §5.2. “…third possible outcome…”

        /** Election timer listener: starts a new election. */
        onTimeout() {
            this.server.logger.trace('Timer elapsed; restarting election');
            this.startElection();
        }
  • ¶
        requestVotes() {
            this.server.logger.trace('Requesting votes from other servers');
            const lastLogIndex = this.server.log.getLastIndex();
            this.server.send(RequestVote.createRequest({
                candidateId: this.server.id,
                lastLogIndex,
                lastLogTerm: this.server.log.getEntry(lastLogIndex).term,
                term: this.server.term
            }));
        }
  • ¶

    After transitioning to a candidate, the server increments its current term, votes for itself, resets the election timer, and requests votes from all other servers. §5.2. “…To begin an election…”

        /**
         * Starts an election: clears the tally, increments the term, votes for
         * itself, resets the election timer and requests votes, in that order.
         */
        startElection() {
            this.server.logger.trace('Starting election');
            // Fresh tally for this election.
            this.serverVotes = new Set();
  • ¶

    §5.2. “…increments its term…”

            this.incrementTerm();
  • ¶

    §5.2. “…votes for itself…”

            this.voteForSelf();
  • ¶

    §5. “…reset election timer…”

            this.server.logger.trace(`Resetting election timer with timeout ${this.server.electionTimer.timeout}ms`);
            this.server.electionTimer.reset();
  • ¶

    §5.2 “…issues RequestVote RPCs…”

            this.requestVotes();
        }
  • ¶

    When a candidate has received votes from the majority of servers, it becomes the leader.

    §5. “…votes received from majority…”
    §5.2. “…a candidate wins an election if…”

        tallyVote(endpoint) {
            this.server.logger.trace(`Tallying vote received from ${endpoint.toString()}`);
            this.serverVotes.add(endpoint.toString());
            if (this.isMajorityObtained()) {
                this.server.logger.debug('Votes obtained from cluster majority; transitioning to leader');
                this.transitionTo('leader');
            }
        }
  • ¶
        /** Sets the server's vote to its own id and tallies that vote. */
        voteForSelf() {
            this.server.vote = this.server.id;
            this.tallyVote(this.server.endpoint);
        }
    }
    /**
     * Creates a candidate state for the given server.
     *
     * @param server - The server the state acts on.
     */
    export function createCandidateState(server) {
        return new CandidateState(server);
    }
  • ¶

    server/state/leader.js


    /** server/state/leader.js*/
    import { createRequest as createAppendEntriesRequest } from '../../message/append-entries';
    import { BaseState } from './base';
  • ¶

    Leaders:

    §5.2 “…send periodic heartbeats…to all followers…to maintain their authority”

    Leaders are also responsible for accepting requests from clients and replicating log entries to followers. At the present time, this implementation does not implement those requirements.

    class LeaderState extends BaseState {
        /**
         * @param server - The server this state acts on.
         */
        constructor(server) {
            super(server, 'leader');
            // Bound so it can be handed to setInterval in enter().
            this.sendHeartbeats = this.sendHeartbeats.bind(this);
        }
  • ¶

    Upon election:

        /**
         * Enters the leader state: sends heartbeats immediately, then repeats
         * them every 50 milliseconds until exit().
         */
        enter() {
            super.enter();
  • ¶

    §5 “…send initial empty AppendEntries RPCs (heartbeat) to each server…”

            this.sendHeartbeats();
  • ¶

    §5 “…repeat during idle periods to prevent election timeouts…”

            this.intervalId = setInterval(this.sendHeartbeats, 50);
        }
        /** Leaves the leader state: stops the heartbeat interval, then exits the base state. */
        exit() {
            clearInterval(this.intervalId);
            super.exit();
        }
        sendHeartbeats() {
            this.server.send(createAppendEntriesRequest({
                entries: [],
                term: this.server.term
            }));
        }
    }
    /**
     * Creates a leader state for the given server.
     *
     * @param server - The server the state acts on.
     */
    export function createLeaderState(server) {
        return new LeaderState(server);
    }
  • ¶

    server/state/base.js


    /** server/state/base.js*/
    import * as AppendEntries from '../../message/append-entries';
    import { compilerError } from '../../util/compiler-error';
  • ¶

    The base server state is not named as such in the Raft paper, but is used in the raftjs project as a way to share functionality with the named states (follower, candidate, leader).

    export class BaseState {
        /**
         * @param server - The server this state acts on.
         * @param stateType - The state's type name, exposed as `type`.
         */
        constructor(server, stateType) {
            // Listener handles collected in enter() and detached in exit().
            this.rpcEventListeners = new Set();
            this.server = server;
            // Bound so they can be passed around as plain callbacks.
            this.onAppendEntriesRequest0 = this.onAppendEntriesRequest0.bind(this);
            this.onRequestOrResponse = this.onRequestOrResponse.bind(this);
            this.transitionTo = this.transitionTo.bind(this);
            this.type = stateType;
        }
        /**
         * Remembers a listener handle so that exit() can detach it.
         *
         * @param rpcEventListener - Handle returned by `rpcService.receive`.
         */
        addRpcEventListener(rpcEventListener) {
            this.rpcEventListeners.add(rpcEventListener);
        }
        enter() {
            this.addRpcEventListener(this.server.rpcService
                .receive({
                procedureType: 'append-entries',
                callType: 'request',
                notify: this.onAppendEntriesRequest0
            }));
            this.addRpcEventListener(this.server.rpcService
                .receive({
                procedureType: 'append-entries',
                callType: 'request',
                notify: this.onRequestOrResponse
            }));
            this.addRpcEventListener(this.server.rpcService
                .receive({
                procedureType: 'append-entries',
                callType: 'response',
                notify: this.onRequestOrResponse
            }));
            this.addRpcEventListener(this.server.rpcService
                .receive({
                procedureType: 'request-vote',
                callType: 'request',
                notify: this.onRequestOrResponse
            }));
            this.addRpcEventListener(this.server.rpcService
                .receive({
                procedureType: 'request-vote',
                callType: 'response',
                notify: this.onRequestOrResponse
            }));
        }
        exit() {
            for (let rpcEventListener of this.rpcEventListeners) {
                this.rpcEventListeners.delete(rpcEventListener);
                rpcEventListener.detach();
            }
        }
  • ¶

    This method is a stub for a Raft response to an AppendEntries RPC request. At the present time, it only handles responding to heartbeats.

    §5. “…Receiver implementation:…”

        /**
         * Replies to an AppendEntries request with this server's term and a
         * `success` flag that is false when the request's term is behind it.
         *
         * @param endpoint - Endpoint to reply to.
         * @param message - AppendEntries request; `arguments.term` is read.
         */
        onAppendEntriesRequest0(endpoint, message) {
            this.server.send(endpoint, AppendEntries.createResponse({
  • ¶

    When another Server makes an AppendEntries RPC request with a term less than the term on this Server, the RPC request is rejected.

    §5. “…false if term < currentTerm…”

                success: message.arguments.term >= this.server.term,
                term: this.server.term
            }));
        }
        /**
         * Term check applied to every received request and response: when the
         * message carries a term higher than the server's, the server adopts
         * that term and transitions to follower.
         *
         * @param endpoint - Endpoint of the sender; used in the log message.
         * @param message - The received message. The term is read from
         *   `arguments.term` for requests and `results.term` for responses.
         */
        onRequestOrResponse(endpoint, message) {
            this.server.logger.trace(`Received ${message.procedureType} ${message.callType} from ${endpoint.toString()}`);
            const callType = message.callType, procedureType = message.procedureType;
            let term;
            switch (callType) {
                case 'request':
                    term = message.arguments.term;
                    break;
                case 'response':
                    term = message.results.term;
                    break;
                default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                    compilerError(callType);
                    break;
            }
            const server = this.server;
  • ¶

    Whenever Raft server’s communicate to each other, they exchange their current term, and, if one server’s term is less than anothers, it updates it’s own term to the other’s, and converts to a follower.

    §5. “…If RPC request or response contains…”
    §5.1. “…If one server’s current term is smaller…”

            if (term > server.term) {
                this.server.logger.trace(`Received a message with a term (${term}) higher than the server term (${server.term}); transitioning to follower`);
                // Assigning through the setter also clears the server's vote.
                server.term = term;
                this.transitionTo('follower');
            }
        }
        /**
         * Asks the server to transition to another state; does nothing when
         * the requested type is this state's own type.
         *
         * @param stateType - Name of the state to transition to.
         */
        transitionTo(stateType) {
            if (stateType != this.type) {
                this.server.transitionTo(stateType);
            }
        }
    }
    /**
     * Creates a base state of the given type for the given server.
     *
     * @param server - The server the state acts on.
     * @param stateType - The state's type name.
     */
    export function createBaseState(server, stateType) {
        return new BaseState(server, stateType);
    }
  • ¶

    server/state/noop.js


    /** server/state/noop.js*/
  • ¶

    The “no-op” state is not part of the Raft protocol. It is used by Server as a null object to simplify guarding against null references.

    /**
     * Creates the null-object state: `enter` and `exit` do nothing and `type`
     * is null.
     *
     * @returns A new state object on every call.
     */
    export function createNoopState() {
        const enter = () => { };
        const exit = () => { };
        return { enter, exit, type: null };
    }
  • ¶

    server/timer.js


    /** server/timer.js*/
  • ¶

    The Raft paper recommends an election timeout interval of 150–300 milliseconds:

    §9.3 “…We recommend using a conservative election timeout such as 150–300ms…”

    // Lower and upper bound of the election timeout interval, in milliseconds.
    // Presumably consumed by the timeout chooser; TODO confirm.
    export const DEFAULT_TIMEOUT_INTERVAL = [150, 300];
  • ¶

    Raft uses timers to trigger the conversion of followers to candidates, and candidates to restart elections.

    class Timer {
        constructor(options = {}) {
            this._running = false;
            this.listeners = {};
            this._running = false,
                this.timeoutChooser = options.timeoutChooser
                    ? options.timeoutChooser
                    : createTimeoutChooser({});
            this._timeout = this.timeoutChooser.choose();
            this.timeoutId = null;
        }
  • ¶

    Notify listeners about the event.

        notifyListeners(event) {
            if (this.listeners[event]) {
                for (let listener of this.listeners[event]) {
                    listener(event);
                }
            }
        }
  • ¶

    Register a listener for one or more events.

        on(events, listener) {
            if (!Array.isArray(events))
                events = [events];
            for (let event of events) {
                if (!this.listeners[event])
                    this.listeners[event] = new Set();
                this.listeners[event].add(listener);
            }
        }
  • ¶

    Deregister a listener from one or more events.

        off(events, listener) {
            if (!Array.isArray(events))
                events = [events];
            for (let event of events) {
                if (this.listeners[event])
                    this.listeners[event].delete(listener);
            }
        }
  • ¶

    Reset the timer.

        /**
         * Stop the timer (if running), start it again, then notify 'reset'
         * listeners. stop() and start() emit their own notifications first.
         */
        reset() {
            this.stop();
            this.start();
            this.notifyListeners('reset');
        }
        /** Whether the timer has been started and has not yet stopped or timed out. */
        get running() {
            return this._running;
        }
  • ¶

    Start the timer.

        start() {
            if (this._running)
                return;
            this._running = true;
            this.notifyListeners('started');
            this.timeoutId = setTimeout((function () {
                this.stop();
                this.notifyListeners('timeout');
            }).bind(this), this._timeout);
        }
  • ¶

    Stop the timer.

        stop() {
            if (!this._running)
                return;
            this._running = false;
            clearTimeout(this.timeoutId);
            this.notifyListeners('stopped');
        }
        /** The timeout, in milliseconds, chosen when the timer was constructed. */
        get timeout() {
            return this._timeout;
        }
    }
  • ¶

    Raft uses:

    §5.2 “…randomized election timeouts to ensure that split votes are rare…”

    class TimeoutChooser {
        /**
         * @param {object} [options]
         * @param {number[]} [options.interval] - [lower, upper] bounds;
         *   defaults to DEFAULT_TIMEOUT_INTERVAL.
         */
        constructor(options = {}) {
            this.interval = options.interval || DEFAULT_TIMEOUT_INTERVAL;
        }
        /**
         * Pick a random integer in the interval, both bounds included.
         * See: https://mzl.la/2Vw9OmR
         */
        choose() {
            const [lower, upper] = this.interval;
            const span = upper - lower + 1;
            return Math.floor(Math.random() * span) + lower;
        }
    }
    /**
     * Create a Timer. `options.timeoutChooser`, when given, supplies the timeout.
     */
    export function createTimer(options = {}) {
        return new Timer(options);
    }
    /**
     * Create a TimeoutChooser. `options.interval`, when given, overrides
     * DEFAULT_TIMEOUT_INTERVAL.
     */
    export function createTimeoutChooser(options = {}) {
        return new TimeoutChooser(options);
    }
  • ¶

    rpc/index.js


    /** rpc/index.js*/
    import { createRpcReceiverRegistry } from './receiver-registry';
    import { createDefaultCodec, createDefaultTransport } from './defaults';
  • ¶

    Encodes, sends, decodes and receives RPC messages.

    class RpcService {
        constructor(options = {}) {
            this.codec = options.codec
                ? options.codec
                : createDefaultCodec();
            this.receivers = createRpcReceiverRegistry();
            this.transport = options.transport
                ? options.transport
                : createDefaultTransport();
  • ¶

    When data is received from the underlying transport, decode the data to an RPC message and notify any receivers of the message.

            this.transport.receive({
                data: (function (endpoint, data) {
                    this.notifyReceivers(endpoint, this.codec.decode(data));
                }).bind(this),
                failure(failure) { }
            });
        }
        /** Close the underlying transport; returns whatever the transport's close() returns. */
        close() {
            return this.transport.close();
        }
        /** Have the underlying transport listen on `endpoint`; returns the transport's result. */
        listen(endpoint) {
            return this.transport.listen(endpoint);
        }
        notifyReceivers(endpoint, message) {
            for (let receiver of this.receivers.getAll(message.procedureType, message.callType)) {
                receiver.notify(endpoint, message);
            }
        }
  • ¶

    Register to receive messages of a particular procedure and call type, e.g. receive('append-entries', 'request', {...}).

        receive(receiver) {
            this.receivers.add(receiver);
            return {
                detach: (function () {
                    this.receivers.remove(receiver);
                }).bind(this)
            };
        }
  • ¶

    Send a message to all endpoints in parallel.

        send(endpoints, message) {
            const encoded = this.codec.encode(message);
            return Promise.all(endpoints.map((function (endpoint) {
                return new Promise((function (resolve) {
                    this.transport.send(endpoint, encoded).then(() => resolve(), () => resolve());
                }).bind(this));
            }).bind(this)));
        }
    }
    /**
     * Create an RpcService. `options.codec` and `options.transport`, when
     * given, replace the defaults.
     */
    export function createRpcService(options = {}) {
        return new RpcService(options);
    }
  • ¶

    rpc/defaults.js


    /** rpc/defaults.js*/
    import { createFlatbuffersCodec } from '../codec';
    import { createTcpTransport } from '../transport';
  • ¶

    By default, RpcService uses FlatBuffers to encode and decode RPC messages to and from binary data.

    /** Create the default codec, which is the FlatBuffers codec. */
    export function createDefaultCodec() {
        return createFlatbuffersCodec();
    }
  • ¶

    By default, RpcService uses TCP as its underlying data transport.

    /** Create the default transport, which is the TCP transport. */
    export function createDefaultTransport() {
        return createTcpTransport();
    }
  • ¶

    rpc/receiver-registry.js


    /** rpc/receiver-registry.js*/
    import { compilerError } from '../util/compiler-error';
  • ¶

    A data structure that maintains an RpcReceiver set for each kind of RPC message.

    class RpcReceiverRegistry {
        constructor() {
            this.appendEntriesRequestReceivers = new Set();
            this.appendEntriesResponseReceivers = new Set(),
                this.requestVoteRequestReceivers = new Set(),
                this.requestVoteResponseReceivers = new Set();
        }
  • ¶

    Add the provided RpcReceiver to the registry.

        add(receiver) {
            const _procedureType = receiver.procedureType, _callType = receiver.callType;
            this.internalGetAll(_procedureType, _callType).add(receiver);
        }
  • ¶

    Get every RpcReceiver registered for the provided RPC message call or response.

        getAll(procedureType, callType) {
            const _procedureType = procedureType, _callType = callType, _receiverSet = this.internalGetAll(_procedureType, _callType);
            return _receiverSet;
        }
  • ¶

    Get a set of registered receivers based on the provided procedure and call type.

        /**
         * Select the receiver Set for a (procedureType, callType) pair.
         * Values that match no case are passed to compilerError(), which throws.
         */
        internalGetAll(procedureType, callType) {
            const _callType = callType, _procedureType = procedureType;
            switch (_procedureType) {
                case 'append-entries':
                    switch (_callType) {
                        case 'request':
                            return this.appendEntriesRequestReceivers;
                        case 'response':
                            return this.appendEntriesResponseReceivers;
                        default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                            return compilerError(_callType);
                    }
                case 'request-vote':
                    switch (_callType) {
                        case 'request':
                            return this.requestVoteRequestReceivers;
                        case 'response':
                            return this.requestVoteResponseReceivers;
                        default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                            return compilerError(_callType);
                    }
                default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                    return compilerError(_procedureType);
            }
        }
  • ¶

    Remove the provided RpcReceiver from the registry.

        remove(receiver) {
            const _procedureType = receiver.procedureType, _callType = receiver.callType;
            this.internalGetAll(_procedureType, _callType).delete(receiver);
        }
    }
    /** Create an empty RpcReceiverRegistry. */
    export function createRpcReceiverRegistry() {
        return new RpcReceiverRegistry();
    }
  • ¶

    transport/index.js


    /** transport/index.js*/
    export { createTcpTransport } from './tcp';
  • ¶

    transport/tcp.js


    /** transport/tcp.js*/
    import * as net from 'net';
    import { createEndpoint } from '../net';
    import { createConnectionRegistry } from './connection-registry';
  • ¶

    A TCP transport that can accept and create TCP sockets, and tries to re-use sockets when possible.

    class TcpTransport {
        constructor(options) {
            this.endpoint = null;
            this.endpoint = null;
            this.receivers = [];
            this.server = net.createServer(this.accept.bind(this));
            this.sockets = options.sockets || createConnectionRegistry();
        }
  • ¶

    Store a socket that was produced as the result of an incoming or outgoing connection.

        accept(socket) {
            const endpoint = createEndpoint({
                host: socket.remoteAddress,
                port: socket.remotePort
            });
  • ¶

    If the socket has already been accepted, do nothing.

            if (this.sockets.has(endpoint))
                return true;
  • ¶

    Try to store the socket, and set up ‘end’ and ‘data’ handlers.

            if (this.sockets.save(endpoint, socket)) {
                socket.on('end', (function () {
                    socket.destroy();
                    this.sockets.remove(endpoint);
                }).bind(this));
                socket.on('data', (function (data) {
                    for (let receiver of this.receivers) {
                        receiver.data(endpoint, data);
                    }
                }).bind(this));
                return true;
            }
            return false;
        }
  • ¶

    When closing a transport, close all client connections and stop listening for connections on the transport endpoint.

        close() {
            const promise = new Promise((function (resolve, reject) {
                const wrappedResolve = (function () {
                    if (_clientsClosed && _serverClosed) {
                        this.endpoint = null;
                        resolve();
                    }
                }).bind(this);
                let _clientsClosed = false, _serverClosed = false;
                this.server.close(function (err) {
                    _serverClosed = true;
                    wrappedResolve();
                });
                this.sockets.removeEach(function (socket) {
                    socket.destroy();
                }, function () {
                    _clientsClosed = true;
                    wrappedResolve();
                });
            }).bind(this));
            return promise;
        }
  • ¶

    Make an outgoing TCP connection to the provided endpoint.

        connect(endpoint) {
            return new Promise((function (resolve, reject) {
                if (this.sockets.has(endpoint)) {
                    resolve(this.sockets.get(endpoint));
                }
                else {
                    const socket = net.createConnection(endpoint.port, endpoint.host, (function () {
                        if (this.accept(socket)) {
                            resolve(socket);
                        }
                        else {
                            socket.end();
                            reject('failed to save socket');
                        }
                    }).bind(this));
  • ¶

    Set up a socket error handler to capture errors such as ‘connection refused’.

                    socket.on('error', function (err) {
                        reject(err);
                    });
                }
            }).bind(this));
        }
  • ¶

    Listen for incoming TCP connections on the provided endpoint.

        listen(endpoint) {
            if (this.endpoint != null)
                return Promise.reject(`TCP transport is already listening on ${endpoint.toString()}`);
            this.endpoint = endpoint;
            return new Promise((function (resolve, reject) {
                this.server.listen(this.endpoint.port, this.endpoint.host, function () {
                    resolve();
                });
            }).bind(this));
        }
  • ¶

    Register a receiver.

        /**
         * Append `receiver` to the list of receivers; accept() calls each
         * receiver's `data(endpoint, data)` for incoming socket data.
         */
        receive(receiver) {
            this.receivers.push(receiver);
        }
  • ¶

    Send data to the provided endpoint.

        send(endpoint, data) {
            return new Promise((function (resolve, reject) {
                this.connect(endpoint).then(function (socket) {
                    socket.write(data, null, function (err) {
                        if (err) {
                            reject(err);
                        }
                        else {
                            resolve();
                        }
                    });
                }, reject);
            }).bind(this));
        }
    }
    /**
     * Create a TcpTransport. `options.sockets`, when given, is used as the
     * connection registry.
     */
    export function createTcpTransport(options = {}) {
        return new TcpTransport(options);
    }
  • ¶

    transport/connection-registry.js


    /** transport/connection-registry.js*/
    import { parseEndpoint } from '../net';
  • ¶

    A map of endpoint to connections, with methods to iterate over or remove all endpoints at once.

    export default class ConnectionRegistry {
        constructor() {
            // Connections keyed by the string form of their endpoint (endpoint.toString()).
            this.connections = {};
            // Incremented by save().
            this.numConnections = 0;
        }
        /** Return the value of the numConnections counter. */
        count() {
            return this.numConnections;
        }
        get(endpoint) {
            const connectionId = endpoint.toString();
            return this.connections[connectionId];
        }
  • ¶

    Iterate over each connection, invoking an optional callback when all connections have been processed.

        forEach(onEach, onDone = null) {
            for (let connectionId in this.connections) {
                const endpoint = parseEndpoint(connectionId), connection = this.connections[connectionId];
                onEach(endpoint, connection);
            }
            if (onDone)
                onDone();
        }
  • ¶

    Get all registered connections.

        getAll() {
            const values = [];
            for (var key in this.connections) {
                values.push(this.connections[key]);
            }
            return values;
        }
        /** Return true when a connection is stored under `endpoint`'s string form. */
        has(endpoint) {
            const connectionId = endpoint.toString();
            return connectionId in this.connections;
        }
        remove(endpoint) {
            const connectionId = endpoint.toString();
            delete this.connections[connectionId];
        }
  • ¶

    Remove all connections, invoking an optional callback after all connections have been removed.

        removeEach(onRemove, onDone = null) {
            this.forEach((function (endpoint, connection) {
                this.remove(endpoint);
                onRemove(connection);
            }).bind(this), function () {
                if (onDone)
                    onDone();
            });
        }
        save(endpoint, connection) {
            if (this.has(endpoint))
                return false;
            const connectionId = endpoint.toString();
            this.connections[connectionId] = connection;
            this.numConnections++;
            return true;
        }
    }
    /** Create an empty ConnectionRegistry. */
    export function createConnectionRegistry() {
        return new ConnectionRegistry();
    }
  • ¶

    codec/index.js


    /** codec/index.js*/
    export { createFlatbuffersCodec } from './flatbuffers';
  • ¶

    codec/flatbuffers.js


    /** codec/flatbuffers.js*/
  • ¶

    Encodes messages to data, decodes data to messages, using FlatBuffers.

    import { flatbuffers } from 'flatbuffers';
    import { compilerError } from '../util/compiler-error';
  • ¶

    flatbuffers_generated.ts is generated from flatbuffers.fbs during the build process. Neither file is included in the annotated source.

    import * as Schema from './flatbuffers_generated';
  • ¶
    export function createFlatbuffersCodec() {
        return { decode, encode };
    }
  • ¶

    Decode data to a message.

    function decode(data) {
        const buffer = new flatbuffers.ByteBuffer(data), schema = Schema.Message.getRootAsMessage(buffer), procedureType = schema.procedureType();
        switch (procedureType) {
            case Schema.ProcedureType.AppendEntries:
                return decodeAppendEntries(schema);
            case Schema.ProcedureType.RequestVote:
                return decodeRequestVote(schema);
            default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                return compilerError(procedureType);
        }
    }
  • ¶

    Decode data to an AppendEntries message.

    function decodeAppendEntries(schema) {
        const callType = schema.callType();
        switch (callType) {
            case Schema.CallType.Request:
                return decodeAppendEntriesRequest(schema);
            case Schema.CallType.Response:
                return decodeAppendEntriesResponse(schema);
            default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                return compilerError(callType);
        }
    }
  • ¶

    Decode data to an AppendEntries request.

    function decodeAppendEntriesRequest(schema) {
        const decoded = schema.arguments(new Schema.AppendEntriesArguments());
        const term = decoded.term();
        // Entries are not read from the buffer; the list is always empty.
        return {
            callType: 'request',
            procedureType: 'append-entries',
            arguments: { entries: [], term }
        };
    }
  • ¶

    Decode data to an AppendEntries response.

    function decodeAppendEntriesResponse(schema) {
        const decoded = schema.results(new Schema.AppendEntriesResults());
        const success = decoded.success();
        const term = decoded.term();
        return {
            callType: 'response',
            procedureType: 'append-entries',
            results: { success, term }
        };
    }
  • ¶

    Decode data to a RequestVote message.

    function decodeRequestVote(schema) {
        const callType = schema.callType();
        switch (callType) {
            case Schema.CallType.Request:
                return decodeRequestVoteRequest(schema);
            case Schema.CallType.Response:
                return decodeRequestVoteResponse(schema);
            default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                return compilerError(callType);
        }
    }
  • ¶

    Decode data to a RequestVote request.

    function decodeRequestVoteRequest(schema) {
        const decoded = schema.arguments(new Schema.RequestVoteArguments());
        const candidateId = decoded.candidateId();
        const lastLogIndex = decoded.lastLogIndex();
        const lastLogTerm = decoded.lastLogTerm();
        const term = decoded.term();
        return {
            callType: 'request',
            procedureType: 'request-vote',
            arguments: { candidateId, lastLogIndex, lastLogTerm, term }
        };
    }
  • ¶

    Decode data to a RequestVote response.

    function decodeRequestVoteResponse(schema) {
        const decoded = schema.results(new Schema.RequestVoteResults());
        const term = decoded.term();
        const voteGranted = decoded.voteGranted();
        return {
            callType: 'response',
            procedureType: 'request-vote',
            results: { term, voteGranted }
        };
    }
  • ¶

    Encode a message to data.

    /**
     * Serialize an RPC message and return the builder's bytes as a Uint8Array.
     * Requests contribute an arguments table and responses a results table;
     * the nested table is built before the enclosing Message table is started.
     */
    function encode(message) {
        const builder = new flatbuffers.Builder();
        // Exactly one of args/results is assigned below; the other stays null.
        let args = null, results = null, callType, procedureType;
        switch (message.procedureType) {
            case 'append-entries':
                procedureType = Schema.ProcedureType.AppendEntries;
                switch (message.callType) {
                    case 'request':
                        args = encodeAppendEntriesArguments(builder, message.arguments);
                        callType = Schema.CallType.Request;
                        break;
                    case 'response':
                        callType = Schema.CallType.Response;
                        results = encodeAppendEntriesResults(builder, message.results);
                        break;
                    default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                        compilerError(message);
                }
                break;
            case 'request-vote':
                procedureType = Schema.ProcedureType.RequestVote;
                switch (message.callType) {
                    case 'request':
                        args = encodeRequestVoteArguments(builder, message.arguments);
                        callType = Schema.CallType.Request;
                        break;
                    case 'response':
                        callType = Schema.CallType.Response;
                        results = encodeRequestVoteResults(builder, message.results);
                        break;
                    default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                        compilerError(message);
                }
                break;
            default:
  • ¶

    Used by TypeScript for exhaustiveness checks.

                compilerError(message);
        }
        Schema.Message.startMessage(builder);
        // NOTE(review): the unused one of args/results is passed as null here;
        // confirm the generated Schema code tolerates a null offset.
        Schema.Message.addArguments(builder, args);
        Schema.Message.addCallType(builder, callType);
        Schema.Message.addProcedureType(builder, procedureType);
        Schema.Message.addResults(builder, results);
        const offset = Schema.Message.endMessage(builder);
        builder.finish(offset);
        return builder.asUint8Array();
    }
  • ¶

    Encode AppendEntries arguments to a flatbuffers offset.

    // Writes only `term`; `args.entries` is not encoded.
    function encodeAppendEntriesArguments(builder, args) {
        Schema.AppendEntriesArguments.startAppendEntriesArguments(builder);
        Schema.AppendEntriesArguments.addTerm(builder, args.term);
        return Schema.AppendEntriesArguments.endAppendEntriesArguments(builder);
    }
  • ¶

    Encode AppendEntries results to a flatbuffers offset.

    // Writes `success` and `term` and returns the value of endAppendEntriesResults().
    function encodeAppendEntriesResults(builder, results) {
        Schema.AppendEntriesResults.startAppendEntriesResults(builder);
        Schema.AppendEntriesResults.addSuccess(builder, results.success);
        Schema.AppendEntriesResults.addTerm(builder, results.term);
        return Schema.AppendEntriesResults.endAppendEntriesResults(builder);
    }
  • ¶

    Encode RequestVote arguments to a flatbuffers offset.

    // Writes only `term`.
    // NOTE(review): decodeRequestVoteRequest also reads candidateId, lastLogIndex
    // and lastLogTerm, none of which are written here — confirm whether they
    // should be encoded.
    function encodeRequestVoteArguments(builder, args) {
        Schema.RequestVoteArguments.startRequestVoteArguments(builder);
        Schema.RequestVoteArguments.addTerm(builder, args.term);
        return Schema.RequestVoteArguments.endRequestVoteArguments(builder);
    }
  • ¶

    Encode RequestVote results to a flatbuffers offset.

    // Writes `term` and `voteGranted` and returns the value of endRequestVoteResults().
    function encodeRequestVoteResults(builder, results) {
        Schema.RequestVoteResults.startRequestVoteResults(builder);
        Schema.RequestVoteResults.addTerm(builder, results.term);
        Schema.RequestVoteResults.addVoteGranted(builder, results.voteGranted);
        return Schema.RequestVoteResults.endRequestVoteResults(builder);
    }
  • ¶

    util/compiler-error.js


    /** util/compiler-error.js*/
  • ¶

    Used by TypeScript, mainly for exhaustiveness checks.

    /**
     * Always throws. The argument is not used at runtime.
     */
    export function compilerError(x) {
        throw new Error('Unreachable code was reached');
    }
  • ¶

    message/index.js


    /** message/index.js*/
    import * as AppendEntries_ from './append-entries';
    import * as RequestVote_ from './request-vote';
    export { AppendEntries_ as AppendEntries };
    export { RequestVote_ as RequestVote };
  • ¶

    Verify that the value is an RPC message. This utility is mainly used by TypeScript as a user-defined type guard.

    /**
     * Check whether `message` looks like an RPC message: a truthy value
     * whose `callType` and `procedureType` are both non-empty strings.
     *
     * @param {*} message - value to test.
     * @returns {boolean} always a strict boolean, never a falsy operand
     *   such as undefined or '' leaking out of the `&&` chain.
     */
    export function isMessage(message) {
        return Boolean(message)
            && typeof message.callType === 'string'
            && message.callType !== ''
            && typeof message.procedureType === 'string'
            && message.procedureType !== '';
    }
  • ¶

    Verify that the value is an RPC request. This utility is mainly used by TypeScript as a user-defined type guard.

    /**
     * Check whether `message` is an RPC message whose callType is 'request'.
     *
     * @returns {boolean} always a strict boolean.
     */
    export function isRequest(message) {
        // Coerce so a non-boolean falsy result from isMessage() cannot leak through `&&`.
        return Boolean(isMessage(message)) && message.callType === 'request';
    }
  • ¶

    Verify that the value is an RPC response. This utility is mainly used by TypeScript as a user-defined type guard.

    /**
     * Check whether `message` is an RPC message whose callType is 'response'.
     *
     * @returns {boolean} always a strict boolean.
     */
    export function isResponse(message) {
        // Coerce so a non-boolean falsy result from isMessage() cannot leak through `&&`.
        return Boolean(isMessage(message)) && message.callType === 'response';
    }
  • ¶

    message/request-vote.js


    /** message/request-vote.js*/
  • ¶

    Create a RequestVote RPC request with the provided arguments.

    /**
     * Wrap `args` in a 'request-vote' request message.
     */
    export function createRequest(args) {
        const request = { callType: 'request', procedureType: 'request-vote' };
        request.arguments = args;
        return request;
    }
  • ¶

    Create a RequestVote RPC response with the provided results.

    /**
     * Wrap `results` in a 'request-vote' response message.
     */
    export function createResponse(results) {
        const response = { callType: 'response', procedureType: 'request-vote' };
        response.results = results;
        return response;
    }
  • ¶

    message/append-entries.js


    /** message/append-entries.js*/
  • ¶

    Create an AppendEntries RPC request with the provided arguments.

    /**
     * Wrap `args` in an 'append-entries' request message.
     */
    export function createRequest(args) {
        const request = { callType: 'request', procedureType: 'append-entries' };
        request.arguments = args;
        return request;
    }
  • ¶

    Create an AppendEntries RPC response with the provided results.

    /**
     * Wrap `results` in an 'append-entries' response message.
     */
    export function createResponse(results) {
        const response = { callType: 'response', procedureType: 'append-entries' };
        response.results = results;
        return response;
    }
  • ¶

    logger.js


    /** logger.js*/
    const pino = require('pino');
  • ¶

    Create a logger.

    /**
     * Create a pino logger.
     *
     * `options.level` sets the log level ('error' when absent or falsy).
     * Output is pretty-printed unless `options.machineReadable` is present
     * and truthy.
     */
    export function createLogger(options = {}) {
        const level = options.level ? options.level : 'error';
        let prettyPrint = true;
        if ('machineReadable' in options) {
            prettyPrint = !options.machineReadable;
        }
        return pino({ level, prettyPrint });
    }
  • ¶

    log.js


    /** log.js*/
  • ¶

    This is a stub for a persistent log of entries. Currently, raftjs does not implement handling of client requests or replication of log entries from leaders to followers.

    Until those features are implemented, no implementation of a persistent entry log is required.

    class Log {
        /**
         * @param {object} [options] - when given, `options.term` becomes the
         *   term of the single initial entry; otherwise that term is 0.
         */
        constructor(options) {
            const initialTerm = options ? options.term : 0;
            this.entries = [{ term: initialTerm }];
            this.lastIndex = 0;
        }
        /** Return the entry stored at `index`, or undefined when there is none. */
        getEntry(index) {
            const { entries } = this;
            return entries[index];
        }
        /** Return the index of the last entry. */
        getLastIndex() {
            const { lastIndex } = this;
            return lastIndex;
        }
        /** Stub: nothing is persisted; returns an already-fulfilled promise. */
        write() {
            return Promise.resolve();
        }
    }
    /**
     * Create a Log. `options.term`, when options are given, is the term of
     * the initial entry.
     */
    export function createLog(options) {
        return new Log(options);
    }
  • ¶

    storage.js


    /** storage.js*/
    import * as fs from 'fs';
  • ¶

    Wraps a value that can be read and written to a path on disk. Can be configured with a serializer and a deserializer for transforming values read from or written to disk.

    class BaseDurableValue {
        constructor(options) {
            this.deserializer = options.deserializer;
            this._value = null;
            this.path = options.path;
            this.serializer = options.serializer;
            this.inSync = false;
        }
  • ¶

    Returns a Promise containing the deserialized contents of the path on disk, if present and non empty; otherwise, returns the defaultValue.

    If the path on disk cannot be read (for example, due to insufficient permissions), a rejected Promise is returned.

        read(defaultValue = null) {
            if (this.inSync)
                return Promise.resolve(this._value);
            return new Promise((function (resolve, reject) {
                fs.readFile(this.path, (function (err, data) {
                    if (err == null || err.code == 'ENOENT') {
                        let value = null;
                        if (err != null) {
                            value = defaultValue;
                        }
                        else {
                            value = this.deserializer(data);
                        }
                        this.inSync = true;
                        this.value = value;
                        resolve(this.value);
                    }
                    else {
                        reject('Failed to read durable value: ' + err);
                    }
                }).bind(this));
            }).bind(this));
        }
  • ¶

    Returns a Promise that is fulfilled unless the value cannot be written to the disk path.

        write() {
            if (this.inSync) {
                return Promise.resolve();
            }
            return new Promise((function (resolve, reject) {
                fs.writeFile(this.path, this.serializer(this.value), { flag: 'w' }, (function (err) {
                    if (err) {
                        reject('Failed to write durable value: ' + err);
                    }
                    else {
                        resolve(this.value);
                    }
                    this.inSync = true;
                }).bind(this));
            }).bind(this));
        }
  • ¶

    Returns the currently known value. The value is null until it is read from the disk path, and is null when read is called but the disk path does not exist or is empty.

        /** The in-memory value; does not touch the disk. */
        get value() {
            return this._value;
        }
  • ¶

    Updates the value. Changes to the value are not persisted to disk until write is called.

        /** Replace the in-memory value and mark it as out of sync. */
        set value(newValue) {
            this.inSync = false;
            this._value = newValue;
        }
    }
  • ¶

    A durable value that serializes and deserializes integers.

    class DurableInteger extends BaseDurableValue {
        /**
         * @param {string} path - file the integer is read from and written to.
         */
        constructor(path) {
            super({
                deserializer(data) {
                    // The serializer writes decimal text, so parse with an
                    // explicit radix rather than letting parseInt guess it.
                    return parseInt(data.toString('utf-8'), 10);
                },
                path,
                serializer(value) {
                    // null/undefined is written as an empty file.
                    if (value == null) {
                        return Buffer.alloc(0);
                    }
                    return Buffer.from(value + '');
                }
            });
        }
    }
  • ¶

    A durable value that serializes and deserializes strings.

    class DurableString extends BaseDurableValue {
        /**
         * @param {string} path - file the string is read from and written to.
         */
        constructor(path) {
            const deserializer = (data) => data.toString('utf-8');
            // null/undefined is written as an empty file.
            const serializer = (value) => (value == null
                ? Buffer.alloc(0)
                : Buffer.from(value));
            super({ deserializer, path, serializer });
        }
    }
    export function createDurableInteger(path) {
        return new DurableInteger(path);
    }
    export function createDurableString(path) {
        return new DurableString(path);
    }
  • ¶

    net.js


    /** net.js*/
  • ¶

    A simple wrapper around a { host, port } object, with helpful methods for equality comparisons and exporting to a string.

    class Endpoint {
        /**
         * @param {object} options
         * @param {string} options.host
         * @param {number} options.port
         */
        constructor(options) {
            const { host, port } = options;
            this.host = host;
            this.port = port;
        }
        /** Return true when `endpoint` has the same host and port as this one. */
        equals(endpoint) {
            if (endpoint.port !== this.port) {
                return false;
            }
            return endpoint.host === this.host;
        }
        /** Return the endpoint formatted as "<host>:<port>". */
        toString() {
            return ''.concat(this.host, ':', this.port);
        }
    }
  • ¶

    Create an Endpoint from a { host, port } object.

    export function createEndpoint(options) {
        return new Endpoint(options);
    }
  • ¶

    Verify that the provided object is an Endpoint. Used by TypeScript as a user-defined type guard.

    /**
     * Return true when `endpoint` is truthy and has a string `host`, a
     * number `port` and an `equals` function.
     */
    export function isEndpoint(endpoint) {
        if (!endpoint) {
            return false;
        }
        return typeof endpoint.host === 'string'
            && typeof endpoint.port === 'number'
            && typeof endpoint.equals === 'function';
    }
  • ¶

    Parse the provided string and return an Endpoint, if valid.

    /**
     * Parse a "<host>:<port>" string into an Endpoint.
     *
     * @param {string} value - text containing exactly one ':' separator.
     * @returns {Endpoint}
     * @throws {Error} when the text does not have the form <host>:<port> or
     *   the port is not written as a decimal integer.
     */
    export function parseEndpoint(value) {
        const parts = value.split(':');
        if (parts.length != 2)
            throw new Error('Invalid format; requires <host>:<port>');
        const [host, portText] = parts;
        // parseInt on its own accepts trailing garbage ("80abc" -> 80) and
        // guesses the radix, so require plain decimal digits up front.
        if (!/^\d+$/.test(portText))
            throw new Error('Invalid port; must be an integer');
        const port = parseInt(portText, 10);
        return createEndpoint({ host, port });
    }