Writing our own Concurrent Thread safe Queue

Watch the video explanation ➔

Write an engineering article with a concrete timeline in Markdown format from the following transcript.

queues are everywhere you can find them in message profiles event driven system thread pools just to name a few a 9q implementation is not thread safe at all which means when multiple threads adds and removes from a queue it may lead to incorrect results and words runtime exceptions in this video we start with understanding what are concurrent cues why we need them and then we write our own concurrent threads at Q in goli to get a deeper understanding of it and then we touch upon some real world applications as well so here I'm using a cloud-based IDE called replit to code this out because it is cloud-based I do not have to set up anything locally and I just need a browser to get started replit has an amazing llm assistant named Ghostwriter which is extremely powerful because it is vertically trained on programming and software development knowledge base and the best part is it spits out answers and suggestions in the context of the code that you are writing apart from this replete has something called as bounties which can help you make some money on the site as a bounty companies post paid projects on web applications AI tools Discord Bots and so much more you can pick the one that interests you complete it and get paid just sign up on replied with my link in the description down below once you sign up on the home page itself you can see a section called bounties when you click on export you can see the projects that are listed there and the amount of money you can make out of that pick the one that interests you apply for it and just get started so the link for the sign up is in the description down below go through that sign up for my link and this to be honest is a great way to make some money on the site doing what you love so do check that out so now let's head back to the video so concurrent thread safe queues allows multiple threads to read and write data concurrently from it a queue typically has two operations NQ and DQ and NQ operations look something like this where you do index plus plus and then you put then then you in the array Q of index you set the element that you are trying to insert same in case of DQ operation you are picking the element from the index and then doing index minus minus it's a pretty standard array based implementation but if you look closely both of these operations index plus plus index minus minus and Q of index is equal to e are not creative at all so overall an IQ implementation is not thread safe we would write a very simple golang code to see this in action that why it is not threads if what happens what is happening behind the scenes and then we change our implementation and make it concurrent thread safe implementation using pessimistic login so let's jump into the code straight away we start with defining a straw or card concurrent concurrent Cube and in which we would have an element Q not a channel we would just have an element Q of type in 32 right and then what we do is we'll Define a function called enqueue in which we are taking an element and putting it over here uh we'll take name it as item of type in 32 and what we'll do is we do Q dot Q equal to open q dot Q of item right basically what our up and operate our NK operation would look like any operation is adding something at the end of the queue it's same as append operation which is what we are doing and then we Define our DQ operation and our DQ operation would look something like this in which a DQ operation by the way our DQ operation would be what picking the first element and throwing it out and effectively it means that a q is same as all the elements except the first element right so our DK operation would be Q dot Q is equal to Q dot Q of one colon right which means all the element except the first element I set it as my new queue but before that I would have to pick my item to be C dot Q of 0 and then return that item right and my DQ operation returns in 32 and it does not take any argument or say and I do this but here there is an edge case what if my queue is empty then Q dot Q of 0 will give me an error so I still need to check c q dot Q is equal to 0 then we raise a panic and we say cannot BQ from an empty shoe right now to test this looks like a pretty simple queue that we implemented we'll just Define one more function called size which just Returns the length of the cube that's it right and in the main function what we do we Define q1 is equal to a new instance of concurrent Q in which we would initialize our Q attribute to an in32 of size 0. and then we do q1 dot NQ I put 1 and then I put 2 I put 3 and then we do three DQ operations print Ln q1 dot DQ we do q1 dot DQ again q1 dot DQ again and let's read let's say we do it the fourth time we have to import fmt package over here life I do this I built a simple queue did three and Q operations did four DQ operations for the fourth DQ operations the Q length will be empty so we should see an exception but before that we should see one two three getting printed because we inserted one two three DQ one two three and then we should see an exception let's see if we see that we see one two and three cannot dig you from an empty queue correct so we just built a very simple queue in goli okay now what do we do we would want to see that it is not thread safe which means let's spin up multiple threads to do it so far I is equal to 0 I is less than let's say I spin up one million thread so one two three four five six I forgot one like this and I plus plus and I move my NQ operation inside the for Loop and I say q1 dot n q and I pass in my rank function and I pass it in 31. so any NQ any random value in that and we remove all other statements for now right now this means that we are inserting one million elements inside my cube right one million elements in the queue now this is a synchronous code Perfect by input run just find out in queue all of those things and my process 30 minutes right okay now let's do it in a thread so I'll spin up I write an anonymous function to do the same operation in through a go routine right and ideally we should be waiting for our go routine to complete so I'll Define a where WG weight group WG for encoding animators wge every time I spin up a new thread I do wge Dot add one and then wge Dot done over here so through this we are waiting for all the go routines to complete before we print the size of the queue as I find P dot println q1 dot size and so what we are doing is we are spinning of 1 million go routines all enqueuing into the queue and then waiting for Threads to complete and we are printing q1 dot size so what would the q1 dot size be it should be 1 million let's run and see if we get 1 million Dot when we run we do not see one million we see 958 000 what's happening here we see that the operation through which we are appending into the end of the queue is not thread safe why because it is happening that two threads got like two threads are literally writing at the same index inside the uh slice inside the array that we have because of which the final size that we would ideally expect that because we are running 1 million threads all doing NQ once we are waiting for all threads to go oh wait wait w g dot wait now we are waiting for all threads to complete and then let's run w g e dot weight all threads to complete now we run and see the size of it now let's see what happens we see 977050 it's still not one million we expect 1 million to be there but it's not 1 million it's 976 000 something something right here we see that my append operation is not thread safe right this is the problem my queue the queue that we built a knife Q is not thread sir because as soon as multiple threads try to enqueue from that queue a lot of them wrote at the exact same location they were not added after that so we need to make it thread safe how do we do it right the way we do it is by using mutexes the key root cause the root cause of this problem is that this operation is not Atomic in nature how do we make it Atomic if it is not Atomic in nature what would we do we would wrap it in a mutex this way what would happen is only one thread would be allowed to execute this one line at and even though you have 1 million threads but only one thread would be allowed to do this so what we do is we Define a mutex sync Dot mutex and before we do n Cube we do mu Dot Lock so but Q dot mu DOT log and then before my NQ exits I do defer Q dot mu dot unlock okay for folks who don't know golang defer function or whatever you write after defer will be executed when your function exits so you don't have to write it all places like it this code is equivalent to US writing Q dot mu DOT log over here right but I'm just because it's going I'm just writing default space Q dot mu dot unlock so it would always happen at the exit of my function it would release the lock which is what we need right so it is same as writing that mu dot unlock afterwards now when I run this let's see what happens now with this implementation we would see it's exactly one million it's exactly 1 million that no matter how many times I run I would see that my Q my NQ operation is happening exactly 1 million times and exactly 1 million elements are getting added at the end of the queue I should also add my Q my Q unlock and defer Q dot mu dot unlock over here so that in case someone invoke size in the middle of the operation not at the end it would still show the consistent value right so we are acquiring a lot so what we are doing is we are making any thread to acquire just to one like like no matter how many threads we have you would wait like all n minus one thread would wait one one thread is executing the critical section that's what we are doing right similarly if I add one more thing or let's say 1 million NK operations we did let's say we do one million DQ operations on this NQ and DQ and I'm dqing 1 million values and I'll have another weight group for all the DQ operations separately and w g e w g d and wge Dot weight we do w g d dot weight and then I print the size and I just need to Define wgd div stands for DQ FYI so w g t w g e and now what we should see is okay well I have to make this this thing uh C dot mu dot nice I Q dot mu Dot Lock and defer Q dot mu dot unlock right now if I run and we see we should see the size to be zero no matter what because we are doing one million in key operations one will indicate operations the size eventually becomes zero right this is how we make we make our Q thread safe we wrapped what we just did is we just wrapped our NQ and DQ operations in the mutex that we have now we can have as many instances of current of basically concurrent queue no matter how many threads are trying to write into this queue there would be no quote unquote race condition per se no inconsistencies no incorrectness nothing this is how we have to write concurrent queue this is huge for a normal real world application why because in real world you do not have just one thread doing all the things you have multiple threads interacting with this queue which is where again as I always say writing multi-thread programs is easy but writing correct multi-threaded program is really difficult you have to think of it every single step what would be happening behind the scenes so that you know that the output that you would expect you are exactly getting that it's correct right correctness ensuring correctness is one of the most essential things in writing multi-threaded programs and this is how you write a concurrent queue the core idea of that is we wrap the enqueue and the DQ operation with explicit Mutual exclusive explicit mutexes this means at any given point in time only one thread would be doing any operation on the Queue making it thread safe so we are actually serializing the parallel execution that we have right again it would bring down the performance of your code but correctness is more important than performance right so you have to ensure that if user says I have inserted 1 million items you should see 1 million items in the queue we have n short time right now before we wrap up let me speak about few very interesting things where concurrent cues play a very important role first of all advantages first of all it's trade safe we see why thread safety is important and how we are ensuring that second is scalability because now it improves like it allows multiple things or multiple uh threads to run simultaneously and interact with this queue and ensuring it is correct right it's really important so whenever you are writing multi-threaded program and if there is a queue shared between them ensure that you are picking up concurrent queue you don't have to implement it every time depending on the language that you are using you might find some of the other implementations of it already there but where does it lack synchronization over it we saw one million threads trying to end you from that but only one is allowed to enqueue everyone else is just waiting for that right so there is a huge synchronization over it because of our pessimistic implementation there is a huge amount of wait time because the threads is waiting to be Moving on but you are not allowing it to do that because of the mutex that you used right so it degrades the performance but you get correctness out of your code now few real world applications for that one of the most fascinating thing is thread pools in case I'm not aware they're just a Google search away internal implementation of a threat thread tool all uses a blocking concurrent Cube because multiple concurrents running in parallel multiple threads trying to put into the thread pull get from that pretty interesting application of that and then it is heavy it is very heavily used in doing batch processing so for example you have multiple threads trying to do something they are synchronizing through this queue you put a lot of data into this queue so that you can do a batch right into a backend database for example let's say there are multi or let's say there are multiple threads which are trying to scrape a particular website they would all put the scraped web pages or the script data into this shared queue so that one of the worker or few of the workers can pick a batch of items and write it into the database now here you need parallelism you need multiple threads to be scraping the website so that it scrapes faster you take it put it into the queue and then few set of threads they are extracting from this n items at a time and writing it into the database very real world applications of concurrent cues where if it was not concurrent then you would not get correct results why because imagine you scrubbed 1 million pages but as we just saw in the example without logs without making a thread so if you could say 950 950k only so you are losing 50k Pages if if I may just translate this example as this right so concurrent queue plays a very important role into writing a into writing correct multi-threaded programs that requires queue so now you know how to write concurrent cues how to write your own from scratch and this is exactly what I wanted to cover in this one so yeah that's what that's it for this one I hope you found it interesting hope you found it amusing I really want all of you to implement concurrent cues in your favorite programming language CEC plus plus Java python pick your favorite language but implement it make it thread safe it's fascinating the world of multi-threaded programming is really fascinating really encourage you to implement one so yeah that's it for this one I'll see in the next one time Satan [Music] [Music] thank you foreign [Music]

Here's the video ⤵

Courses I teach

Alongside my daily work, I also teach some highly practical courses, with a no-fluff no-nonsense approach, that are designed to spark engineering curiosity and help you ace your career.

System Design Masterclass

A no-fluff masterclass that helps SDE-2, SDE-3, and above form the right intuition to design and implement highly scalable, fault-tolerant, extensible, and available systems.

Details →

System Design for Beginners

An in-depth and self-paced course for absolute beginners to become great at designing and implementing scalable, available, and extensible systems.

Details →

Redis Internals

A self-paced and hands-on course covering Redis internals - data structures, algorithms, and some core features by re-implementing them in Go.

Details →

Writings and Learnings

Knowledge Base



Arpit's Newsletter read by 100,000 engineers

Weekly essays on real-world system design, distributed systems, or a deep dive into some super-clever algorithm.