I’m working on a Node.js project where I need to process a huge JSON file using streams and store data in a MariaDB database. I have a lookup table with about 50 unique entries that I check against frequently (millions of times) during processing.
To avoid hitting the database every time, I’m using a JavaScript Set to cache these values. When I find a new value that’s not in the Set, I want to insert it into the database and then add it to the Set.
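In other words, the behaviour I want per value is roughly this (just a sketch of the intent, with a made-up insert helper, not my actual code):

if (!cache.has(value)) {
  await insertCategory(value); // hypothetical helper that performs the INSERT
  cache.add(value);            // only cache it once the row definitely exists
}
// ...and only then carry on with records that reference this value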
The issue I’m running into is that sometimes the same value gets processed twice before the first database operation completes. This causes duplicate key errors when trying to insert the same record multiple times.
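As far as I can tell, the interleaving looks something like this (a hypothetical trace of what I think is happening, where itemA and itemB are two records that share the same brand-new category):

checkCategory(itemA, conn, cache); // cache miss -> INSERT starts, not awaited
checkCategory(itemB, conn, cache); // INSERT hasn't resolved yet, cache is still
                                   // empty -> a second INSERT starts
// first INSERT commits, cache.add(category) runs
// second INSERT fails with ER_DUP_ENTRY (duplicate key)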
I’ve tried using async/await, but I think something in my streaming setup is breaking the sequential flow I expect from await. Here’s my current approach:
import mariadb from 'mariadb';

const dbPool = mariadb.createPool({
  host: 'localhost',
  port: 3306,
  user: 'myuser',
  password: 'mypass'
});
export async function checkCategory(item, connection, cache) {
  let category = item.category || "Default";
  if (category && !cache.has(category)) {
    // This is where the race condition happens
    db.addCategory(category, connection)
      .then(response => {
        cache.add(category);
      });
  }
  return category;
}

export async function addCategory(categoryName, connection) {
  try {
    await connection.query(sql.categoryInsert, categoryName);
  } catch (error) {
    console.log(error);
  }
}
The main processing loop looks like this:
import fs from 'fs';
import JSONStream from 'JSONStream';
// helpers is the module with checkCategory/addCategory shown above;
// dataFile and conn (the DB connection I pass to the helpers) are set up earlier.

let categoryCache = new Set();

const fileStream = fs.createReadStream(dataFile, 'utf8');
const jsonParser = JSONStream.parse('*');

fileStream.pipe(jsonParser)
  .on('data', async (record) => {
    let recordData = [
      record.id,
      record.name,
      record.value
    ];
    let itemMap = new Map();
    for (let item of record.items) {
      let itemCategory = await helpers.checkCategory(item, conn, categoryCache);
      if (!itemMap.has(itemCategory)) {
        itemMap.set(itemCategory, 0);
      }
      itemMap.set(itemCategory, itemMap.get(itemCategory) + 1);
    }
  })
  .on('end', () => {
    dbPool.end();
  });
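One idea I’ve been toying with is caching the pending insert as a promise in a Map, so that a second lookup for the same category awaits the in-flight INSERT instead of firing another one. Something like this sketch (checkCategorySafe and pendingCategories are names I made up, and I haven’t verified it against my stream setup):

const pendingCategories = new Map(); // category -> promise of the in-flight INSERT

export async function checkCategorySafe(item, connection, cache) {
  const category = item.category || "Default";
  if (cache.has(category)) return category;

  if (!pendingCategories.has(category)) {
    // First caller for this category starts the INSERT and records the promise
    const inserting = addCategory(category, connection)
      .then(() => cache.add(category))
      .finally(() => pendingCategories.delete(category));
    pendingCategories.set(category, inserting);
  }

  // Every caller (including the first) waits for the same INSERT to finish
  await pendingCategories.get(category);
  return category;
}

But I’m not sure that’s the right direction, or whether the real problem is in how the stream handler interacts with await.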
I’m wondering whether the streaming nature of the processing is keeping async/await from working the way I expect. Do I need a different approach or tool to make these cache checks and inserts run one after another?