panyam/gocurrent
Go concurrency primitives library implementing Reader, Writer, Mapper, Reducer, FanIn, FanOut patterns with full type safety and resource management.
gocurrent
A Go library providing utilities for common concurrency patterns with customizable behavior. This package implements several concurrency primitives inspired by Rob Pike's concurrency patterns from his talk "Go Concurrency Patterns".
Installation
go get github.com/panyam/gocurrentQuick Start
import "github.com/panyam/gocurrent"
// Create a reader that generates numbers
reader := gocurrent.NewReader(func() (int, error) {
return rand.Intn(100), nil
})
defer reader.Stop()
// Read from the channel
for msg := range reader.RecvChan() {
if msg.Error != nil {
log.Printf("Error: %v", msg.Error)
continue
}
fmt.Printf("Received: %d\n", msg.Value)
}Components
Reader
A goroutine wrapper that continuously calls a reader function and sends results to a channel.
// Create a reader that reads from a data source
reader := gocurrent.NewReader(func() (string, error) {
// Your data reading logic here
return "data", nil
})
defer reader.Stop()
// Monitor for reader completion or errors
go func() {
select {
case err := <-reader.ClosedChan():
if err != nil {
log.Printf("Reader terminated with error: %v", err)
} else {
log.Println("Reader completed successfully")
}
}
}()
// Process messages
for msg := range reader.RecvChan() {
if msg.Error != nil {
log.Printf("Read error: %v", msg.Error)
break
}
fmt.Printf("Read: %s\n", msg.Value)
}Writer
A goroutine for serializing writes using a writer callback method.
// Create a writer that processes data
writer := gocurrent.NewWriter(func(data string) error {
// Your data writing logic here
fmt.Printf("Writing: %s\n", data)
return nil
})
defer writer.Stop()
// Monitor for writer completion or errors
go func() {
select {
case err := <-writer.ClosedChan():
if err != nil {
log.Printf("Writer terminated with error: %v", err)
} else {
log.Println("Writer completed successfully")
}
}
}()
// Send data to writer
writer.Send("Hello")
writer.Send("World")Mapper
Transform and/or filter data between channels.
inputChan := make(chan int, 10)
outputChan := make(chan string, 10)
// Create a mapper that converts integers to strings
mapper := gocurrent.NewMapper(inputChan, outputChan, func(i int) (string, bool, bool) {
// Return: (output, skip, stop)
return fmt.Sprintf("Number: %d", i), false, false
})
defer mapper.Stop()
// Send data
inputChan <- 42
inputChan <- 100
// Read transformed data
result := <-outputChan // "Number: 42"Reducer
Collect and reduce values from an input channel with configurable time windows. The Reducer has three type parameters:
T- the input event typeC- the intermediate collection type (where events are batched)U- the output type after reduction
Reducers use functional options for configuration:
// Simple case: Use NewIDReducer to collect events into a slice
// With all defaults (creates its own channels, 100ms flush period)
reducer := gocurrent.NewIDReducer[int]()
defer reducer.Stop()
// With custom configuration
inputChan := make(chan int, 10)
outputChan := make(chan []int, 10)
reducer := gocurrent.NewIDReducer[int](
gocurrent.WithInputChan[int, []int, []int](inputChan),
gocurrent.WithOutputChan[int, []int, []int](outputChan),
gocurrent.WithFlushPeriod[int, []int, []int](100 * time.Millisecond))
defer reducer.Stop()
// Send data
for i := 0; i < 5; i++ {
reducer.Send(i)
}
// After FlushPeriod, receive the collected batch
batch := <-outputChan // []int{0, 1, 2, 3, 4}For custom collection and reduction logic, use NewReducer directly:
// Custom reducer: collect strings into a map, reduce to a summary
type WordCount map[string]int
inputChan := make(chan string, 10)
outputChan := make(chan string, 10)
reducer := gocurrent.NewReducer[string, WordCount, string](
gocurrent.WithInputChan[string, WordCount, string](inputChan),
gocurrent.WithOutputChan[string, WordCount, string](outputChan),
gocurrent.WithFlushPeriod[string, WordCount, string](100 * time.Millisecond))
reducer.CollectFunc = func(word string, counts WordCount) (WordCount, bool) {
if counts == nil {
counts = make(WordCount)
}
counts[word]++
return counts, false // Return true to trigger immediate flush
}
reducer.ReduceFunc = func(counts WordCount) string {
return fmt.Sprintf("Counted %d unique words", len(counts))
}Custom Flush Triggers
The CollectFunc can signal when to flush by returning true as the second return value. This enables custom flush criteria beyond time-based flushing:
// Length-based flush: flush when collection reaches 100 items
reducer.CollectFunc = func(input int, collection []int) ([]int, bool) {
newCollection := append(collection, input)
shouldFlush := len(newCollection) >= 100
return newCollection, shouldFlush
}
// Custom criteria: flush when sum exceeds threshold
reducer.CollectFunc = func(input int, sum int) (int, bool) {
newSum := sum + input
shouldFlush := newSum > 1000
return newSum, shouldFlush
}
// Manual flush is also available
reducer.Flush()Pipe
Connect a reader and writer channel with identity transform.
inputChan := make(chan string, 10)
outputChan := make(chan string, 10)
// Create a pipe (identity mapper)
pipe := gocurrent.NewPipe(inputChan, outputChan)
defer pipe.Stop()
inputChan <- "hello"
result := <-outputChan // "hello"FanIn
Merge multiple input channels into a single output channel.
// Create input channels
chan1 := make(chan int, 10)
chan2 := make(chan int, 10)
chan3 := make(chan int, 10)
// Create fan-in
fanIn := gocurrent.NewFanIn[int](nil)
defer fanIn.Stop()
// Add input channels
fanIn.Add(chan1, chan2, chan3)
// Send data to different channels
chan1 <- 1
chan2 <- 2
chan3 <- 3
// Read merged output
for i := 0; i < 3; i++ {
result := <-fanIn.RecvChan()
fmt.Printf("Received: %d\n", result)
}FanOut
Distribute messages from one channel to multiple output channels.
// Create fan-out
fanOut := gocurrent.NewFanOut[string](nil)
defer fanOut.Stop()
// Add output channels
out1 := fanOut.New(nil) // No filter
out2 := fanOut.New(func(s *string) *string {
// Filter: only pass strings longer than 5 chars
if len(*s) > 5 {
return s
}
return nil
})
// Send data
fanOut.Send("hello") // Goes to out1 only
fanOut.Send("hello world") // Goes to both out1 and out2
// Read from outputs
select {
case msg := <-out1:
fmt.Printf("Out1: %s\n", msg)
case msg := <-out2:
fmt.Printf("Out2: %s\n", msg)
}Map
A thread-safe map with read/write locking capabilities.
// Create a thread-safe map
safeMap := gocurrent.NewMap[string, int]()
// Basic operations
safeMap.Set("key1", 42)
value, exists := safeMap.Get("key1")
if exists {
fmt.Printf("Value: %d\n", value)
}
// Transaction-style operations
safeMap.Update(func(m map[string]int) {
m["key2"] = 100
m["key3"] = 200
})
safeMap.View(func() {
val1, _ := safeMap.LGet("key1", false) // No lock needed in View
val2, _ := safeMap.LGet("key2", false)
fmt.Printf("Sum: %d\n", val1 + val2)
})Features
- Type Safety: All components are fully generic and type-safe
- Resource Management: Proper cleanup and lifecycle management
- Composability: Components can be easily combined
- Customizable: Configurable behavior and filtering
- Thread Safety: Built-in synchronization where needed
- Error Handling: Comprehensive error propagation with completion signaling
- Monitoring: Built-in channels for monitoring goroutine completion and errors
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the GPL License.