How to properly handle async operations when processing large JSON files with streaming in Node.js

I’m working on a Node.js project where I need to process a huge JSON file using streams and store data in a MariaDB database. I have a lookup table with about 50 unique entries that I check against frequently (millions of times) during processing.

To avoid hitting the database every time, I’m using a JavaScript Set to cache these values. When I find a new value that’s not in the Set, I want to insert it into the database and then add it to the Set.

The issue I’m running into is that sometimes the same value gets processed twice before the first database operation completes. This causes duplicate key errors when trying to insert the same record multiple times.

I’ve tried using async/await but I think something in my streaming setup is breaking the synchronous flow. Here’s my current approach:

import mariadb from 'mariadb';

const dbPool = mariadb.createPool({
    host: 'localhost',
    port: 3306,
    user: 'myuser',
    password: 'mypass'
});

export async function checkCategory(item, connection, cache) {
    let category = item.category || "Default";

    if (category && !cache.has(category)) {
        // This is where the race condition happens
        db.addCategory(category, connection)
            .then(response => {
                cache.add(category);
            });
    }

    return category;
}

export async function addCategory(categoryName, connection) {
    try {
        await connection.query(sql.categoryInsert, categoryName);
    } catch (error) {
        console.log(error);
    }
}

The main processing loop looks like this:

import fs from 'fs';
import JSONStream from 'JSONStream';

let categoryCache = new Set();
const fileStream = fs.createReadStream(dataFile, 'utf8');
const jsonParser = JSONStream.parse('*');

fileStream.pipe(jsonParser)
    .on('data', async (record) => {
        let recordData = [
            record.id,
            record.name,
            record.value
        ];

        let itemMap = new Map();

        for (let item of record.items) {
            let itemCategory = await helpers.checkCategory(item, conn, categoryCache);
            
            if (!itemMap.has(itemCategory)) {
                itemMap.set(itemCategory, 0);
            }
            itemMap.set(itemCategory, itemMap.get(itemCategory) + 1);
        }
    })
    .on('end', () => {
        dbPool.end();
    });

I’m wondering if the streaming nature of the processing is causing async/await to not work as expected. Maybe I need a different approach or tool for this kind of synchronous database operation?

you need to await your db calls. move cache.add(category) right after you start the insert - this prevents dupes while db is working. just be careful with async ops in loops!
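something like this (untested sketch, keeping the db.addCategory call from your post):

export async function checkCategory(item, connection, cache) {
    let category = item.category || "Default";

    if (!cache.has(category)) {
        // claim the category before the first await, so a second record
        // with the same category can't slip past the cache check
        cache.add(category);
        // if the insert fails you may want to cache.delete(category) again
        await db.addCategory(category, connection);
    }

    return category;
}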

You’re not awaiting the database operation in your checkCategory function - that’s creating a race condition. When multiple identical values hit before the first insert finishes, they all pass the cache.has() check at the same time.

I hit this exact problem in a similar pipeline. What fixed it was tracking pending operations. Use a Map to track ongoing database ops by their key, then check both your cache AND pending operations before starting a new insert.

Here’s the approach: keep a pendingInserts Map next to your cache Set. Before hitting the database, make sure the value isn’t in either collection. Add it to pendingInserts right when you start the operation, then move it to cache and remove from pendingInserts once the database call finishes.
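In code the pattern looks roughly like this (untested; pendingInserts is just an illustrative name, error handling omitted):

const pendingInserts = new Map();

export async function checkCategory(item, connection, cache) {
    let category = item.category || "Default";

    if (cache.has(category)) {
        return category;
    }

    if (pendingInserts.has(category)) {
        // an insert for this category is already in flight - just wait for it
        await pendingInserts.get(category);
        return category;
    }

    // start the insert and register it as pending before any await,
    // so later calls with the same category see it
    const insertPromise = db.addCategory(category, connection).then(() => {
        cache.add(category);
        pendingInserts.delete(category);
    });
    pendingInserts.set(category, insertPromise);

    await insertPromise;
    return category;
}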

This stops duplicate database calls without blocking your stream. The streaming isn’t breaking async/await - it’s just exposing a timing issue that was already lurking there.

The fire-and-forget pattern in your checkCategory function is causing this. You’re calling db.addCategory without awaiting it, so multiple identical categories slip through before any database operation finishes.

I’ve hit this exact issue processing large CSV imports. The best solution I found was a simple semaphore built from Promises: create a Map with category names as keys and their insert promises as values. Before hitting the database, make sure the category isn’t already being processed by checking your cache AND this pending operations Map. When you start a new insert, immediately store the promise in the Map and add the category to the cache right away (optimistic caching). Once the database operation finishes, remove the promise from the Map.

This stops the race conditions while keeping performance good, since you’re not blocking the stream unnecessarily. The streaming isn’t breaking anything - it’s just exposing the timing issue in your async flow.
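A rough sketch of that version (untested; insertInProgress is an illustrative name, and db.addCategory is the function from the question):

const insertInProgress = new Map();

export async function checkCategory(item, connection, cache) {
    const category = item.category || "Default";

    if (!cache.has(category) && !insertInProgress.has(category)) {
        // optimistic caching: treat the category as known before the insert resolves
        cache.add(category);

        const promise = db.addCategory(category, connection)
            .finally(() => insertInProgress.delete(category));
        insertInProgress.set(category, promise);

        await promise;
    }

    return category;
}

The optimistic cache.add is what actually closes the race window here; the Map mainly gives you a handle on in-flight inserts, for example so you can await them all before closing the pool.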
