Google Cloud, tareas de larga duración y Compute Engine

Google Cloud, tareas de larga duración y Compute Engine

Recientemente, trabajé en un proyecto que requería la implementación de un servicio de ETL que sobrepasaba los límites de algunos servicios serverless de Google Cloud, como Cloud Run y Cloud Functions. Aunque Google Cloud ofrece herramientas diseñadas para estas tareas, otro punto que se buscaba mantener eran los costos bajos. Por tanto Dataflow, Dataproc o cualquiera de estos servicios orientados a pipelines de datos incrementaron la facturación a un punto donde no era viable.

Esto me llevó a iterar diferentes arquitecturas que cumplieran con todos los requerimientos y sin las limitantes que otros servicios imponen, como por ejemplo timeouts de 30 minutos a 1 hora o la persistencia de datos alcanzada integrando más componentes a la arquitectura (y, por tanto, un grado mayor de mantenimiento) de ciertos servicios stateless.

Después de analizarlo, decidí implementar una arquitectura a través de Compute Engine, pero con la variante de que ejecutaría el contenedor del servicio en vez de un servidor tradicional, junto con otros componentes que, al trabajar juntos, permitieran la ejecución del servicio sin ninguna limitación, así como desechar todos los recursos al terminar.

En este artículo, te enseñaré cómo implementar esta arquitectura para ejecutar un simple programa scraper, cabe notar que este es un ejemplo, si realmente consideras esta arquitectura para tu proyecto deberías hacerlo por las siguientes razones:

  • El tiempo de ejecución sobrepasa los límites de otros servicios.
  • Mantener los costos bajos.
  • Reducir el número de dependencias de tu infraestructura.

Requisitos para realizar este proyecto

  • Conocimiento intermedio de Python y Virtualenv.
  • Conocimiento de Git y GitHub.
  • Conocimiento de Google Cloud.
  • Un proyecto en Google Cloud con billing habilitado o con créditos vigentes.
  • Una cuenta de servicio con las siguientes funciones:

    -Administrador de Compute.
    -Escritor de registros.
    - Invocador de flujos de trabajo.
    - Usuario de cuenta de servicio.
    - Administrador de almacenamiento.
  • Generar una llave para la cuenta de servicio (si vas a ejecutar esto en local).
  • Instalar el SDK de Google Cloud y configurar el proyecto en local utilizando el comando gcloud init.

Este sería el diagrama de arquitectura considerado:


Antes de hablar de infraestructura, primero necesitamos definir y ejecutar el servicio encargado de la(s) tarea(s) de larga duración. Para este artículo, escribí un scraper en Python, pero dado que vamos a usar Docker, podrías usar el lenguaje de tu preferencia. Ya que la arquitectura no depende de un SO o lenguaje, solo necesitas un ambiente donde montar la imagen y correr el contenedor.

Revisemos ahora la estructura del proyecto a implementar, no me detendré mucho a explicar el código ya que este artículo está más enfocado en la parte de infraestructura. Sin embargo, a futuro podría hacer otro donde examinemos el código con más detalle.

Acá te dejo el link al repositorio donde explico los pasos para configurar este servicio.

Vayamos a la acción


Esta será la estructura del proyecto:

scraper/

scrape.py

Dockerfile

requirements.txt

start_vm.yml

config.json

1) Clona el repositorio y crea tu virtualenv

git clone https://github.com/luigicfh/scraper.git

2) Crea el virtualenv

virtualenv venv

3) Activa el virtualenv

source venv/bin/activate

4) Instala las dependencias

pip install -r requirements.txt

5) Examinemos el archivo scrape.py

from bs4 import BeautifulSoup, element

import requests

import json

from google.cloud import compute_v1, storage

import google.cloud.logging

from google.cloud.exceptions import NotFound

import logging

import traceback

import os

import datetime

BASE_URL = 'https://books.toscrape.com'

PAGE_URL = BASE_URL + '/catalogue/page-{}.html'

LOG_MSG = "Scraper logs: "

def setup_env_variables() -> None:

with open('config.json') as file:

to_dict = json.loads(file.read())

os.environ['PROJECT_ID'] = to_dict['project_id']

os.environ['ZONE'] = to_dict['zone']

os.environ['INSTANCE_NAME'] = to_dict['instance_name']

os.environ['BUCKET'] = to_dict['bucket']

os.environ['FOLDER'] = to_dict['folder']

def get_products() -> list[element.ResultSet]:

product_list = []

page = 1

while True:

