perri.to: A mashup of things

Canales Waitgroups Y Cancelacion

  2019-10-15


Introducción

Nota: Los materiales de ejemplo de este artículo se pueden encontrar aqui. Tener en cuenta que la mayoría de las decisiones tomadas a la hora de escribir este código fueron influenciadas por la necesidad de mostrar como hacer algo así que quizas no sean las mas acertadas y ciertamente este código no dee ser usado en producción.

Uno de los grandes atractivos de go es la concurrencia (no confundir con paralelismo, algunos buenos ejemplos).

Go nos brinda herramientas varias para trabajar con concurrencias, en este tutorial veremos como hacer uso de canales, waitgroups y contextos con cancelación que nos permitiran respectivamente comunicarnos entre gorutinas, esperar finalización de porciones concurrentes y señalar el fin de procesos externos a gorutinas.

Ejercicio de ejemplo

Como ejemplo de código, haremos una pequeña herramienta que nos permitirá hacer búsquedas de cadenas de texto arbitrarias en los diferentes sitios regionales de Mercado Libre y comparar resultados del item mas caro entre los varios paises. Es claro que la utilidad es nula pero nos permite construir sobre lo hecho en el post sobre APIs y JSON. (No, Mercado Libre no participa ni condona nada de lo que hacemos, es simplemente un uso de su API abierta que parece estar dentro de lo que permiten los términos de uso).

Conocimientos Previos.

No es necesario pero si util leer en nuestra publicación preferida sobre los temas de concurrencia y paralelismo, mas arriba en este mismo artículo hay algunos enlaces interesantes que proveen suficiente información.

Concurrencia

Para hacer uso de las herramientas antes mencionadas, realizaremos el ejercicio con la siguiente arquitectura.

main()  --> |--> busquedaSitioML(Pais1) -->| --> main()
            |    \_ cotizacion()_/         |
            |--> busquedaSitioML(Pais2) -->|
            |    \_ cotizacion()_/         |
            |--> busquedaSitioML(Pais3) -->|
            |    \_ cotizacion()_/         |
            |--> busquedaSitioML(Pais4) -->|
            |    \_ cotizacion()_/         |

Donde main() será nuestra rutina principal que invocará a nuestra rutina de búsqueda en Mercado Libre por cada pais concurrentement y a su vez cada una de estas gorutinas hara su búsqueda concurrentemente a la búsqueda de la cotización de la moneda de dicho pais contra el Dolar Estado Unidense (como moneda de comparación entre los varios paises)

No se preocupen si esto no tiene mucho sentido, al final del ejercicio todo será mas claro.

Precondiciones

Vamos a necesitar como pre-condiciones dos cosas:

  • El texto arbitrario de búsqueda
  • La lista de sitios oficiales de Mercado Libre

El texto

Obtendremos el texto de los argumentos de la línea de comandos. El paquete incluido os tiene un miembro Args que es un slice de string ([]string) que contiene todos los elementos pertinentes a la llamada del comando que estamos ejecutando, por ejemplo: para ./ejecutable un parametro detras de otro tendrá []string{"ejecutable", "un", "parametro", "detras", "de", "otro"} vemos que cada elemento es uno de los argumentos pasados al comando (la separación es el espacio) tambien podemos observar que el primero es el ejecutable en sí, por ende para obtener el texto arbitrario tomaremos todos los siguientes con os.Args[1:] que nos devolverá un slice con todos menos el primer elemento o si se prefiere, desde el elemento 1 en adelante.

// asignemos los términos de búsqueda a un string uniendo los parametros de la linea
// de comando con el string espacio
searchTerms := strings.Join(os.Args[1:], " ")

La lista de sitios

Mercado Libre provee un endpoint para que hagamos un pedido de la lista de sitios, se encuentra en https://api.mercadolibre.com/sites y para obtenerlos usaremos la función fetchSites() creada para este propósito en el código de ejemplo, no desarollaremos sobre esta porque es una aplicación del anterior tutorial basta con mencionar que nos devolverá la información de los sitios disponibles en forma de un slice del siguiente struct

