📚 ordered-concurrently - Awesome Go Library for Data Structures and Algorithms
Go module that processes work concurrently and returns output in a channel in the order of input.
🏷️ Data Structures and Algorithms
📂 Frameworks for performing ELT / ETL
⭐ 38 stars
Detailed Description of ordered-concurrently
Ordered Concurrently
A library for parallel processing with ordered output in Go. This module processes work concurrently / in parallel and returns output in a channel in the order of input. It is useful in concurrently / parallelly processing items in a queue, and get output in the order provided by the queue.
Usage
Get Module
go get github.com/tejzpr/ordered-concurrently/v3
Import Module in your source code
import concurrently "github.com/tejzpr/ordered-concurrently/v3"
Create a work function by implementing WorkFunction interface
// Create a type based on your input to the work function
type loadWorker int
// The work that needs to be performed
// The input type should implement the WorkFunction interface
func (w loadWorker) Run(ctx context.Context) interface{} {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
return w * 2
}
Demo
Run
Example - 1
func main() {
max := 10
inputChan := make(chan concurrently.WorkFunction)
ctx := context.Background()
output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
go func() {
for work := 0; work < max; work++ {
inputChan <- loadWorker(work)
}
close(inputChan)
}()
for out := range output {
log.Println(out.Value)
}
}
Example - 2 - Process unknown number of inputs
func main() {
inputChan := make(chan concurrently.WorkFunction, 10)
ctx := context.Background()
output := concurrently.Process(ctx, inputChan, &concurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
ticker := time.NewTicker(100 * time.Millisecond)
done := make(chan bool)
wg := &sync.WaitGroup{}
go func() {
input := 0
for {
select {
case <-done:
return
case <-ticker.C:
inputChan <- loadWorker(input)
wg.Add(1)
input++
default:
}
}
}()
var res []loadWorker
go func() {
for out := range output {
res = append(res, out.Value.(loadWorker))
wg.Done()
}
}()
time.Sleep(1600 * time.Millisecond)
ticker.Stop()
done <- true
close(inputChan)
wg.Wait()
// Check if output is sorted
isSorted := sort.SliceIsSorted(res, func(i, j int) bool {
return res[i] < res[j]
})
if !isSorted {
log.Println("output is not sorted")
}
}
Credits
- u/justinisrael for inputs on improving resource usage.
- mh-cbon for identifying potential deadlocks.