diff --git a/5-go-by-example/31-closing-channels.go b/5-go-by-example/31-closing-channels.go new file mode 100644 index 0000000..f5c57f4 --- /dev/null +++ b/5-go-by-example/31-closing-channels.go @@ -0,0 +1,36 @@ +// closing a channel indicates that no more values will be sent on it +package main + +import "fmt" + +func main() { + // jobs channel to communicate work to be done + jobs := make(chan int, 5) + done := make(chan bool) + + // repeatedly receives from jobs + // more value will be false if jobs has been closed and all values in the channel have already been received + go func() { + for { + j, more := <-jobs + if more { + fmt.Println("received job", j) + } else { + fmt.Println("received all jobs") + done <- true + return + } + } + }() + + // sends 3 jobs to the worker over the jobs channel, then closes it + for j := 1; j <= 3; j++ { + jobs <- j + fmt.Println("sent job", j) + } + close(jobs) + fmt.Println("sent all jobs") + + // await the worker using the synchronization approach + <-done +} diff --git a/5-go-by-example/32-range-over-channels.go b/5-go-by-example/32-range-over-channels.go new file mode 100644 index 0000000..8a4983f --- /dev/null +++ b/5-go-by-example/32-range-over-channels.go @@ -0,0 +1,19 @@ +package main + +import "fmt" + +func main() { + // iterate over values received from a channel + + // here iterate over 2 values in the queue channel + queue := make(chan string, 2) + queue <- "one" + queue <- "two" + close(queue) + + // range iterates over each element as it’s received from queue + for elem := range queue { + fmt.Println(elem) + } + // also shows that it’s possible to close a non-empty channel but still have the remaining values be received +} diff --git a/5-go-by-example/33-timers.go b/5-go-by-example/33-timers.go new file mode 100644 index 0000000..deb113c --- /dev/null +++ b/5-go-by-example/33-timers.go @@ -0,0 +1,33 @@ +package main + +import ( + "fmt" + "time" +) + +// built-in timer and ticker features +func main() { + + // timers represent a single event in the future + // tell the timer how long to wait and it provides a channel that will be notified at that time + timer1 := time.NewTimer(2 * time.Second) + + // <-timer1.C blocks on the timer’s channel C until it sends a value indicating that the timer fired + <-timer1.C + fmt.Println("Timer 1 fired") + + // to wait, you could have used time.Sleep + timer2 := time.NewTimer(time.Second) + go func() { + <-timer2.C + fmt.Println("Timer 2 fired") + }() + // one can cancel the timer before it fires + stop2 := timer2.Stop() + if stop2 { + fmt.Println("Timer 2 stopped") + } + + // give the timer2 enough time to fire, if it ever was going to, to show it is in fact stopped + time.Sleep(2 * time.Second) +} diff --git a/5-go-by-example/34-tickers.go b/5-go-by-example/34-tickers.go new file mode 100644 index 0000000..67c0b39 --- /dev/null +++ b/5-go-by-example/34-tickers.go @@ -0,0 +1,31 @@ +// tickers are for when you want to do something repeatedly at regular intervals + +package main + +import ( + "fmt" + "time" +) + +func main() { + // Tickers use a similar mechanism to timers: a channel that is sent values + ticker := time.NewTicker(500 * time.Millisecond) + done := make(chan bool) + + go func() { + for { + select { + case <-done: + return + case t := <-ticker.C: + fmt.Println("Tick at", t) + } + } + }() + + // Tickers can be stopped like timers + time.Sleep(1600 * time.Millisecond) + ticker.Stop() + done <- true + fmt.Println("Ticker stopped") +} diff --git a/5-go-by-example/35-worker-pools.go b/5-go-by-example/35-worker-pools.go new file mode 100644 index 0000000..1c66539 --- /dev/null +++ b/5-go-by-example/35-worker-pools.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "time" +) + +// worker, of which will run several concurrent instances +// those workers will receive work on the jobs channel and send the corresponding results on results +func worker(id int, jobs <-chan int, results chan<- int) { + for j := range jobs { + fmt.Println("worker", id, "started job", j) + time.Sleep(time.Second) + fmt.Println("worker", id, "finished job", j) + results <- j * 2 + } +} + +func main() { + + // send workers work and collect their results + const numJobs = 5 + jobs := make(chan int, numJobs) + results := make(chan int, numJobs) + + // starts up 3 workers, initially blocked because there are no jobs + for w := 1; w <= 3; w++ { + go worker(w, jobs, results) + } + + // send 5 jobs and then close that channel + for j := 1; j <= numJobs; j++ { + jobs <- j + } + close(jobs) + + // collect all the results of the work + for a := 1; a <= numJobs; a++ { + <-results + } +} diff --git a/5-go-by-example/36-wait-group.go b/5-go-by-example/36-wait-group.go new file mode 100644 index 0000000..ed407e4 --- /dev/null +++ b/5-go-by-example/36-wait-group.go @@ -0,0 +1,37 @@ +// wait for multiple goroutines to finish + +package main + +import ( + "fmt" + "sync" + "time" +) + +// function to run in every goroutine +func worker(id int) { + fmt.Printf("Worker %d starting\n", id) + + time.Sleep(time.Second) + fmt.Printf("Worker %d done\n", id) +} + +func main() { + // WaitGroup is used to wait for all the goroutines launched here to finish + var wg sync.WaitGroup + + // Launch several goroutines and increment the WaitGroup counter for each + for i := 1; i <= 5; i++ { + wg.Add(1) + // Avoid re-use of the same i value in each goroutine closure + i := i + // Wrap the worker call in a closure that makes sure to tell the WaitGroup that this worker is done + go func() { + defer wg.Done() + worker(i) + }() + } + // Block until the WaitGroup counter goes back to 0 + wg.Wait() + +} diff --git a/5-go-by-example/37-rate-limiting.go b/5-go-by-example/37-rate-limiting.go new file mode 100644 index 0000000..cbe00e0 --- /dev/null +++ b/5-go-by-example/37-rate-limiting.go @@ -0,0 +1,57 @@ +package main + +import ( + "fmt" + "time" +) + +// controlling resource utilization and maintaining quality of service +func main() { + + // basic rate limiting + // want to limit handling of incoming requests + requests := make(chan int, 5) + for i := 1; i <= 5; i++ { + requests <- i + } + close(requests) + + // this limiter channel will receive a value every 200 milliseconds + // this is the regulator in the rate limiting scheme. + limiter := time.Tick(200 * time.Millisecond) + + // by blocking on a receive from the limiter channel before serving each request, + // limit to 1 request every 200 milliseconds + for req := range requests { + <-limiter + fmt.Println("request", req, time.Now()) + } + + // allow short bursts of requests in rate limiting scheme while preserving the overall rate limit + // accomplish this by buffering the limiter channel + burstyLimiter := make(chan time.Time, 3) + + // fill up the channel to represent allowed bursting + for i := 0; i < 3; i++ { + burstyLimiter <- time.Now() + } + + // every 200 milliseconds try to add a new value to burstyLimiter, up to its limit of 3 + go func() { + for t := range time.Tick(200 * time.Millisecond) { + burstyLimiter <- t + } + }() + + // simulate 5 more incoming requests + // the first 3 of these will benefit from the burst capability of burstyLimiter + burstyRequests := make(chan int, 5) + for i := 1; i <= 5; i++ { + burstyRequests <- i + } + close(burstyRequests) + for req := range burstyRequests { + <-burstyLimiter + fmt.Println("request", req, time.Now()) + } +} diff --git a/5-go-by-example/38-atomic-counters.go b/5-go-by-example/38-atomic-counters.go new file mode 100644 index 0000000..827c696 --- /dev/null +++ b/5-go-by-example/38-atomic-counters.go @@ -0,0 +1,35 @@ +package main + +import ( + "fmt" + "sync" + "sync/atomic" +) + +// atomic counters accessed by multiple goroutines +func main() { + // unsigned integer to represent our (always-positive) counter + var ops uint64 + + // WaitGroup will help wait for all goroutines to finish their work + var wg sync.WaitGroup + + // start 50 goroutines that each increment the counter exactly 1000 times + for i := 0; i < 50; i++ { + wg.Add(1) + + go func() { + for c := 0; c < 1000; c++ { + // atomically increment the counter, giving it the memory address of ops counter with the & syntax + atomic.AddUint64(&ops, 1) + } + wg.Done() + }() + } + + // wait until all the goroutines are done + wg.Wait() + + // it’s safe to access ops now because it's known no other goroutine is writing to it + fmt.Println("ops:", ops) +} diff --git a/5-go-by-example/39-mutexes.go b/5-go-by-example/39-mutexes.go new file mode 100644 index 0000000..27e4d82 --- /dev/null +++ b/5-go-by-example/39-mutexes.go @@ -0,0 +1,49 @@ +package main + +import ( + "fmt" + "sync" +) + +// holds a map of counters +// add a Mutex to synchronize access +type Container struct { + mu sync.Mutex + counters map[string]int +} + +func (c *Container) inc(name string) { + // lock the mutex before accessing counters + c.mu.Lock() + // unlock it at the end of the function using defer + defer c.mu.Unlock() + c.counters[name]++ +} + +// mutex to safely access data across multiple goroutines +func main() { + c := Container{ + // the zero value of a mutex is usable as-is + counters: map[string]int{"a": 0, "b": 0}, + } + + var wg sync.WaitGroup + + // increments a named counter in a loop + doIncrement := func(name string, n int) { + for i := 0; i < n; i++ { + c.inc(name) + } + wg.Done() + } + + // run several goroutines concurrently + wg.Add(3) + go doIncrement("a", 10000) + go doIncrement("a", 10000) + go doIncrement("b", 10000) + + // wait a for the goroutines to finish + wg.Wait() + fmt.Println(c.counters) +} diff --git a/5-go-by-example/40-stateful-goroutines.go b/5-go-by-example/40-stateful-goroutines.go new file mode 100644 index 0000000..7e77a37 --- /dev/null +++ b/5-go-by-example/40-stateful-goroutines.go @@ -0,0 +1,90 @@ +// built-in synchronization features of goroutines and channels + +package main + +import ( + "fmt" + "math/rand" + "sync/atomic" + "time" +) + +// state will be owned by a single goroutine +// this will guarantee that the data is never corrupted with concurrent access +// in order to read or write that state, other goroutines will send messages to the owning goroutine and receive corresponding replies +// these readOp and writeOp structs encapsulate those requests and a way for the owning goroutine to respond +type readOp struct { + key int + resp chan int +} +type writeOp struct { + key int + val int + resp chan bool +} + +func main() { + // count how many operations to perform + var readOps uint64 + var writeOps uint64 + + // reads and writes channels will be used by other goroutines + reads := make(chan readOp) + writes := make(chan writeOp) + + // goroutine that owns the state, which is a map as in the previous example but now private to the stateful goroutine + // this goroutine repeatedly selects on the reads and writes channels, responding to requests as they arrive + go func() { + var state = make(map[int]int) + for { + select { + case read := <-reads: + read.resp <- state[read.key] + case write := <-writes: + state[write.key] = write.val + write.resp <- true + } + } + }() + + // starts 100 goroutines to issue reads to the state-owning goroutine via the reads channel + for r := 0; r < 100; r++ { + go func() { + for { + // each read requires constructing a readOp, sending it over the reads channel, and the receiving the result over the provided resp channel + read := readOp{ + key: rand.Intn(5), + resp: make(chan int)} + reads <- read + <-read.resp + atomic.AddUint64(&readOps, 1) + time.Sleep(time.Millisecond) + } + }() + } + + // start 10 writes as well, using a similar approach + for w := 0; w < 10; w++ { + go func() { + for { + write := writeOp{ + key: rand.Intn(5), + val: rand.Intn(100), + resp: make(chan bool)} + writes <- write + <-write.resp + atomic.AddUint64(&writeOps, 1) + time.Sleep(time.Millisecond) + } + }() + } + + // let the goroutines work for a second + time.Sleep(time.Second) + + // capture and report the op counts + readOpsFinal := atomic.LoadUint64(&readOps) + fmt.Println("readOps:", readOpsFinal) + writeOpsFinal := atomic.LoadUint64(&writeOps) + fmt.Println("writeOps:", writeOpsFinal) +}