// mlSite imita la estructura JSON que devuelve la búsqueda de Sites de Mercado Libre
// en este caso, de un solo site pero la búsqueda devuelve varios
type mlSite struct {
	DefaultCurrencyID string `json:"default_currency_id"`
	ID                string `json:"id"`
	Name              string `json:"name"`
}

Ya tenemos nuestro texto arbitrario de búsqueda y nuestra lista de sitios de Mercado Libre.

Vamos a prepararnos para recibir los resultados creando un slice de un struct que creamos para estos efectos:

// siteSearchResult contiene un resultado de búsqueda, es para uso interno, lo utilizaremos
// para enviar resultados de la gorutina a la rutina principal, contiene todo lo relevante
// que la rutina podria devolver, incluyendo un error por si esta fallara.
type siteSearchResult struct {
	site     mlSite
	price    decimal.Decimal
	priceUSD decimal.Decimal
	ratio    decimal.Decimal
	item     string
	err      error
}

dado que sabemos cuantos resultados habrá (tantos como sitios) pre-alocaremos el slice con el tamaño justo:

	// Hacemos una lista que contendrá los resultados de las búsquedas.
	results := make([]siteSearchResult, 0, len(sites))

Notese que el largo del mismo es 0 y la capacidad es el largo de sites, de este modo podremos asignar nuevos elementos con append pero en ningun momento se re-alocará el Array que hay detras del slice porque la capacidad dada al inicio es justa.

WaitGroups, señalización entre gorutinas.

Antes de lanzarnos en la creación de gorutinas, vamos a crear los mecanismos necesarios para saber cuando estas terminaron, hay mas de una forma de lograr esto pero me prece que la mas limpia es utilizando WaitGroups (si quieren investigar alguna otra, por ejemplo, pueden buscar sobre cierre de canales).

Los WaitGroups son una herramienta de sincronización entre gorutinas (si bien uno las puede usar libremente en otros contextos, no tienen mucho sentido)

La forma en que estos funcionan es a traves de la creación de un objeto que es seguro para compartir entre rutinas (no todos los objetos modificables los son ya que varias rutinas podrian competir por una asignación o una lectura y llevar a condiciones de carrera)

El WaitGroup tiene tres métodos a traves de los cuales lograremos sincronizar las gorutinas.

  • Add(int): puede ser invocado tantas veces como sea necesario, los enteros pasados se sumaran a un total, nos dice cuantas gorutinas o ejecuciones esperaremos.
  • Done(): será invocado por cada rutina o proceso que estamos esperando cuando crea pertinente avisar que lo que se esperaba ya sucedió.
  • Wait(): Bloqueará la ejecución hasta que Done haya sido invocado tantas veces como el total de enteros de Add, se puede invocar Add en cualquier momento y Done se puede invocar siempre que el contador de Add sea positivo, si disminuye de 0 causará un panico.

Nota: deberá ser un puntero.

	// creamos los WaitGroups para cada una de las go-rutinas que buscará.
	wg := &sync.WaitGroup{}
	wg.Add(len(sites))

Luego, dentro de la gorutina llamaremos, al principio de la misma:

