
Five Go Concurrency Patterns, every Go Developer must know - Pipeline
With an example implementation of E-Commerce Order Processing
This is a part of a multi-part series on Go Concurrency Patterns:
Go Concurrency Patterns - Pipeline
The Pipeline concurrency pattern is a design paradigm where:
Data flows through multiple stages, each performing a specific task.
Each stage processes input, transforms it, and passes the result to the next stage.
Stages run concurrently, improving throughput.
Scenario: E-Commerce Order Processing
In an e-commerce platform, consider the following workflow for processing customer orders:
Order Validation: Ensure all necessary details are presentk and are valid.
Inventory Check: Verify the stock availability of the ordered products.
Payment Processing: Process the payment securely with a payment gateway
Shipment Preparation: Prepare the items for shipping.
Problem
Processing above tasks sequentially can be slow, especially during peak times.
High concurrency with multiple independent goroutines can make error handling and task management complex.
Solution
So, we use the Pipeline concurrency pattern:
Each stage is responsible for a specific step in the order processing workflow.
Data (orders) flows from one stage to the next via channels.
Each stage operates concurrently, improving efficiency and scalability.
Image explaining four stages and each stage manages separate input and output channel. There are four stages and five channels
Keypoint: Each stage subroutine processes an order independently/concurrently, while the rest of the stages work on its own functionality on its own order concurrently. Hence increasing throughput. However, due to sequential and queuing nature of channel ensures each order goes through the stages in the predefined order always, ie. stage1 →stage2→stage3→stage4. for example: when stage1 validates order4, stage3 performs payment for order3 and stage4 process shipping on order1 - all at once concurrently.
Important: Each stage runs in its own goroutine, allowing multiple orders to be processed simultaneously at different stages
Channels pass orders between stages, ensuring thread-safe communication.
Implementation:
Order : This object is a subset of full Order object as it does n’t have complete order detail such as order qty and price etc. this just has some order status to represent the status of the current processing stage such as Valid, Stocked, Paid, Shipped.
validateOrder : Its the first stage of the pipeline and performs: a). Takes in an input channel to read Order object from main function b). Spinup a new goroutine to run validation in a separate goroutine. c). Performs validation and marks order status as order.Valid=true or false d). publishes validated Order to an output channel (which will be sent to the next stage in the process).
checkInventory : Its the second stage of the pipeline and performs: a). Takes in an input channel (passed from previous stage) to read Order object from previous stage b). Spinup a new goroutine to run inventory checking in a separate goroutine. c). Performs stock inventory availability check and marks order status as order.Stocked=true or false d). publishes inventory checked Order to an output channel (which will be sent to the next stage in the process).
processPayment : Its the third stage of the pipeline and performs: a). Takes in an input channel (passed from previous stage) to read Order object from previous stage b). Spinup a new goroutine to process payment in a separate goroutine. c). Process payment perhaps via a separate payment gateway and marks order status as order.Paid=true or false d). publishes Payment done Order to an output channel (which will be sent to the next stage in the process).
prepareShipment : Its the fourth and final stage of the pipeline and performs: a). Takes in an input channel (passed from previous stage) to read Order object from previous stage b). Spinup a new goroutine to prepare for shipment in a separate goroutine. c). performs shipment and marks order status as order.Shipped=true or false d). publishes shipped Order to an output channel (which will be sent to the main function again to display the result).
main: main function mostly works as the orchestrator of the order processing. For now, it just dynamically generates five order objects and publishes on the `orders` channel which will be passed on to the first stage of the pipeline. Its handled in a separate goroutine so that it will happen on a separate goroutine without blocking main function flow.
In the next step it composes order processing pipeline stages.
Order pipeline is completely loosely coupled, should there be a need for another stage, it can be easily built and added at the pipeline without impacting any other stages or processing.
Each stage takes in a input channel from previous stage and creates an output channel and returns it only to pass on to next stage. That way whole pipeline is built.
Processes the result from the channel from final stage and displays the final order status to the user.
We use unbuffered channel throught the solution so that writing to and reading from the channel is not blocked improving more throughput while maintaining the order of `order` processing.