Concurrent Programming with Go

Concurrent Programming with Go

By Mark Summerfield

Overload, 19(106):25-28, December 2011


Concurrency is becoming ever more important. Mark Summerfield looks at the approach of the new language Go.

The Go programming language is in some respects a radical departure from existing compiled languages. Its syntax, although C-ish, is much cleaner and simpler than C or C++’s, and it supports object-orientation through embedding (delegation) and aggregation, rather than by using inheritance. Go has a built-in garbage collector so we never have to worry about deleting/freeing memory – something that can be fiendishly complicated in a multithreaded context. In this article we will focus on another area where Go breaks new ground (at least, compared with other mainstream programming languages): concurrency.

Go has the usual concurrency primitives, such as mutexes, read–write mutexes, and wait conditions, as well as low-level primitives such as atomic adds, loads, and compare and swaps. But Go programmers are encouraged to avoid using any of these and instead to use Go’s high-level goroutines and channels .

A goroutine is a very lightweight thread of execution that shares the same address space as the rest of the program. The gc compiler multiplexes one or more goroutines per operating system thread and can realistically support hundreds, thousands, or more goroutines.

A channel is a two-way (or one-way, at our option) communications pipeline. Channels are type safe, and when they are used to pass immutable values ( bool s, int s, float64 s, string s, and struct s composed of immutable values), they can be used in multiple goroutines without formality. When it comes to passing pointers or references, we must, of course, ensure that our accesses are synchronized.

Incidentally, goroutines and channels are an implentation of a form of CSP (Communicating Sequential Processes), based on the ideas of computer scientist C. A. R. Hoare.

Go’s mantra for concurrency is:

Do not communicate by sharing memory;instead, share memory by communicating.

In this article we will review a simple concurrent program called headcheck , that, given a list of URLs, performs an HTTP HEAD request on each one and reports its results. We will look at a few different ways the program can implement concurrency using Go’s goroutines and channels, to give a flavour of the possibilities.

Listing 1 shows the struct s the program will operate on. We made Job a struct because this is syntactically more convenient when giving it methods.

type Result struct {
  url          string
  status       int
  lastModified string
}
type Job struct {
  url string
}
			
Listing 1

Listing 2 shows the main() function. The built-in make() command is used to create channels (as well as values of the built in map and slice collection types).

func main() {
  jobs := make(chan Job, nWorkers * 2)
  results := make(chan Result, bufferSize)
  done := make(chan bool, nWorkers)

  go addJobs(jobs)
  for i := 0; i < nWorkers; i++ {
    go doJobs(jobs, results, done)
  }
  go wait(results, done)
  process(results)
}
			
Listing 2

In Listing 2, both nWorkers and bufferSize are constants (6 and 24; not shown).

The main() function begins by creating three channels, one for passing jobs to worker goroutines, one for receiving all the results, and another to keep track of when each worker goroutine has finished.

By default channels are unbuffered (their size is 0) which means that a receive will block until there is a send and a send will block if there’s a sent item that hasn’t been received. By buffering we allow a channel to accept as many sends as the size of the buffer, before sends are blocked. Similarly, we can do as many receives as there are items in the buffer, only blocking when the buffer is empty. The purpose of buffering is to improve throughput by minimizing the time goroutines spend being blocked.

In this example we have buffered all the channels by giving make() a second buffer-size argument. We have made the jobs channel large enough to accept (an average of) two jobs per worker goroutine and made the results channel big enough to accept plenty of results without blocking the workers. The done channel’s buffer’s size is the same as the number of workers since, as we will see, each worker sends to that channel exactly once.

To execute code in a separate goroutine we use the go keyword. This keyword must be followed by a function call (which could be a call on a function literal – which is also a closure). The go statement completes ‘immediately’ and the called function is executed in a newly created goroutine. When the function finishes the Go runtime system automatically gets rid of its goroutine and reclaims the memory it used.

Here, the main() function executes the addJobs() function in its own separate goroutine, so execution continues immediately to the for loop. In the for loop six separate goroutines are created, each one executing an instance of the doJobs() function. All the newly created goroutines share the same jobs channel and the same results channel. The for loop completes as soon as the goroutines have been created and started and then another function is called, wait() , again in its own separate goroutine. And finally, we call the process() function in the current ( main ) goroutine.

Figure 1 schematically illustrates the relationships between the program’s goroutines and channels.

Figure 1

Once the main goroutine has finished, the program will terminate – even if there are other goroutines still executing. So, we must ensure that all the other goroutines finish their work before we leave main() .

The addJobs() function is used to populate the jobs channel and is shown in Listing 3, but with the code for reading in the URLs elided.

func addJobs(jobs chan Job) {
  reader := bufio.NewReader(os.Stdin)
  for {
    ... // Read in a URL
    url = strings.TrimSpace(url)
    if len(url) > 0 {
      jobs <- Job{url}
    }
  }
  close(jobs)
}
			
Listing 3