func queryForSite(searchCriteria string, site mlSite,
	callerWaiting *sync.WaitGroup, result chan siteSearchResult) {
	// lo primero que haremos es encolar la llamada a Done, del wait group, así cuando
	// esta función salga, sin importar el resultado se avisará que terminó a quien esté
	// esperando.
	defer callerWaiting.Done()

la llamada es diferida hasta el final de la ejecución de la función pra asegurarnos que no indicaremos que estamos listos antes de tiempo, así pues defer se llamará luego del final de la función.

La función llamante, a su vez, tendrá una llamada bloqueante en el punto en que su ejecución no pueda continuar sin los resultados de esta gorutina o bien cuando haya terminado todo lo que tiene que hacer y este lista para retornar (si la rutina principal sale el programa terminará y se interrumpirá la ejecución de las gorutinas)

	// esperamos el wait group de todas las gorutinas de búsqueda, que no terminarán hasta
	// que la funcion de procesamiento haya leido su resultado.
	wg.Wait()

Canales, enviando datos entre gorutinas.

Uno de los problemas de la concurrencia es la necesidad de enviar datos entre las gorutinas varias. Podríamos utilizar complejas combinaciones de variables y mutexes, como la comunicación entre threads de varios lenguajes pero Go nos provee una forma nativa de hacer esto, los canales. Los canales son una especie de “tubo” que va entre rutinas y nos permite enviar datos, son tipadas como una variable así que a los fines practicos funciona como una asignación. Pueden ser con o sin búfer, o si se prefiere, tener un búfer de 0 a mas lugares. Lo que indicará el tamaño del búfer es cuantos elementos podremos asignar al canal antes de que el mismo bloquee, en este punto asignar al canal bloqueará hasta que se lean elementos, en este sentido funciona como una pila FIFO.

Crearemos para nuestro ejemplo un canal de tipo siteSearchResult.

	// creamos un canal, sin buffer, para los resultados.
	resultChannel := make(chan siteSearchResult)

Dada la naturaleza de la gorutina, no podemos retornar valores de la forma tradicional ya que no hay un receptor del otro lado de la asignación esperando ni una rutina bloqueada. A la hora de retornar utilizaremos el canal que creamos anteriormente y pasamos como parámetro durante la creación de la gorutina. El struct que creamos para este proposito y cuyo tipo le asignamos al canal será el transporte a la hora de hacer esto (notar que tiene campos para error y para éxito) de este modo enviaremos uno de esto por el canal en cada punto de salida de la función, ya sea por error:

	// si fallamos retornamos enseguida.
	if err != nil {
		result <- siteSearchResult{
			site: site,
			err:  err,
		}
		return
	}

como por éxito

	result <- siteSearchResult{
		site:     site,
		priceUSD: priceUSD,
		price:    price,
		item:     resultML.Results[0].Title,
		ratio:    currencyRatio,
	}

Dado que no queremos indicar que este proceso se completó hasta que los datos hayan sido consumidos, el canal es sin búfer y el envio bloqueante al canal esta en el código sin ninguna provisión para cancelación, una vez que este item sea consumido la asignación dejará de bloquear y la función prodecerá y terminará llamando eventualmente al Done() del wait group, asegurando consistencia.

Alternativamente, como veremos a continuación, podemos utilizar select para tener alternativas al bloqueo del canal, por ejemplo tiempos máximos de espera o incluso cancelación externa.

Para consumir los resultados, utilizaremos también una gorutina que se comporta de una manera ligeramente distinta:

El consumo de un canal, al igual que la asignación, es bloqueante. Un patrón común de uso para consumir canales que tienen resultados de otras gorutinas es utilizando un loop infinito for {} y consumir el canal dentro de un select que actua como un switch para canales, donde hay varios casos, uno por cada canal con el que queremos interactuar y se dará lugar al primero que se desbloquee, aquí un ejemplo de la función de consumo:

		for {
			select {
			case r := <-resultChannel:
				if r.err != nil {
					fmt.Printf("Site %q failed %v\n", r.site.Name, r.err)
					break
				}
				results = append(results, r)
			case <-ctx.Done():
				waitResultFetch.Done()
				return
			}
		}

En el ejemplo, el select hace dos operaciones bloqueantes con canales, la primera de ellas intenta asignar lo que salga del canal de resultados resultChannel a r, sabemos que será nuestro struct de resultado y en base a lo que sea (error o éxito) hara lo pertinente. El segundo caso simplemente consume de un canal ctx.Done() (no devuelve nada, simplemente un struct vacio) cuya función es bloquear hasta que una fuente externa indique que debe seguir, de este modo nunca tendremos un bloqueo infinito ya que podemos indicarle a la rutina que termine incluso i ya nada viene de resultChannel otros usos son, por ejemplo, la función After(tiempo) de la libreria incluida time que devuelve un canal que al cabo de tiempo devuelve un struct vacio.

Contexto con cancelacion

El context.Context es un elemento algo controversial, hay opiniones algo divididas sobre como usarlo, para que usarlo y si usarlo del todo. Al ser bastante versatil es facil abusar del mismo pero dejaremos estos detalles a criterio del lector ya que es material para un post entero.

En este caso utilizaremos el contexto con Cancelación, los contextos son objetos que vamos envolviendo con diferentes atributos, tipicamente los modificadores se llaman With* y son funciones que toman un contexto como entrada y devuelven otro que contempla el pasado y ha agregado cosas.

El contexto con cancelación, que se obtiene pasando un contexto existente (o context.Background si aun no tenemos uno) a context.WithCancel() que nos devuelve el nuevo contexto y una función que, al invocarla, cancelará el contexto.

	// Hacemos un contexto cancelable para indicar cuando estemos listos
	// para salir de la función de procesamiento de resultados.
	ctx, done := context.WithCancel(context.Background())

Podemos pasar el elemento ctx y el receptor o receptores pueden invocar ctx.Done() que bloqueará hasta que alguien ejecute done() tambien devuelve un canal, de modo tal que se pueda usar en select.

		select {
			case <-ctx.Done():
				//... tareas de cierre ...//
				return
		}

Gorutinas

Finalmente, una pequeña reseña sobre las gorutinas y como las usamos en este ejercicio. Iteraremos sobre todos los sitios e instanciaremos una gorutina por cada uno (esto se logra anteponiendo la palabra clave go al llamado de la función) y le pasamos el WaitGroup y el chan a cada una.

	// instanciamos una gorutina por cada sitio de Mercado Libre
	for i := range sites {
		go queryForSite(searchTerms, sites[i], wg, resultChannel)
	}

Código terminado

Luego de poner en práctica todo lo explicado, veremos un ejemplo de como terminó nuestro código en las partes relevantes, pueden encontrar todo el código con comentarios y explicaciones en el repositorio

El main

func main() {
	// Obtenemos de los argumentos de linea de comandos el criterio de búsqueda.
	searchTerms := strings.Join(os.Args[1:], " ")

	// obtenemos de mercado libre los sitios internacionales
	sites, err := fetchSites()
	if err != nil {
		log.Fatalf("could not obtain mercado libre sites: %v", err)
	}

	// Hacemos una lista que contendrá los resultados de las búsquedas.
	results := make([]siteSearchResult, 0, len(sites))

	// creamos los WaitGroups para cada una de las go-rutinas que buscará.
	wg := &sync.WaitGroup{}
	wg.Add(len(sites))

	// creamos un canal, sin buffer, para los resultados.
	resultChannel := make(chan siteSearchResult)

	// instanciamos una gorutina por cada sitio de Mercado Libre
	for i := range sites {
		go queryForSite(searchTerms, sites[i], wg, resultChannel)
	}

	// creamos un WaitGroup para esperar la gorutina que procesa los resultados.
	waitResultFetch := &sync.WaitGroup{}
	waitResultFetch.Add(1)

	// Hacemos un contexto cancelable para indicar cuando estemos listos
	// para salir de la función de procesamiento de resultados.
	ctx, done := context.WithCancel(context.Background())

	// invocamos la función anónima de procesamiento de resultados pasando
	// el contexto como parámetro, notar el shadowing.
	go func(ctx context.Context) {
		for {
			select {
			case r := <-resultChannel:
				if r.err != nil {
					fmt.Printf("Site %q failed %v\n", r.site.Name, r.err)
					break
				}
				results = append(results, r)
			case <-ctx.Done():
				waitResultFetch.Done()
				return
			}
		}
	}(ctx)

	// esperamos el wait group de todas las gorutinas de búsqueda, que no terminarán hasta
	// que la funcion de procesamiento haya leido su resultado.
	wg.Wait()

	// indicamos a la función de procesamiento que ya no queda nada por procesar
	done()

	// esperamos que la función de procesamiento termine.
	waitResultFetch.Wait()

	// imprimimos los resultados
	for _, v := range results {
		fmt.Printf("Comprar %q en %q cuesta USD %s (son %s %s a cambio %s):\n",
			searchTerms, v.site.Name, v.priceUSD.StringFixedBank(2), v.site.DefaultCurrencyID, v.price.StringFixedBank(2), v.ratio)
		fmt.Printf("--> Publicado como %q\n", v.item)
	}
}

La función de búsqueda de sitios

// queryForSite hara un pedido de búsqueda y devolverá el resultado mas caro para un site
// determinado de Mercado Libre. El resultado se devolverá en Dólares EstadoUnidenses si es
// posible por una cuestión de uniformidad de los resultados (ademas de la moneda de origen)
// esta pensado para ser llamado dentro de una gorutina, concurrentemente con otros sites.
func queryForSite(searchCriteria string, site mlSite,
	callerWaiting *sync.WaitGroup, result chan siteSearchResult) {
	// lo primero que haremos es encolar la llamada a Done, del wait group, así cuando
	// esta función salga, sin importar el resultado se avisará que terminó a quien esté
	// esperando.
	defer callerWaiting.Done()

	// creamos un wait group para la gorutina que obtendrá la cotización.
	currencyWait := &sync.WaitGroup{}
	currencyWait.Add(1)
	// como la gorutina es una función anónima dentro de esta, podemos compartir variables
	// para facilitar
	var currencyRatio decimal.Decimal
	var currencyError error

	// llamamos concurrentemente a la función de búsqueda de cotización, cuando termine
	// lo indicará al wait group.
	go func() {
		defer currencyWait.Done()
		currencyRatio, currencyError = fetchCurrencyRate(site.DefaultCurrencyID)
	}()

	// realizamos la función principal de esta función, buscar el item mas caro
	body, err := queryML(searchCriteria, site)
	// si fallamos retornamos enseguida.
	if err != nil {
		result <- siteSearchResult{
			site: site,
			err:  err,
		}
		return
	}

	// leemos el cuerpo de la respuesa
	bodyData, err := ioutil.ReadAll(body)
		// si fallamos retornamos enseguida.
	if err != nil {
		result <- siteSearchResult{
			site: site,
			err:  fmt.Errorf("reading mercado libre response body: %v", err),
		}
		return
	}

	// de-serializamos el cuerpo en un ResultadosML
	resultML := &ResultadosML{}
	err = json.Unmarshal(bodyData, &resultML)
			// si fallamos retornamos enseguida.
	if err != nil {
		result <- siteSearchResult{
			site: site,
			err:  fmt.Errorf("unmarshaling mercado libre response body: %v", err),
		}
		return
	}
			// si no encontramos resultados retornamos enseguida.
	if len(resultML.Results) == 0 {
		result <- siteSearchResult{
			site: site,
			err:  fmt.Errorf("results not found in response"),
		}
		return
	}

	// esperamos a la función de cotización para poder hacer la conversión de moneda.
	currencyWait.Wait()
	// si la función de cotización falló, retornaremos enseguida
	if currencyError != nil {
		result <- siteSearchResult{
			site: site,
			err:  fmt.Errorf("getting currency ratio: %v", currencyError),
		}
		return
	}

	// Algunos prints útiles para entender la función y como se ejecuta.
	//fmt.Println(site.Name)
	//fmt.Println(resultML.Results[0].Title)
	//fmt.Println(resultML.Results[0].Permalink)
	mlResult := resultML.Results[0]
	var price, priceUSD decimal.Decimal
	// si el precio esta en Dólares EstadoUnidenses originalmente agregaremos la otra
	// cotización dividiendo el precio en USD / cotización
	// de lo contrario multiplicaremos el precio en moneda de origen por cotización para
	// rellenar el precio en USD.
	if mlResult.CurrencyID == usdCurrencyCode {
		priceUSD = mlResult.GetPrice()
		price = priceUSD.Div(currencyRatio)
	} else {
		price = mlResult.GetPrice()
		priceUSD = price.Mul(currencyRatio)
	}

	// enviamos el struct que contiene el resultado por el canal de resultados.
	result <- siteSearchResult{
		site:     site,
		priceUSD: priceUSD,
		price:    price,
		item:     resultML.Results[0].Title,
		ratio:    currencyRatio,
	}
}
comments powered by Disqus