response = requests.get(url=PAGE_URL.format(page))

if response.status_code != 200:

break

soup = BeautifulSoup(response.content, 'html.parser')

products = soup.find_all('article', {'class': 'product_pod'})

product_list.append(products)

page += 1

return product_list

def setup_logging() -> None:

logging_client = google.cloud.logging.Client()

logging_client.setup_logging()

def delete_instance() -> None:

gce_client = compute_v1.InstancesClient()

try:

gce_client.delete(

project=os.environ.get('PROJECT_ID'),

zone=os.environ.get('ZONE'),

instance=os.environ.get('INSTANCE_NAME')

)

except NotFound:

pass

def upload_to_gcs(data: str) -> None:

gcs_client = storage.Client()

bucket = gcs_client.get_bucket(os.environ.get("BUCKET"))

blob = bucket.blob(f"{os.environ.get('FOLDER')}/results.json")

blob.upload_from_string(data=data, content_type="application/json")

def handle_exception() -> None:

logging.error(LOG_MSG + traceback.format_exc())

delete_instance()

def to_list(products: list[element.ResultSet]) -> list[dict]:

d = {}

data = []

for product in products:

d['book_title'] = product[0].find('h3').find('a')['title']

d['image'] = BASE_URL + product[0].find(attrs={'class': 'image_container'}).find(

'a').find('img')['src'].replace("..", '')

d['price'] = product[0].find(attrs={'class': 'product_price'}).find(

attrs={'class': 'price_color'}).get_text()

d['in_stock'] = product[0].find(

attrs={'class': 'instock availability'}).get_text().replace('\n', '').strip()

d['rating'] = product[0].find(attrs={'class': 'star-rating'})[

'class'][-1]

data.append(d)

d = {}

return data

def scrape() -> None:

logging.info(

LOG_MSG + f"scraping started {datetime.datetime.now().isoformat()}")

book_products = get_products()

logging.info(

LOG_MSG + f"transforming data {datetime.datetime.now().isoformat()}")

data_list = to_list(book_products)

to_json = json.dumps(data_list, indent=4)

logging.info(

LOG_MSG + f"uploading file {datetime.datetime.now().isoformat()}")

upload_to_gcs(to_json)

logging.info(

LOG_MSG + f"scraping process finished, deleting instance {datetime.datetime.now().isoformat()}")

delete_instance()

if __name__ == '__main__':

setup_env_variables()

setup_logging()

try:

scrape()

except Exception:

handle_exception()

Aunque ya mencioné que no examinaremos el código a detalle, sí quiero que notemos un punto importante: si regresamos al diagrama de arquitectura, se darán cuenta de que existe un workflow cuyo trabajo es crear la instancia de Compute Engine y la configuración de la misma para iniciar la ejecución de la tarea. Pero ¿quién destruye la instancia cuando el servicio termina su ejecución? La respuesta es: el mismo servicio, ya que solo éste sabe cuándo ha terminado de ejecutarse, sin depender de otras técnicas como polling para ese fin.

El código que presento a continuación muestra cómo este servicio destruye la instancia en dos diferentes situaciones: si la ejecución fue correcta y si produjo algún error (ya que no queremos una instancia de Compute Engine activa si algo salió mal).

El intento de borrado de la instancia está dentro de un bloque de try except, ya que si ejecutas pruebas locales, la instancia de Compute Engine probablemente no exista.

def delete_instance():

gce_client = compute_v1.InstancesClient()

try:

gce_client.delete(

project=os.environ.get('PROJECT_ID'),

zone=os.environ.get('ZONE'),

instance=os.environ.get('INSTANCE_NAME')

)

except NotFound:

pass

El último elemento a examinar, pero no menos importante, es nuestro Dockerfile, donde el punto principal es la última línea. Como podrás ver, la ejecución del servicio es iniciada dentro de este mismo archivo.

FROM python:3.10-slim

ENV PYTHONUNBUFFERED True

ENV APP_HOME /app

WORKDIR $APP_HOME

COPY . ./

RUN apt-get update -y && apt-get update

RUN pip install --upgrade pip

RUN pip install -r requirements.txt

CMD python scrape.py

El scraper por sí mismo ya integra dos servicios de Google Cloud. El primero de ellos es Cloud Logging, empleado para registrar el proceso mientras se ejecuta, como también cualquier error que pueda surgir durante su ejecución. Las ventajas de usar Cloud Logging es que puedes crear vistas personalizadas que solo muestren registros del servicio en cuestión, así como crear alertas para algún tipo de registro que notifiquen a las personas encargadas del monitoreo del servicio si algo ha salido mal.

