Go by example, part 4

This commit is contained in:
WieErWill 2022-01-11 14:50:17 +01:00
parent c5983176c5
commit 5d7e654fa1
10 changed files with 428 additions and 0 deletions

View File

@@ -0,0 +1,36 @@
// closing a channel indicates that no more values will be sent on it
package main
import "fmt"
func main() {
// jobs channel to communicate work to be done
jobs := make(chan int, 5)
done := make(chan bool)
// repeatedly receives from jobs
// the second value, more, will be false if jobs has been closed and all values in the channel have already been received
go func() {
for {
j, more := <-jobs
if more {
fmt.Println("received job", j)
} else {
fmt.Println("received all jobs")
done <- true
return
}
}
}()
// sends 3 jobs to the worker over the jobs channel, then closes it
for j := 1; j <= 3; j++ {
jobs <- j
fmt.Println("sent job", j)
}
close(jobs)
fmt.Println("sent all jobs")
// await the worker using the synchronization approach
<-done
}
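A minimal side sketch (not part of the committed file; names are illustrative) of what the two-value receive reports once a channel has been closed and drained:
package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    ch <- 42
    close(ch)
    // a value still buffered in the channel is delivered even after close
    v, more := <-ch
    fmt.Println(v, more) // 42 true
    // once the channel is closed and empty, receives return the zero value and false
    v, more = <-ch
    fmt.Println(v, more) // 0 false
}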

View File

@@ -0,0 +1,19 @@
package main
import "fmt"
func main() {
// iterate over values received from a channel
// here we iterate over 2 values in the queue channel
queue := make(chan string, 2)
queue <- "one"
queue <- "two"
close(queue)
// range iterates over each element as it's received from queue
for elem := range queue {
fmt.Println(elem)
}
// also shows that it's possible to close a non-empty channel and still have the remaining values be received
}

View File

@@ -0,0 +1,33 @@
package main
import (
"fmt"
"time"
)
// built-in timer and ticker features
func main() {
// timers represent a single event in the future
// tell the timer how long to wait and it provides a channel that will be notified at that time
timer1 := time.NewTimer(2 * time.Second)
// <-timer1.C blocks on the timer's channel C until it sends a value indicating that the timer fired
<-timer1.C
fmt.Println("Timer 1 fired")
// if you just wanted to wait, you could have used time.Sleep instead
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 fired")
}()
// one can cancel the timer before it fires
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}
// give the timer2 enough time to fire, if it ever was going to, to show it is in fact stopped
time.Sleep(2 * time.Second)
}
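Related to the time.Sleep note above, a minimal sketch (not part of the committed file) of time.After, which is handy when the timer never needs to be stopped early:
package main

import (
    "fmt"
    "time"
)

func main() {
    // time.After returns a channel that receives a single value once the duration has elapsed
    <-time.After(2 * time.Second)
    fmt.Println("fired after 2s")
}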

View File

@@ -0,0 +1,31 @@
// tickers are for when you want to do something repeatedly at regular intervals
package main
import (
"fmt"
"time"
)
func main() {
// Tickers use a similar mechanism to timers: a channel that is sent values
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
case t := <-ticker.C:
fmt.Println("Tick at", t)
}
}
}()
// Tickers can be stopped like timers
time.Sleep(1600 * time.Millisecond)
ticker.Stop()
done <- true
fmt.Println("Ticker stopped")
}
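A small complementary sketch (not part of the committed file) that stops the ticker with defer; Stop releases the ticker's resources but does not close ticker.C:
package main

import (
    "fmt"
    "time"
)

func main() {
    ticker := time.NewTicker(200 * time.Millisecond)
    // Stop releases the ticker's resources; it does not close ticker.C
    defer ticker.Stop()
    for i := 1; i <= 3; i++ {
        t := <-ticker.C
        fmt.Println("tick", i, "at", t)
    }
}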

View File

@@ -0,0 +1,41 @@
package main
import (
"fmt"
"time"
)
// worker, of which we'll run several concurrent instances
// those workers will receive work on the jobs channel and send the corresponding results on results
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
// send workers work and collect their results
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// start up 3 workers; they block initially because there are no jobs yet
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// send 5 jobs and then close that channel
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// collect all the results of the work; this also ensures that the worker goroutines have finished
for a := 1; a <= numJobs; a++ {
<-results
}
}
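A hedged variant sketch (double and the variable names are made up for illustration) that actually uses the result values instead of discarding them:
package main

import "fmt"

// double is an illustrative worker that skips the prints and the sleep
func double(jobs <-chan int, results chan<- int) {
    for j := range jobs {
        results <- j * 2
    }
}

func main() {
    const numJobs = 5
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    for w := 1; w <= 3; w++ {
        go double(jobs, results)
    }
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)
    sum := 0
    // read every result value instead of discarding it
    for a := 1; a <= numJobs; a++ {
        sum += <-results
    }
    fmt.Println("sum of doubled jobs:", sum) // 30
}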

View File

@@ -0,0 +1,37 @@
// wait for multiple goroutines to finish
package main
import (
"fmt"
"sync"
"time"
)
// function to run in every goroutine
func worker(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
// WaitGroup is used to wait for all the goroutines launched here to finish
var wg sync.WaitGroup
// Launch several goroutines and increment the WaitGroup counter for each
for i := 1; i <= 5; i++ {
wg.Add(1)
// Avoid re-use of the same i value in each goroutine closure
i := i
// Wrap the worker call in a closure that makes sure to tell the WaitGroup that this worker is done
go func() {
defer wg.Done()
worker(i)
}()
}
// Block until the WaitGroup counter goes back to 0
wg.Wait()
}
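Since Go 1.22 the loop variable is scoped per iteration, so the i := i copy above is no longer required; an alternative sketch (worker inlined for brevity, not part of the committed file) passes the value as an argument instead:
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        // passing i as an argument gives each goroutine its own copy
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Worker %d running\n", id)
        }(i)
    }
    wg.Wait()
}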

View File

@@ -0,0 +1,57 @@
package main
import (
"fmt"
"time"
)
// rate limiting is an important mechanism for controlling resource utilization and maintaining quality of service
func main() {
// basic rate limiting
// suppose we want to limit our handling of incoming requests
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
// this limiter channel will receive a value every 200 milliseconds
// this is the regulator in the rate limiting scheme.
limiter := time.Tick(200 * time.Millisecond)
// by blocking on a receive from the limiter channel before serving each request,
// limit to 1 request every 200 milliseconds
for req := range requests {
<-limiter
fmt.Println("request", req, time.Now())
}
// we may want to allow short bursts of requests in our rate limiting scheme while preserving the overall rate limit
// accomplish this by buffering the limiter channel
burstyLimiter := make(chan time.Time, 3)
// fill up the channel to represent allowed bursting
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
// every 200 milliseconds try to add a new value to burstyLimiter, up to its limit of 3
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
// simulate 5 more incoming requests
// the first 3 of these will benefit from the burst capability of burstyLimiter
burstyRequests := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests {
<-burstyLimiter
fmt.Println("request", req, time.Now())
}
}
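time.Tick returns only a channel and gives the caller no way to stop the underlying ticker; a hedged sketch (request count reduced for brevity, not part of the committed file) of the same basic limiter built on time.NewTicker so it can be released:
package main

import (
    "fmt"
    "time"
)

func main() {
    requests := make(chan int, 3)
    for i := 1; i <= 3; i++ {
        requests <- i
    }
    close(requests)
    // an explicitly created ticker can be stopped once the limiter is no longer needed
    limiter := time.NewTicker(200 * time.Millisecond)
    defer limiter.Stop()
    for req := range requests {
        <-limiter.C
        fmt.Println("request", req, time.Now())
    }
}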

View File

@@ -0,0 +1,35 @@
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// atomic counters accessed by multiple goroutines
func main() {
// unsigned integer to represent our (always-positive) counter
var ops uint64
// WaitGroup will help wait for all goroutines to finish their work
var wg sync.WaitGroup
// start 50 goroutines that each increment the counter exactly 1000 times
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
for c := 0; c < 1000; c++ {
// atomically increment the counter, giving it the memory address of our ops counter with the & syntax
atomic.AddUint64(&ops, 1)
}
wg.Done()
}()
}
// wait until all the goroutines are done
wg.Wait()
// it's safe to access ops now because we know no other goroutine is writing to it
fmt.Println("ops:", ops)
}
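Since Go 1.19, sync/atomic also provides typed atomic values with methods; a minimal sketch (not part of the committed file) of the same counter using atomic.Uint64 instead of the address-based helpers:
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    // typed atomic counter; no & syntax needed when calling its methods
    var ops atomic.Uint64
    var wg sync.WaitGroup
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for c := 0; c < 1000; c++ {
                ops.Add(1)
            }
        }()
    }
    wg.Wait()
    fmt.Println("ops:", ops.Load()) // 50000
}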

View File

@@ -0,0 +1,49 @@
package main
import (
"fmt"
"sync"
)
// holds a map of counters
// add a Mutex to synchronize access
type Container struct {
mu sync.Mutex
counters map[string]int
}
func (c *Container) inc(name string) {
// lock the mutex before accessing counters
c.mu.Lock()
// unlock it at the end of the function using defer
defer c.mu.Unlock()
c.counters[name]++
}
// mutex to safely access data across multiple goroutines
func main() {
c := Container{
// the zero value of a mutex is usable as-is
counters: map[string]int{"a": 0, "b": 0},
}
var wg sync.WaitGroup
// increments a named counter in a loop
doIncrement := func(name string, n int) {
for i := 0; i < n; i++ {
c.inc(name)
}
wg.Done()
}
// run several goroutines concurrently
wg.Add(3)
go doIncrement("a", 10000)
go doIncrement("a", 10000)
go doIncrement("b", 10000)
// wait for the goroutines to finish
wg.Wait()
fmt.Println(c.counters)
}
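If reads vastly outnumber writes, a hedged sketch (the get method is made up for illustration, not part of the committed file) of the same container guarded by sync.RWMutex so readers don't block each other:
package main

import (
    "fmt"
    "sync"
)

type Container struct {
    mu       sync.RWMutex
    counters map[string]int
}

// get takes a shared (read) lock, so many readers can run concurrently
func (c *Container) get(name string) int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.counters[name]
}

// inc still needs the exclusive (write) lock
func (c *Container) inc(name string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.counters[name]++
}

func main() {
    c := Container{counters: map[string]int{"a": 0}}
    c.inc("a")
    fmt.Println(c.get("a")) // 1
}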

View File

@@ -0,0 +1,90 @@
// built-in synchronization features of goroutines and channels
package main
import (
"fmt"
"math/rand"
"sync/atomic"
"time"
)
// state will be owned by a single goroutine
// this will guarantee that the data is never corrupted with concurrent access
// in order to read or write that state, other goroutines will send messages to the owning goroutine and receive corresponding replies
// these readOp and writeOp structs encapsulate those requests and a way for the owning goroutine to respond
type readOp struct {
key int
resp chan int
}
type writeOp struct {
key int
val int
resp chan bool
}
func main() {
// count how many operations to perform
var readOps uint64
var writeOps uint64
// reads and writes channels will be used by other goroutines
reads := make(chan readOp)
writes := make(chan writeOp)
// goroutine that owns the state, which is a map as in the previous example but now private to the stateful goroutine
// this goroutine repeatedly selects on the reads and writes channels, responding to requests as they arrive
go func() {
var state = make(map[int]int)
for {
select {
case read := <-reads:
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
write.resp <- true
}
}
}()
// start 100 goroutines to issue reads to the state-owning goroutine via the reads channel
for r := 0; r < 100; r++ {
go func() {
for {
// each read requires constructing a readOp, sending it over the reads channel, and then receiving the result over the provided resp channel
read := readOp{
key: rand.Intn(5),
resp: make(chan int)}
reads <- read
<-read.resp
atomic.AddUint64(&readOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
// start 10 writes as well, using a similar approach
for w := 0; w < 10; w++ {
go func() {
for {
write := writeOp{
key: rand.Intn(5),
val: rand.Intn(100),
resp: make(chan bool)}
writes <- write
<-write.resp
atomic.AddUint64(&writeOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
// let the goroutines work for a second
time.Sleep(time.Second)
// capture and report the op counts
readOpsFinal := atomic.LoadUint64(&readOps)
fmt.Println("readOps:", readOpsFinal)
writeOpsFinal := atomic.LoadUint64(&writeOps)
fmt.Println("writeOps:", writeOpsFinal)
}