Each job simply consists of a URL to check. URLs are read from os.Stdin (e.g., by using redirection on the command line). At each iteration we read both a line and an error value; if the error is io.EOF we have finished and break out of the for loop. (All of this has been elided.)

Once all the jobs have been added the jobs channel is closed to signify that there are no more jobs to be added. Sending to a channel is done using the syntax channel <- item . Items can be received from a channel that is non-empty, even if it is closed, so no jobs will be lost. When the addJobs() function has finished the Go runtime system will take care of removing the goroutine in which it ran and reclaiming its memory.

The doJobs() function is shown in Listing 4. It is simple because it passes all its work on to a method of the Job type (not shown). The Job.Do() method sends one result of type Result to the results channel using the statement results <- result .

func doJobs(jobs chan Job,
  results chan Result, done chan bool) {
  for job := range jobs {
      job.Do(results)
  }
  done <- true
}
			
Listing 4

Go’s for ... range statement can iterate over maps (data dictionaries like C++11’s unordered_map ), slices (in effect, variable length arrays), and channels. If the channel has an item it is received and assigned to the for loop’s variable (here, job ); if the channel has no item but isn’t closed the loop blocks . Of course, this does not hold up the rest of the program, only the goroutine in which the loop is executing is blocked. The loop terminates when the channel is empty and closed.

Once all the jobs are done the function sends a bool to the done channel. Whether true or false is sent doesn’t matter, since the done channel is used purely to keep the program alive until all the jobs are done.

The headcheck program has one goroutine adding jobs to the jobs channel and six goroutines reading and processing jobs from the same channel, all of them working concurrently. Yet, we don’t have to worry about locking – Go handles all the synchronization for us.

Listing 5 shows the wait() function which was executed by main() in its own goroutine. This function has a regular for loop that iterates for as many worker goroutines as were created, and at each iteration it does a blocking receive using the syntax item <- channel . Notice that it doesn’t matter whether true or false was sent – we only care that something was sent – since we discard the channel’s items. Once all the workers have sent to the done channel we know that there can be no more results added to the results channel, so we close that channel.

func wait(results chan Result,
  done chan bool) {
  for i := 0; i < nWorkers; i++ {
    <-done
  }
  close(results)
}
			
Listing 5

Listing 6 shows the process() function which is executed in the main goroutine. This function iterates over the results channel and blocks if no result is available. The for loop terminates when the results channels is empty and closed, which will only happen when the wait() function finishes. This ensures that this function blocks the main goroutine until every result has been received and output.

func process(results chan result) {
  for result := range results {
    result.Report(os.Stdout)
  }
}
			
Listing 6

We could replace the wait() and process() functions with a single waitAndProcess() function executed in the main goroutine, as Listing 7 illustrates.

func waitAndProcess(results <-chan Result,
  done <-chan struct{}) {
  for working := nWorkers; working > 0; {
    select { // Blocking
    case result := <-results:
      result.Report(os.Stdout)
    case <-done:
      working--
    }
  }
  for {
    select { // Non-blocking
    case result := <-results:
      result.Report(os.Stdout)
    default:
      return
    }
  }
}
			
Listing 7

This function begins with a while loop that iterates so long as there is at least one worker still working. The select statement is structurally like a switch statement, only it works in terms of channel communications. A select with no default case is blocking . So, here, the first select blocks until it receives either a result or an empty struct .

Since we don’t care what’s sent to the done channel, only whether something’s sent, we have defined the channel to be of type chan struct{} . This channel’s value type specifies a struct with no fields; there is only one possible value of such a type and this is specified using struct{}{} which means create a (zero) value of type struct{} . Since such values have no data they are more expressive of our semantics than sending bool s whose value we would then ignore.

After each receive the select is broken out of and the loop condition is checked. This causes the main goroutine to block until all the workers have finished sending their results (because they only send to the done channel after they have finished all their jobs).

It is quite possible that after all the worker goroutines have finished there are still unprocessed results in the results channel (after all, we buffered the channel when we created it). So we execute a second for loop (an infinite loop) that uses a non-blocking select . So long as there are results in the results channel the select will receive each one and finish, and the for loop will iterate again. But once the results channel is empty and closed the default case will be executed, and the function returned from. At this point all the results have been output and the program will terminate.

A pipelining approach

Goroutines and channels are very flexible and versatile, to the extent that we can take some quite different approaches to concurrency. Listing 8 illustrates an alternative headcheck implementation’s main() function.

func main() {
  results := make(chan Result, bufferSize)
  go sink(processImages(processHTML(
    source(results))))
  for result := range results {
    result.Report(os.Stdout)
  }
}
			
Listing 8

Unlike the previous versions, this headcheck program only reports on URLs for HTML files and images, and ignores anything else. We could always add another pipeline component, say, processRemainder() , if we didn’t want to ignore any URLs.

Figure 2 schematically illustrates the relationships between the program’s goroutines and channels.

Figure 2

