Generators can be powerful tools for efficient data processing. But things get tricky when we add asynchronous calls into the mix. Asynchronous generators, however, come to the rescue by handling scenarios involving promises. They come in handy for a variety of real-world scenarios. And one of those came up recently.

In my day-to-day work, I write code for a product that you can think of as an extremely complex to-do list manager. To keep our product working, we run a battery of tests in our continuous integration (CI) pipeline. The other day, a colleague of mine was writing a cleanup script for one of our end-to-end tests. The script would remove old to-do items left behind by tests.

The cleanup script makes use of two APIs. One, a search API, returns lists of to-do items, chunked into ‘pages’. The other lets us delete individual to-do items. The script finds all the to-do items with a given prefix in the name and deletes any that are older than ten minutes.

My colleague’s script looked something like this:

const TEN_MINS = 10 * 60 * 1000;
const FAILURE_THRESHOLD = 3;

const cleanUpOldToDoItems = async () => {
    let startAt = 0;
    let response;
    let finished = false;
    let failures = 0;
    let deletionPromises = [];
    const now = Date.now();
    do {
        response = await searchForTodoItems(TEST_PREFIX, startAt);
        for (const todoItem of response.values) {
            if (now - todoItem.created > TEN_MINS) {
                deletionPromises.push(
                    deleteTodoItem(todoItem.id)
                        .catch(() => { failures++; })
                );
            }
        }
        startAt += response.values.length;
        finished = response.isLast;
    } while (!finished);
    await Promise.all(deletionPromises);
    if (failures >= FAILURE_THRESHOLD) {
        reportError(
            `Failure: More than ${FAILURE_THRESHOLD} failures while deleting old todo items.`
        );
    }
}

The script defines a bunch of variables to keep track of the state. Then it dives into a loop that keeps going until we run out of ‘pages’. Inside that loop, we have another nested loop that checks the date of to-do items and deletes the old ones. While it’s doing that, it also keeps track of all the promises created, so that we can await them together with Promise.all() and be sure every deletion has settled. Along the way, it keeps track of how many of these deletion promises fail. Once they all finish, it checks to see if we have enough failures to warrant sending an error report.

Now, to be clear, this code works. And working code (even if it’s messy) always trumps elegant code that doesn’t work or hasn’t been written. But with all the loops and state-tracking variables, it takes effort to figure out what this code does. Using async generators, though, we can rearrange this code. And in the process, tell a clearer story about what it’s doing.

To get started, let’s list the important actions this code makes. It:

  1. Fetches all the to-do items with a given prefix, keeping only the old tasks;
  2. Deletes each of the old tasks;
  3. Counts the number of failed deletions; and
  4. Sends an error report if there are too many failures.

Notice how we never mention pagination in that list. The details of how the search API works are not critical to our cleanup script. Hence, we can hide that messy detail away inside a function.

Fetching search results

Let’s begin by hiding the fetching and pagination details away. We’ll create a function that fetches all the to-do items that match a given prefix. And we’ll make use of async generators to do it:

async function* fetchAllToDosWithPrefix(prefix) {
    let finished = false;
    let startAt = 0;
    while (!finished) {
        const response = await searchForTodoItems(prefix, startAt);
        for (let item of response.values) {
            yield item;
        }
        finished = response.isLast;
        startAt += response.values.length;
    }
}

Note how we define our function using both the async and function* keywords. This combination tells us that the function creates an AsyncGenerator. Inside the function, we keep fetching until the API tells us we’ve reached the last page. Every time we get a chunk of data, we run through and yield each item.
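
To try this out without a real API, here’s a minimal sketch. The searchForTodoItems() below is a hypothetical in-memory stand-in (the data and page size are assumptions for illustration, not the real search API). It returns the same `{values, isLast}` shape the generator relies on:

```javascript
// Hypothetical in-memory stand-in for the real search API (an assumption
// for illustration). It serves matching items in pages of two.
const FAKE_ITEMS = [
    { id: 1, name: 'TEST-a' },
    { id: 2, name: 'TEST-b' },
    { id: 3, name: 'TEST-c' },
];
const PAGE_SIZE = 2;

const searchForTodoItems = async (prefix, startAt) => {
    const matches = FAKE_ITEMS.filter(({ name }) => name.startsWith(prefix));
    const values = matches.slice(startAt, startAt + PAGE_SIZE);
    return { values, isLast: startAt + values.length >= matches.length };
};

// The same generator as above.
async function* fetchAllToDosWithPrefix(prefix) {
    let finished = false;
    let startAt = 0;
    while (!finished) {
        const response = await searchForTodoItems(prefix, startAt);
        for (const item of response.values) {
            yield item;
        }
        finished = response.isLast;
        startAt += response.values.length;
    }
}

// The consumer sees one flat stream of items; pagination stays hidden.
const collectIds = async () => {
    const ids = [];
    for await (const item of fetchAllToDosWithPrefix('TEST')) {
        ids.push(item.id);
    }
    return ids;
};

collectIds().then((ids) => console.log(ids)); // logs [ 1, 2, 3 ]
```

The three items arrive across two pages, but the calling code never needs to know that.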

In theory, async generators aren’t necessary to make this work. We could, instead, write a function that collects all the results into a big array and returns it as a promise. It might work something like this:

// Bad example. Don't do this.
async function badFetchAllToDosWithPrefix(prefix) {
    let finished = false;
    let startAt = 0;
    let values = [];
    while (!finished) {
        const response = await searchForTodoItems(prefix, startAt);
        values = values.concat(response.values);
        finished = response.isLast;
        startAt += response.values.length;
    }
    return values;
}

This approach has drawbacks, though. First, if we have a significant number of to-do items, it may consume a significant amount of memory. Second, there’s no reason to wait until we’ve got all the results before we start processing them. Building up an array of all the results slows us down.

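Using async generators, though, solves both these problems. We yield results as soon as they come in, so we can start processing the first page of to-do items before any later page has been fetched. And because the generator only requests the next page when the consumer asks for more items, we never need to hold more than one page in memory at a time.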
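
One way to see that the generator only fetches on demand is to record which pages get requested. In this sketch, fakeSearch() and its four-item data set are made up for illustration. If the consumer stops after the first item, the second page is never requested:

```javascript
// Hypothetical page source (an assumption for illustration) that records
// the offset of every page request it receives.
const requestedOffsets = [];
const fakeSearch = async (startAt) => {
    requestedOffsets.push(startAt);
    const all = ['a', 'b', 'c', 'd'];
    const values = all.slice(startAt, startAt + 2);
    return { values, isLast: startAt + values.length >= all.length };
};

// Same paging loop as fetchAllToDosWithPrefix(), minus the prefix.
async function* fetchAll() {
    let finished = false;
    let startAt = 0;
    while (!finished) {
        const response = await fakeSearch(startAt);
        for (const item of response.values) {
            yield item;
        }
        finished = response.isLast;
        startAt += response.values.length;
    }
}

// Take a single item, then stop. Returning from inside for-await-of
// also closes the generator, so it never resumes.
const takeFirst = async (items) => {
    for await (const item of items) {
        return item;
    }
    return undefined;
};

takeFirst(fetchAll()).then((first) => {
    console.log(first);            // logs a
    console.log(requestedOffsets); // logs [ 0 ]
});
```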

Filtering

Once we have our to-do items coming in, we want to keep only the old ones. This sounds a lot like the kind of task we’d use .filter() for if we had an array. Let’s see if we can make a filter function that does the same sort of thing for async generators. It might look something like this:

const filter = (predicate) => async function*(items) {
    for await (let item of items) {
        if (predicate(item)) {
            yield item;
        }
    }
}

Notice how we have a for-await-of loop, rather than a regular for-of loop? This construct allows us to iterate over an async iterable. (And async generators implement the AsyncIterable protocol.) Inside the loop, we only yield the items for which our predicate function returns true.
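
Here’s a small sketch of filter() at work on something simpler than to-do items. The countTo(), isEven() and collect() helpers are hypothetical, introduced only for this example:

```javascript
const filter = (predicate) => async function* (items) {
    for await (const item of items) {
        if (predicate(item)) {
            yield item;
        }
    }
};

// Hypothetical source of numbers, standing in for the stream of to-do items.
async function* countTo(n) {
    for (let i = 1; i <= n; i++) {
        yield i;
    }
}

// Hypothetical helper that drains an async iterable into an array,
// purely so we can inspect the output.
const collect = async (items) => {
    const out = [];
    for await (const item of items) out.push(item);
    return out;
};

const isEven = (n) => n % 2 === 0;
const onlyEvens = filter(isEven);

collect(onlyEvens(countTo(6))).then((evens) => console.log(evens)); // logs [ 2, 4, 6 ]
```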

For our scenario, the predicate function should tell us if the to-do item is older than ten minutes. It might look like so:

const isOlderThan10Mins = (now) => ({created}) => (now - created > TEN_MINS);

We pass now as a parameter rather than calling Date.now() every time the function runs. This keeps our function pure. Keeping the function pure helps with testing and avoids certain race conditions.
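
Because the predicate takes now as an argument, we can check it against a fixed timestamp. In this sketch, fixedNow is an arbitrary made-up value:

```javascript
const TEN_MINS = 10 * 60 * 1000;
const isOlderThan10Mins = (now) => ({ created }) => (now - created > TEN_MINS);

// An arbitrary, made-up timestamp stands in for Date.now(), so the
// results never depend on when the check happens to run.
const fixedNow = 1700000000000;
const isOld = isOlderThan10Mins(fixedNow);

console.log(isOld({ created: fixedNow - TEN_MINS - 1 })); // logs true
console.log(isOld({ created: fixedNow - TEN_MINS }));     // logs false (exactly ten minutes old)
console.log(isOld({ created: fixedNow }));                // logs false
```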

Putting those together, we get a function that expects an async generator of to-do items as input. And it yields only those older than ten minutes:

const keepOldTodoItems = filter(isOlderThan10Mins(now));

Deleting

Once we’ve filtered out the newer to-do items, the next step is to delete the remaining ones. We do that by making another API call that returns a promise. But, if we can, we’d like to stay with our async generator abstraction. We need a way to handle this promise-based API call that lets us do that.

One option is to convert our promise into a single-value async generator. It’s analogous to how we’d convert a plain value into a single value array. For example, with arrays, we can write:

const toSingleValueArray = (val) => [val];

To convert a promise to an async generator, we’d use something like:

const fromPromise = async function*(val) { yield val; }

Using this, we can convert our delete call into an async generator:

const deleteAndYield = ({id}) => {
    return fromPromise(deleteTodoItem(id));
};
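
It’s worth noting why fromPromise() works: inside an async generator, yield awaits its operand, so the consumer receives the resolved value rather than the promise itself. A quick sketch, where fakeDelete() is a hypothetical stand-in for deleteTodoItem():

```javascript
const fromPromise = async function* (val) { yield val; };

// Hypothetical stand-in for deleteTodoItem() (an assumption for
// illustration). It resolves after a short delay.
const fakeDelete = (id) => new Promise(
    (resolve) => setTimeout(() => resolve({ success: true, id }), 10)
);

// Hypothetical helper that drains an async iterable into an array.
const collect = async (items) => {
    const out = [];
    for await (const item of items) out.push(item);
    return out;
};

// The generator yields exactly one value: the promise's resolved result.
collect(fromPromise(fakeDelete(42)))
    .then((results) => console.log(results)); // logs [ { success: true, id: 42 } ]
```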

Now that we have our async generator, the question is: What do we do with it? We have a bunch of to-do items, and we want to process them one by one. That sounds a bit like a map() operation. But a plain mapping approach won’t work. If we return an async generator in a map(), we get async generators inside async generators. What we want is a function that flattens as it maps, so we don’t end up with nesting. Something comparable to .flatMap() for arrays. We can make something like that using the handy yield* keyword:

const flatMap = (func) => {
    return async function*(items) {
        for await (let item of items) {
            yield* func(item);
        }
    };
};

That yield* keyword lets us take an AsyncIterable and yield each value inside it. In other words, it does the flattening for us.
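
To make the flattening visible, here’s a sketch where each input produces two outputs. The fromArray(), withTenfold() and collect() helpers are hypothetical, made up for this example:

```javascript
const flatMap = (func) => async function* (items) {
    for await (const item of items) {
        yield* func(item);
    }
};

// Hypothetical helper: turns an array into an async generator.
async function* fromArray(values) {
    for (const value of values) {
        yield value;
    }
}

// Hypothetical mapping function: each input yields two outputs.
const withTenfold = async function* (x) {
    yield x;
    yield x * 10;
};

// Hypothetical helper that drains an async iterable into an array.
const collect = async (items) => {
    const out = [];
    for await (const item of items) out.push(item);
    return out;
};

// The result is one flat stream, not a stream of generators.
collect(flatMap(withTenfold)(fromArray([1, 2, 3])))
    .then((values) => console.log(values)); // logs [ 1, 10, 2, 20, 3, 30 ]
```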

We’re now ready to combine our deleteAndYield() function with flatMap(). The result is a function that takes a collection of to-do items and deletes them one by one:

const deleteTodoItems = flatMap(deleteAndYield);

Counting failures

We now have code that will delete the old to-do items. Once we’ve done that, though, we want to count how many of the deletion calls failed. To do that, we first want to change our deleteAndYield() call a little. Our deletion API returns a success flag, along with a message. But if the underlying fetch() call fails, it will throw an error. We would like to catch that and convert it, should that happen. It might look something like this:

const deleteAndYield = ({id, name}) => fromPromise(
    deleteTodoItem(id)
        .then(({success, message}) => ({ success, name, message}))
        .catch((error) => ({ success: false, name, error: error.message}))
);

With this change, we can capture any errors and ensure that the success property is set to false. And we then pass that along with the appropriate error message.

Once we have that in place, we still need to do some counting. As mentioned earlier, we want to avoid building large arrays to store all the results. That might consume excessive memory. Instead, we can maintain a running count of failures as they occur. For this, we want a function analogous to .reduce() for arrays. It might work like so:

const reduceToPromise = (reduceFn, init) => async (items) => {
    let acc = init;
    for await (let item of items) acc = reduceFn(acc, item);
    return acc;
}

Unlike the array version of .reduce(), we can’t reduce down to any old value. This is because we’re working with async code. Once we enter the realm of Promises, we can’t leave. Hence, we call our function reduceToPromise() to reflect this.

We can use our shiny new reduceToPromise() function like so:

const countAllFailures = reduceToPromise(
    (tally, {success}) => (!success) ? tally + 1 : tally,
    0
);

Here, countAllFailures() uses reduceToPromise(). It passes a reducer function that increments the tally for each failed delete operation. (That is, when success is false.) The initial value for the tally is set to 0.

With countAllFailures(), we can track the number of failed delete calls. And do so without storing an excessive number of to-do items in memory.
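
As a quick check, we can feed countAllFailures() a made-up stream of deletion results. The fakeResults() generator and its contents are assumptions for illustration:

```javascript
const reduceToPromise = (reduceFn, init) => async (items) => {
    let acc = init;
    for await (const item of items) acc = reduceFn(acc, item);
    return acc;
};

const countAllFailures = reduceToPromise(
    (tally, { success }) => (!success) ? tally + 1 : tally,
    0
);

// Made-up deletion results, standing in for the output of the delete step.
async function* fakeResults() {
    yield { success: true, name: 'TEST-a' };
    yield { success: false, name: 'TEST-b', error: 'timeout' };
    yield { success: false, name: 'TEST-c', error: 'not found' };
}

const pending = countAllFailures(fakeResults());
console.log(pending instanceof Promise);           // logs true: the tally arrives as a promise
pending.then((failures) => console.log(failures)); // logs 2
```

Only the running tally is kept between items, so memory use stays constant however many results pass through.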

Putting it all together

We now have all the pieces necessary to put our whole script together. One way to do that would be with function composition. But the nested brackets and whatnot might get a little confusing. Instead, we’ll make use of the pipe() function I’ve written about elsewhere. Putting all the pieces together might look like so:

const TEN_MINS = 10 * 60 * 1000;
const FAILURE_THRESHOLD = 3;
const now = Date.now();

const cleanUpOldToDoItems = async () => {
    const failureCount = await pipe(
        fetchAllToDosWithPrefix('TEST'),
        filter(isOlderThan10Mins(now)),
        flatMap(deleteAndYield),
        reduceToPromise((tally, {success}) => (!success) ? tally + 1 : tally, 0),
    );
    if (failureCount >= FAILURE_THRESHOLD) {
        reportError(
            `Failure: More than ${FAILURE_THRESHOLD} failures while deleting old to-do items.`
        );
    }
}
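
The pipe() helper isn’t defined in this article. As a minimal sketch, assuming it takes a starting value followed by any number of functions and feeds the result of each into the next, it might look like the following. The fromArray() helper and the even-counting pipeline are hypothetical, there only to exercise it:

```javascript
// Assumed signature: pipe(initialValue, fn1, fn2, ...). This is a sketch
// under that assumption, not necessarily the implementation referred to above.
const pipe = (initial, ...fns) => fns.reduce((value, fn) => fn(value), initial);

const filter = (predicate) => async function* (items) {
    for await (const item of items) {
        if (predicate(item)) yield item;
    }
};

const reduceToPromise = (reduceFn, init) => async (items) => {
    let acc = init;
    for await (const item of items) acc = reduceFn(acc, item);
    return acc;
};

// Hypothetical helper: turns an array into an async generator.
async function* fromArray(values) {
    for (const value of values) {
        yield value;
    }
}

// Each stage receives the async iterable produced by the stage before it.
// The final stage turns the stream into a promise for a single value.
const countEvens = () => pipe(
    fromArray([1, 2, 3, 4, 5, 6]),
    filter((n) => n % 2 === 0),
    reduceToPromise((tally) => tally + 1, 0),
);

countEvens().then((count) => console.log(count)); // logs 3
```

Nothing in pipe() itself is asynchronous. It just hands each stage’s return value to the next function, which is why the same helper works for generators and for the promise at the end.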

Looking at this, someone might object. The code we’ve written is longer (if we include all the sub-functions). In the first version, you could see everything happening at once. Here, we’ve hidden the details away. Suppose you want to know how the fetching happens. To find the details, you have to dig into fetchAllToDosWithPrefix(). If lines of code were our sole measure of quality, this code would be objectively worse.

Part of the problem is that we had to write our own utility functions. If the async iterator helpers proposal goes ahead, though, we won’t have to do that. Instead, we’ll be able to call .filter(), .flatMap() and .reduce() directly on the async generator, much like we do with arrays. It might look like so:

const TEN_MINS = 10 * 60 * 1000;
const FAILURE_THRESHOLD = 3;
const now = Date.now();

const cleanUpOldToDoItems = async () => {
    const failureCount = await fetchAllToDosWithPrefix('TEST')
        .filter(isOlderThan10Mins(now))
        .flatMap(deleteAndYield)
        .reduce((tally, {success}) => (!success) ? tally + 1 : tally, 0);
    if (failureCount >= FAILURE_THRESHOLD) {
        reportError(
            `Failure: More than ${FAILURE_THRESHOLD} failures while deleting old to-do items.`
        );
    }
}

Even without async iterator helpers, this refactor is still an improvement. Our code now tells a story as we read it from top to bottom. It makes what it’s doing clearer by hiding some details away in separate functions. What’s more, the way we’ve broken it down, each function deals with a specific concern. The logic for deleting an item is no longer mixed up with the logic for searching a paginated search API. It’s easier to read, easier to test, and easier to change.