Para integrar Cloud Logging en tu proyecto, primero debes inicializar el cliente y luego podrás utilizar la librería logging de Python para generar tus registros en Google Cloud.

def setup_logging():

logging_client = google.cloud.logging.Client()

logging_client.setup_logging()

El segundo servicio utilizado es Cloud Storage, el cual utilizamos para guardar los datos convertidos a JSON que extrajimos del sitio web. A continuación puedes examinar la función upload_to_gcs().

def upload_to_gcs(data: str) -> None:

gcs_client = storage.Client()

bucket = gcs_client.get_bucket(os.environ.get("BUCKET"))

blob = bucket.blob(f"{os.environ.get('FOLDER')}/results.json")

blob._chunk_size = 8388608  # 1024 * 1024 B * 16 = 8 MB

blob.upload_from_string(data=data, content_type="application/json")

Para generar la imagen de nuestro servicio y llevarla a Google Cloud haremos uso de dos servicios más: Cloud Build para construir nuestra imagen junto con todas sus dependencias, y Container Registry, donde nuestra imagen estará disponible para ser descargada e implementada en cualquier servicio que ejecuta contenedores.

Para construir nuestra imagen y guardarla en Container Registry haremos uso del siguiente comando:

Nota: Esto debe ser ejecutado dentro del fichero donde se encuentra el archivo Docker.

gcloud builds submit --tag=gcr.io/<project_id>/<service_name>

Bien, ahora ya tenemos un servicio que ejecuta nuestra tarea de scraping e integra diferentes servicios de Google Cloud, ahora ¿cómo automatizamos este proceso de acuerdo con un horario establecido? Ahora hablaremos sobre workflows y como éstos nos pueden ayudar a ejecutar tareas secuenciales o incluso paralelas en Google Cloud.

Examinemos el archivo start_vm.yml.

main:

steps:

- init:

assign:

- projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}

- projectNumber: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}

- zone: "us-east1-b"

- machineType: "e2-micro"

- instanceName: "scraper-vm"

- log_vm_creation_start:

call: sys.log

args:

data: ${"Scraper logs:initializing VM"}

- create_and_start_vm:

call: googleapis.compute.v1.instances.insert

args:

project: ${projectId}

zone: ${zone}

body:

name: ${instanceName}

machineType: ${"zones/" + zone + "/machineTypes/" + machineType}

disks:

- initializeParams:

sourceImage: "projects/cos-cloud/global/images/cos-stable-93-16623-102-1"

diskSizeGb: "10"

boot: true

autoDelete: true

networkInterfaces:

- accessConfigs:

- kind: compute#accessConfig

name: "external-nat"

networkTier: "PREMIUM"

metadata:

items:

- key: "gce-container-declaration"

value: '${"spec:\n  containers:\n  - name: scraper\n    image: gcr.io/" + projectId + "/scraper\n    stdin: false\n    tty: false\n  restartPolicy: Always\n"}'

- key: "google-logging-enabled"

value: "true"

- key: "google-monitoring-enabled"

value: "true"

serviceAccounts:

- email: "service-account-name@project-id.iam.gserviceaccount.com"

scopes:

- https://www.googleapis.com/auth/devstorage.read_only

- https://www.googleapis.com/auth/logging.write

- https://www.googleapis.com/auth/monitoring.write

- https://www.googleapis.com/auth/servicecontrol

- https://www.googleapis.com/auth/service.management.readonly

- https://www.googleapis.com/auth/trace.append

- https://www.googleapis.com/auth/cloud-platform

- log_wait_for_vm_network:

call: sys.log

args:

data: ${"Scraper logs:Waiting for VM network to initialize"}

- wait_for_vm_network:

call: sys.sleep

args:

seconds: 10

- log_vm_creation_end:

call: sys.log

args:

data: ${"Scraper logs:VM initialized successfully"}

El primer paso de nuestro workflow es init, donde simplemente asignamos algunas variables que serán utilizadas más adelante.

Adicionalmente, acá puedes configurar el tamaño de la máquina que quieres crear, desde máquinas predefinidas hasta máquinas con especificaciones personalizadas.

- init:

assign:

- projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}

- projectNumber: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}

- zone: "us-east1-b"

- machineType: "e2-micro"

- instanceName: "scraper-vm"

El siguiente paso simplemente es una utilidad para registrar en Cloud Logging.

- log_vm_creation_start:

call: sys.log

args:

data: ${"Scraper logs:initializing VM"}

Ahora revisemos el paso más complejo, el cual ejecuta la creación de la máquina, instala Container OS y la imagen de nuestro servicio.

  • call: dicta el API de Google Cloud y el método que invocamos para la creación de la máquina.
  • machineType: es donde configuramos el tipo de máquina que se creará.
  • sourceImage es donde definimos la versión de Container OS.
  • natIP: es donde asignamos la IP estática pública que definimos anteriormente.
  • metadata: allí se utiliza la estructura de llave. Podemos agregar parámetros adicionales de configuración, como por ejemplo startup scripts, pero en este caso puntual acá es donde configuramos la imagen de nuestro servicio que será instalado y ejecutado en la máquina.
  • serviceAccounts: permite configurar la cuenta de servicio asociada a la máquina y los scopes que ésta tendrá para llamar otros servicios de Google Cloud.

- create_and_start_vm:

call: googleapis.compute.v1.instances.insert

args:

project: ${projectId}

zone: ${zone}

body:

name: ${instanceName}

machineType: ${"zones/" + zone + "/machineTypes/" + machineType}

disks:

- initializeParams:

sourceImage: "projects/cos-cloud/global/images/cos-stable-93-16623-102-1"

diskSizeGb: "10"

boot: true

autoDelete: true

networkInterfaces:

- accessConfigs:

- kind: compute#accessConfig

name: "external-nat"

networkTier: "PREMIUM"

metadata:

items:

- key: "gce-container-declaration"

value: '${"spec:\n  containers:\n  - name: scraper\n    image: gcr.io/" + projectId + "/scraper\n    stdin: false\n    tty: false\n  restartPolicy: Always\n"}'

- key: "google-logging-enabled"

value: "true"

- key: "google-monitoring-enabled"

value: "true"

serviceAccounts:

- email: "service-account-name@project-id.iam.gserviceaccount.com"

scopes:

- https://www.googleapis.com/auth/devstorage.read_only

- https://www.googleapis.com/auth/logging.write

- https://www.googleapis.com/auth/monitoring.write

- https://www.googleapis.com/auth/servicecontrol

- https://www.googleapis.com/auth/service.management.readonly

- https://www.googleapis.com/auth/trace.append

- https://www.googleapis.com/auth/cloud-platform

El resto de los pasos generan registros para monitorear el estado del workflow.

- log_wait_for_vm_network:

call: sys.log

args:

data: ${"Scraper logs:Waiting for VM network to initialize"}

- wait_for_vm_network:

call: sys.sleep

args:

seconds: 10

- log_vm_creation_end:

call: sys.log

args:

data: ${"Scraper logs:VM initialized successfully"}

Para desplegar nuestro workflow y realizar pruebas, utilizamos el siguiente comando:

gcloud workflows deploy <workflow_name> --source=<file-name>.yml --service-account=<service-account-email> --location=<region>

Finalmente, podemos asociar nuestro workflow a un Cloud Scheduler que, básicamente, es una tarea cron configurada en Google Cloud para definir el horario de ejecución. Para esto, seguimos los siguientes pasos:

  • Abrimos nuestra consola de GCP.
  • Buscamos workflows o flujos de trabajo.
  • Seleccionamos nuestro workflow de la lista.
  • Vamos a la pestaña de Activadores y damos clic a Editar.
  • En la página de edición, buscamos la sección de Activadores.
  • Damos clic en Agregar Activador Nuevo.
  • Seleccionamos Cloud Scheduler.
  • Damos un nombre y establecemos el horario usando la sintaxis para tareas cron.
  • Establecemos la zona horaria.
  • Vamos a la sección de Configura la ejecución y seleccionamos la cuenta de servicio.
  • Clic en Crear.

Acá te dejo una página donde puedes generar tu frase cron.

¡Y eso es todo! Si llegaste hasta acá, has implementado una arquitectura que te permitirá ejecutar tareas de larga duración sin ningún límite. También integramos múltiples servicios de Google Cloud utilizando las librerías cliente para Python.

Saludos.

⚠️
Las opiniones y comentarios emitidos en este artículo son propiedad única de su autor y no necesariamente representan el punto de vista de Revelo.

Revelo Content Network da la bienvenida a todas las razas, etnias, nacionalidades, credos, géneros, orientaciones, puntos de vista e ideologías, siempre y cuando promuevan la diversidad, la equidad, la inclusión y el crecimiento profesional de los profesionales en tecnología.