The function begins by creating a buffered results channel and then it executes a pipeline in a separate goroutine. (And as we will see, each component of the pipeline itself executes in a separate goroutine.) Control immediately passes to the for ... range loop which blocks waiting for results and finishes when the results channel is empty and closed. (The Result type was shown in Listing 1.) The source() function shown in Listing 9 is where the pipeline begins.

func source(results chan Result) (
  <-chan string, chan Result) {
  out := make(chan string, bufferSize)
  go func() {
    reader := bufio.NewReader(os.Stdin)
    for {
      ... // Read in a URL
      out <- url
    }
    close(out)
  }()
  return out, results
}
			
Listing 9

A Go function’s return value is specified after the closing parenthesis that follows the function’s arguments. Multiple values can be returned simply by enclosing them in parentheses. Here we return a receive-only channel and a send–receive channel.

The source() function is passed the results channel, which it simply returns to its caller. All the pipeline functions share the same results channel. The function starts by creating a buffered output channel which is initially bi-directional. It then creates a goroutine to populate the out channel with jobs (in this case URL strings), after which the goroutine closes the channel. By closing the channel, future receivers (e.g., using a for ... range loop) will know when to finish. The go func() { ... }() creates a function literal (which is a closure) and executes it in a separate goroutine. So processing immediately continues to the source() function’s last statement which simply returns the out channel as a receive-only channel thanks to the way the function’s return value is declared, as well as the results channel.

The processHTML() function shown in Listing 10 has the same structure as all the pipeline component functions except for the source() function. It is passed two channels, a receive-only jobs channel (of type chan string ) which it calls in (and which was the previous pipeline component’s out channel), and the shared results channel. The function creates a new buffered bi-directional out channel with the same buffer size as the capacity of the in channel the function has been passed. It then creates a goroutine which executes a new function literal. The goroutine reads jobs from the in channel. This channel is closed by the previous pipeline component when it has been fully populated, so this goroutine’s for ... range loop is guaranteed to terminate. For each job (URL) received, for those that this function can process it performs its processing (in this case a call to a Check() function), and sends the result to the results channel. And those jobs the function isn’t concerned with are simply sent to the (new) out channel. At the end the function returns its out channel and the shared results channel.

func processHTML(in <-chan string,
  results chan Result) (<-chan string,
  chan Result) {
  out := make(chan string, cap(in))
  go func() {
    for url := range in {
      suffix := strings.ToLower(
        filepath.Ext(url))
      if suffix == ".htm" ||
        suffix == ".html" {
        results <- Check(url)
      } else {
        out <- url
      }
    }
    close(out)
  }()
  return out, results
}
			
Listing 10

The processImages() function has the same signature and uses the same logic.

The sink() function takes a receive-only jobs channel and a receive-only results channel. It is shown in Listing 11.

func sink(in <-chan string,
  results chan Result) {
  for _ = range in {
    // Drain unprocessed URLs
  }
  close(results)
}
			
Listing 11

The sink() function is the last one in the pipeline. It iterates over the penultimate pipeline component’s jobs channel, draining it until it is empty. (It might be empty in the first place if every job has been done by one or other pipeline component.)

At the end the function closes the results channel. Closing the results channel is essential to ensure that the for ... range loop in main() terminates. But we must be careful not to close the channel when one or more goroutines might still want to send to it. Looking back at the implementations of the source() and processHTML() functions we can see that each creates its own out channel which it ultimately closes when it has finished processing. The last of these out channels is passed to the sink() function as its in channel – and this channel isn’t closed until all the previous pipeline components have finished reading their in channels and closed their out channels. In view of this we know that once the for ... range loop in the sink() function has finished, all of the pipeline’s preceding components have finished processing and no more results could be sent to the results channel, and hence the channel is safe to close.

The pipeline-based headcheck program ends up with five goroutines: one executing sink() , one started by source() , one started by processHTML() , one started by processImages() , and the main goroutine that main() executes in. All of these goroutines operate concurrently, passing type-safe values via channels, only terminating when their work is done, and with no explicit locks.

Thinking in terms of goroutines and channels is very different from thinking in terms of threads and locks, and may take some getting used to! Also, keep in mind that in this article we have seen just some of the possible ways of using goroutines and channels – many other approaches are possible. Go is a fascinating language, not just for its innovative approach to concurrency, but also for its clean syntax and very different approach to object orientation, and is well worth getting to know.

Further information

The Go Programming Language: http://golang.org/






Your Privacy

By clicking "Accept Non-Essential Cookies" you agree ACCU can store non-essential cookies on your device and disclose information in accordance with our Privacy Policy and Cookie Policy.

Current Setting: Non-Essential Cookies REJECTED


By clicking "Include Third Party Content" you agree ACCU can forward your IP address to third-party sites (such as YouTube) to enhance the information presented on this site, and that third-party sites may store cookies on your device.

Current Setting: Third Party Content EXCLUDED



Settings can be changed at any time from the Cookie Policy page.