Go concurrency model - P1

a curious engineer.
One of the cooler things about Go is how simple the language is, combined with the power it gives you to write concurrent (not necessarily parallel) programs. The goal here is to talk a little about some Go concurrency patterns. To start simple, I created a Go project with a single function call in main. This has nothing to do with concurrent programming (yet).
package main

import (
	"fmt"
	"math/rand"
)

func getNumber(limit int) {
	r := rand.Intn(limit)
	fmt.Println(r)
}

func main() {
	getNumber(10)
}
To run a function concurrently with the main program, we add the go keyword before the function call. When we run the snippet below, though, we don't see anything in the terminal. This is because starting a goroutine means Go spins up a lightweight unit of execution to run the function, while the main goroutine carries on without waiting for it. The function's execution is decoupled from main's, and as soon as main returns the program exits, which here happens before the goroutine gets a chance to finish.
package main

import (
	"fmt"
	"math/rand"
)

func getNumber(limit int) {
	r := rand.Intn(limit)
	fmt.Println(r)
}

func main() {
	go getNumber(10)
}
No output:
aneeshseth@Aneeshs-MacBook-Air go-concurrency % go run main.go
aneeshseth@Aneeshs-MacBook-Air go-concurrency %
To see the output, we can add a 2-second sleep to the main function. This keeps the main goroutine alive for 2 seconds, which is more than enough time for the other goroutine to run. It works for a demo, but keep in mind that a sleep is only a guess about how long the goroutine needs.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func getNumber(limit int) {
	r := rand.Intn(limit)
	fmt.Println(r)
}

func main() {
	go getNumber(10)
	time.Sleep(time.Second * 2)
}
aneeshseth@Aneeshs-MacBook-Air go-concurrency % go run main.go
3
aneeshseth@Aneeshs-MacBook-Air go-concurrency %
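Instead of guessing a sleep duration, the main goroutine can wait explicitly for the other goroutine to finish. Below is a minimal sketch of that idea using sync.WaitGroup from the standard library. Note the assumptions: getNumber here returns the value instead of printing it, and runAndWait is a hypothetical helper name that is not part of the original project.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// getNumber returns a random integer in [0, limit).
// (Variant of the article's function: it returns instead of printing.)
func getNumber(limit int) int {
	return rand.Intn(limit)
}

// runAndWait is a hypothetical helper: it runs getNumber in a goroutine
// and blocks until that goroutine has finished.
func runAndWait(limit int) int {
	var wg sync.WaitGroup
	var result int

	wg.Add(1) // one goroutine to wait for
	go func() {
		defer wg.Done() // signal completion when the goroutine returns
		result = getNumber(limit)
	}()

	wg.Wait() // blocks until Done has been called
	return result
}

func main() {
	fmt.Println(runAndWait(10))
}
```

The program now exits as soon as the goroutine is done, however long that takes, rather than after a fixed 2 seconds.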
Channels:
Channels are essentially a way for goroutines to communicate with each other. A channel can be thought of as a FIFO queue: data sent into the channel from one goroutine can be consumed by a receiver, which may live in another goroutine.
package main

import "fmt"

func insertIntoChan(current chan<- string) {
	current <- "hello world"
}

func main() {
	chEg := make(chan string)
	go insertIntoChan(chEg)
	msg := <-chEg
	fmt.Println(msg)
}
The above example shows us initializing a channel, sending data to it from another goroutine, receiving from it in the main func (which is also a goroutine), and printing the message received. This is just a single piece of data, but you can also send a stream of data into a channel, which is then streamed to whoever is listening on that channel, in this case the main func.
Select:
The select statement is a way to wait on several channel operations at once. It looks a bit like switch/case, but instead of testing conditions it blocks until one of its cases can proceed and then runs that case's code. If more than one case is ready, Go picks one of them at random.
The code below has 2 channels that we send data to from 2 different goroutines, and whichever message arrives first gets its associated statement printed. The result can vary between runs: I got inserted into chaneg2: hello world a few times, but then got inserted into chaneg: hello world in my terminal once.
package main

import "fmt"

func insertIntoChan(current chan<- string) {
	current <- "hello world"
}

func insertIntoChan2(current chan<- string) {
	current <- "hello world"
}

func main() {
	chEg := make(chan string)
	chEg2 := make(chan string)
	go insertIntoChan2(chEg2)
	go insertIntoChan(chEg)
	select {
	case msg := <-chEg:
		fmt.Printf("inserted into chaneg: %s", msg)
	case msg2 := <-chEg2:
		fmt.Printf("inserted into chaneg2: %s", msg2)
	}
}
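The example above only handles whichever message shows up first and then exits. To receive from both channels, one option is to run the select inside a loop. Here is a rough sketch of that; send and collectBoth are hypothetical helper names, and the messages are made different so we can tell the two channels apart.

```go
package main

import "fmt"

// send writes a single message to ch.
func send(ch chan<- string, msg string) {
	ch <- msg
}

// collectBoth is a hypothetical helper: it receives one message from each
// of two channels, in whatever order they happen to arrive.
func collectBoth() []string {
	chA := make(chan string)
	chB := make(chan string)
	go send(chA, "from A")
	go send(chB, "from B")

	var got []string
	// Each goroutine sends exactly once, so two rounds of select
	// are enough to receive both messages.
	for i := 0; i < 2; i++ {
		select {
		case msg := <-chA:
			got = append(got, msg)
		case msg := <-chB:
			got = append(got, msg)
		}
	}
	return got
}

func main() {
	// The order of the two messages can differ between runs.
	fmt.Println(collectBoth())
}
```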
Unbuffered vs buffered channels:
Unbuffered channels are channels created without a capacity, like the ones in the examples we've seen so far. A send on an unbuffered channel blocks until a receiver is ready to take the value. In the example below, the send happens on the main goroutine and there is no other goroutine to receive, so the Go runtime detects that every goroutine is blocked and aborts with a deadlock error:
package main

import "time"

func insertIntoChan(current chan<- string) {
	current <- "hello world"
}

func main() {
	chEg := make(chan string)
	insertIntoChan(chEg)
	time.Sleep(time.Second * 2)
	// msg := <- chEg
	// fmt.Println(msg)
}
Output:
aneeshseth@Aneeshs-MacBook-Air go-concurrency % go run main.go
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.insertIntoChan(...)
/Users/aneeshseth/Desktop/go-concurrency/main.go:6
main.main()
/Users/aneeshseth/Desktop/go-concurrency/main.go:12 +0x38
exit status 2
aneeshseth@Aneeshs-MacBook-Air go-concurrency %
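Below is an example usage of buffered channels, where we publish data in a separate goroutine and receive it in a for loop over the channel as the data is streamed in. The receive loop runs on the main goroutine so that the program waits for every value before exiting; if both functions were started with go, main could return before anything was printed. Adding a capacity to the earlier snippet that failed would stop it from erroring out, since a send on a buffered channel only blocks when the buffer is full, so a receiver doesn't need to be ready for every send.
package main

import (
	"fmt"
)

// publishData sends every element of arr into arrChan, then closes it.
func publishData(arrChan chan<- string, arr []string) {
	defer close(arrChan)
	for _, letter := range arr {
		arrChan <- letter
	}
}

// receiveData prints values until arrChan is closed and drained.
func receiveData(arrChan <-chan string) {
	for letter := range arrChan {
		fmt.Println(letter)
	}
}

func main() {
	chEg := make(chan string, 3)
	arr := []string{"a", "b", "c"}
	go publishData(chEg, arr)
	// Receive on the main goroutine so main doesn't exit early.
	receiveData(chEg)
}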
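To make the difference from unbuffered channels concrete, here is a small sketch where the sends and the receives happen on the same goroutine, one after the other. With an unbuffered channel the first send would block forever; with enough buffer capacity it works. fillAndDrain is a hypothetical name used just for this illustration.

```go
package main

import "fmt"

// fillAndDrain is a hypothetical helper: it sends every value into a
// buffered channel while no receiver is running, then reads them back
// on the same goroutine.
func fillAndDrain(values []string) []string {
	// Capacity equals the number of values, so no send has to wait.
	ch := make(chan string, len(values))
	for _, v := range values {
		ch <- v // does not block: the buffer still has room
	}
	close(ch) // no more sends; lets the range loop below terminate

	var out []string
	for v := range ch {
		out = append(out, v) // values come out in the order they went in
	}
	return out
}

func main() {
	fmt.Println(fillAndDrain([]string{"a", "b", "c"})) // prints [a b c]
}
```

The values come back in the same order they were sent, which is the FIFO behaviour mentioned in the channels section.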
Done channel pattern:
The idea behind the done channel pattern is to have a dedicated channel that signals when to stop, so that we can clean up whatever needs cleaning up once the program has finished its work or an error occurs.
package main

import (
	"fmt"
)

func insertIntoChan(current chan<- string) {
	defer close(current)
	current <- "hello world"
}

func main() {
	chEg := make(chan string, 1)
	done := make(chan bool)
	insertIntoChan(chEg)
	close(done)
	for {
		select {
		case <-done:
			return
		case msg := <-chEg:
			fmt.Printf("received msg: %s", msg)
		}
	}
}
In this example, all we're trying to do is send a log msg into the chEg channel and then close that channel. Once that is done, we also close a done channel we created. We then have a for-select loop with 2 cases: one that fires when a msg is received from chEg, and another that fires when the done channel is closed. Closing done causes receives on it to return the zero value immediately, so the done case returns from the for-select loop and the program exits since the main func completes. Note that by the time the loop starts, both channels are already ready, and select picks among ready cases at random, so the msg may or may not be printed before the done case is chosen; since chEg is closed as well, its case can also fire again with an empty string. In practice, the done case is where we would clean up something within our code.
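A more common shape for this pattern is a worker goroutine that keeps producing values until the done channel is closed. Below is a minimal sketch of that, not a definitive implementation: counter and firstN are hypothetical names, and the done channel uses struct{} (a common convention for signal-only channels) instead of bool.

```go
package main

import "fmt"

// counter is a hypothetical worker: it sends 0, 1, 2, ... on the returned
// channel until done is closed, then closes its output and exits.
func counter(done <-chan struct{}) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // cleanup runs when the goroutine stops
		for i := 0; ; i++ {
			select {
			case <-done:
				return // stop signal received
			case out <- i:
			}
		}
	}()
	return out
}

// firstN reads n values from the worker and then tells it to stop.
func firstN(n int) []int {
	done := make(chan struct{})
	nums := counter(done)

	var got []int
	for i := 0; i < n; i++ {
		got = append(got, <-nums)
	}

	close(done) // signal the worker to stop
	// Drain until the worker closes its channel, so we know it has exited.
	for range nums {
	}
	return got
}

func main() {
	fmt.Println(firstN(3)) // prints [0 1 2]
}
```

Closing done works as a broadcast: every goroutine selecting on it sees the signal, which is why close is used here rather than sending a single value.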
Pipeline pattern:
The pipeline pattern is essentially just multiple goroutines communicating with each other via channels. One goroutine does some work and sends its results into a channel; another goroutine consumes that channel, does its own work, and sends its results into a second channel, which is finally consumed in the main func.
Below is an example of this pattern:
package main

import "fmt"

func main() {
	nums := []int{1, 2, 3, 4}
	ch := insertInitialValues(nums)
	resultChan := insertSquaredVals(ch)
	for result := range resultChan {
		fmt.Println(result)
	}
}

func insertInitialValues(nums []int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, num := range nums {
			ch <- num
		}
	}()
	return ch
}

func insertSquaredVals(ch <-chan int) <-chan int {
	sq := make(chan int)
	go func() {
		defer close(sq)
		for val := range ch {
			sq <- val * val
		}
	}()
	return sq
}
In the above code, we first initialize an integer slice with numbers, which is passed to our first function, insertInitialValues.
This function is responsible for sending the data into an integer channel, which is what the function returns. The reason the sending happens inside a goroutine is that the channel we've created is unbuffered, meaning each send blocks until someone is ready to receive it; sending synchronously before returning the channel would block forever. Our second function, insertSquaredVals, takes the channel returned from insertInitialValues, loops over the data coming in, squares it, and sends it to another channel. The channel returned from insertSquaredVals is read in our main function, which prints the data as it comes in.
This pattern is very useful when data is being streamed from somewhere and needs to be handed to downstream functions for further processing while the source is still being iterated over, in this case our nums slice.
This brings us to the end of some concepts of Go's concurrency model, more to come :p
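Because every stage takes a channel in and hands a channel back, stages can be chained freely, including reusing the same stage twice. Here is a rough sketch of that idea; generate, square and runPipeline are hypothetical names that mirror the functions above rather than anything from the original project.

```go
package main

import "fmt"

// generate emits each number from nums on the returned channel, then closes it.
func generate(nums []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square reads values from in and emits their squares.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// runPipeline chains generate -> square -> square and collects the results.
func runPipeline(nums []int) []int {
	var results []int
	for v := range square(square(generate(nums))) {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(runPipeline([]int{1, 2, 3})) // prints [1 16 81]
}
```

Each stage here is a single goroutine that processes values in order, so the output order matches the input order.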



