Indexando datos sin indisponibilidad

Indexando datos sin indisponibilidad

En el artículo anterior, Indexando datos en Elasticsearch, vimos cómo indexar datos en Elasticsearch usando una aplicación Ruby on Rails. Sin embargo, como la atención se centró únicamente en presentar el proceso de indexación, no se abordó el rendimiento involucrado en el proceso y, como resultado, la forma en que se implementó no es escalable para bases de datos con cientos de miles de registros.

Este artículo presentará un enfoque que se asemeja a un deploy blue-green, creando un nuevo índice que recibirá los registros. Y una vez finalizado el proceso, este nuevo índice reemplazará al índice anterior, lo que podría eliminar el impacto que genera la necesidad de reindexar completamente una base.

Antes de comenzar, conviene hacer algunas observaciones. En nuestro proyecto de ejemplo estamos indexando un candidato a la vez, es decir, por cada solicitud HTTP solo se indexa un candidato. Esto se puede mejorar indexando batches de registros a cada requisición HTTP. Dicho esto, indexar simplemente en batches no es suficiente para bases de datos con cientos de miles o incluso millones de registros, lo que hace que la indexación tarde horas en completarse.

Entendiendo el problema

Inicialmente, haremos download y configurar nuestro proyecto ejecutando los siguientes comandos. Este proyecto es una aplicación Ruby on Rails que tiene un modelo llamado Candidate, que persiste en nuestra base de datos local (PostgreSQL) y se indexa en Elasticsearch. El proyecto está descrito detalladamente en el artículo anterior.

# Faz o download do projeto
git clone git@github.com:joaofelipesus/reindex-blue-green.git
# Navega até o diretório do projeto
cd reindex-blue-green
# Faz o build do containers
docker-compose build
# Inicia os containers
docker-compose up

Puedes acceder al container en el que se está ejecutando nuestra aplicación web y, con ello, crear nuestra base de datos y tabla de candidatos ejecutando los siguientes comandos:

# Conecta ao container em que a aplicação web está rodando
docker compose exec web bash
# Cria o banco de dados
bundle exec rails db:create
# Executa as migrações pendentes
bundle exec rails db:migrate

Si todo va bien al acceder a localhost:3000, se mostrará la página de bienvenida de Rails. Con nuestro entorno configurado, inicialmente generaremos e indexaremos solo cien candidatos ejecutando el rake tasks siguiente:

# Cria cem candidatos
bundle exec rails candidates:seed
# Indexa todos os candidatos da base
bundle exec rails candidates:mass_indexation

A continuación, accediendo a http://localhost:5601/app/dev_tools#/console y ejecutando la query GET candidates/_count, vemos que tenemos cien candidatos indexados en un índice llamado candidates.

Para visualizar el problema de performance, aumentaremos el tamaño de nuestraa base de candidatos. Altera el número de candidatos generados por la rake task candidates:seed de 100 a 5 mil y ejecuta nuevamente las tasks candidates:seed y candidates:mass_indexation.

Nota que el tiempo de ejecución del proceso de indexación subió considerablemente. Imagina un escenario donde tenemos una feature que hace búsquedas en un índice Elasticsearch y las presenta a nuestros usuarios. En este contexto, en caso de necesitar reindexar completamente nuestra base debido a la inserción de un nuevo atributo, esta funcionalidad tendrá inestabilidad, una vez que los resultados de búsquedas hechas por nuestros usuarios devolverá resultados apenas entre los candidatos ya indexados.

Proceso de reindexación blue-green

Ahora agregaremos un nuevo atributo llamado favorite_language al modelo Candidate. Debemos mapearlo en el índice candidates que está definido en el archivo app/repositories/candidate_repository.rb. Acto seguido, añadiremos algunos valores para que sean guardados durante la rake task candidates:seed en este nuevo atributo.

# Gerar a migration que adiciona a nova coluna
bundle exec rails g migration add-favorite-lnguage-to-candidate

# Código da migration
class AddFavoriteLnguageToCandidate < ActiveRecord::Migration[7.0]
  def change
    add_column :candidates, :favorite_language, :string, default: nil

	# Set an value to :favorite_language attribute
	Candidate.all.find_each { |c| c.update(favorite_language: %w[elixir ruby javascript].sample) }
  end
end

# Adicionar novo atributo ao mapeamento utilizado no Elasticsearch
class CandidateRepository
  # NOTE: defines the host of elasticsearch, we use elasticsearch instead of localhost due to docker network.
  ELASTICSEARCH_HOST = 'http://elasticsearch:9200/'.freeze
  # NOTE: defines the mapping with candidate attributes and its types.
  INDEX_MAPPING = {
    mappings: {
      properties: {
        name: { type: "text" },
        email: { type: "text" },
        experience_time: { type: "integer" },
        focus: { type: "text" },
        favorite_language: { type: "text" }
      }
    }
  }.freeze
  ...
end

# Adicionar valor à favorite_language durante a geração de candidatos
namespace :candidates do
  desc 'Create candidates'
  task seed: :environment do
    5_000.times do |index|
      Candidate.create!(
        name: Faker::Name.name,
        email: Faker::Internet.email,
        experience_time: rand(1..10),
        focus: %w[backend frontend fullstack].sample,
        favorite_language: %w[elixir ruby javascript].sample
      )
    puts "#{index + 1} of 5000"
  end
end

# Executar a nova migração
docker-compose exec web bash
bundle exec rails db:migrate

Ahora con el nuevo mapeo del índice, podríamos borrar el índice anterior y crearíamos en la secuencia un nuevo índice con el nuevo atributo siendo indexado. Mientras tanto, este abordaje impactaría a nuestros usuarios ficticios, una vez que las búsquedas arrojarían resultados inconsistentes hasta que se indexe completamente el índice.

Para lidiar con este problema, utilizaremos un método basado en Alias, considerando que un alias es un puntero para un índice. Con eso, nuestra aplicación piensa que hace inserciones o búsquedas en un índice llamado candidates_index. Sin embargo, este es el nombre del alias que apunta a algún otro índice, como se ilustra en la imagen siguiente.

Para poder crear un nuevo índice en paralelo e indexar candidatos para este nuevo índice, necesitamos agregar un método que cree un nuevo alias que apunte a un índice. Dicho esto, agregaremos un rake task que, Al crear un nuevo índice, también creará un alias que apunte a este nuevo índice. También debemos adaptar el código de task candidates:mass_indexation para que trabaje con Alias, de forma que, al ser ejecutado, cree el Alias de producción que apunte al nuevo índice.

class CandidateRepository
  ...

  # Create an alias with received name and index.
  #
  # @param alias_name [String] alias name.
  # @param index_name [String] index name.
  def create_alias(alias_name, index_name)
    response = conn.post(
      '_aliases',
      {
        actions: [ { add: { index: index_name, alias: alias_name } } ]
      }.to_json
    )
  
    return 'Alias created' if response.status == 200
  
    response.body
  end
end

#######################################################################

# Rake tasks
desc 'Index candidates'
task mass_indexation: :environment do
  splitted_date_time = DateTime.current.to_s.split('T')
  date_values = splitted_date_time.first.split('-')
  time_values = splitted_date_time.last.split(':')
  index_name = "candidates_#{date_values[0]}_#{date_values[1]}_#{date_values[2]}_#{time_values[0]}_#{time_values[1]}"
  
  puts 'Creating index ...'
  CandidateRepository.new.create_index(index_name)
  puts 'Index created'
  
  puts 'Creating alias ...'
  CandidateRepository.new.create_alias(
    CandidateRepository::PRODUCTION_ALIAS, 
    index_name
  )
  puts 'Alias created'
  
  Candidate.all.find_each.with_index do |candidate, index|
    CandidateRepository.new.index_candidate(candidate, index_name) 
    puts "#{index + 1} of #{Candidate.count}"
  end
end

Ahora ejecutaremos la task candidates:mass_indexation que creará un nuevo índice llamado candidates_, seguido de un timestamp, y un alias denominado candidates_index que apunte al nuevo índice. A partir del nuevo Alias se harán las interacciones con el índice, de forma que el índice apuntado por el Alias candidates_index cambie sin que haya necesidad de alterarse en nuestra aplicación.

Al completar estos comandos, agregaremos una nueva task llamada candidates:blue_green_indexation. Ésta creará un nuevo índice, también llamado candidates_ y seguido de un timestamp. Con el fin de esta etapa, se creará un Alias llamado candidates_processing, que apuntará para el nuevo índice y, al final de esta etapa, se iniciará la indexación de todos los candidatos de la base.

desc 'Background indexation'
task blue_green_indexation: :environment do
  splitted_date_time = DateTime.current.to_s.split('T')
  date_values = splitted_date_time.first.split('-')
  time_values = splitted_date_time.last.split(':')
  index_name = "candidates_#{date_values[0]}_#{date_values[1]}_#{date_values[2]}_#{time_values[0]}_#{time_values[1]}"

  puts 'Creating index ...'
  CandidateRepository.new.create_index(index_name)
  puts 'Index created'

  CandidateRepository.new.create_alias(
    CandidateRepository::PROCESSING_ALIAS, 
    index_name
  )

  Candidate.all.find_each.with_index do |candidate, index|
    CandidateRepository.new.index_candidate(
      candidate, 
      CandidateRepository::PROCESSING_ALIAS
    )
    puts "#{index + 1} of #{Candidate.count}"
  end 
end

Después de ejecutarse la nueva task, tenemos dos índices en los que candidates_index emula un índice consumido por nuestra aplicación para hacer búsquedas y candidates_processing, que es un nuevo índice de nuevo mapeao que tiene sus datos insertados en paralelo. De esta forma, el ambiente de producción será poco afectado.

El hecho de que la aplicación esté interactuando con un alias, y no directamente con el índice, nos da la posibilidad de cambiar a qué índice apunta el alias. Una vez indexados los candidatos haremos que el alias candidates_index comience a señalar el nuevo índice y luego elimine el índice anterior, como se ilustra en la siguiente figura.

Comenzaremos agregando a la clase CandidateRepository un método llamado switch_production_alias, que implementará el flujo descrito encima.

class CandidateRepository
  ...
  
  # Switch indexes pointed by production and processing alias, and at the end removes old production index.
  def switch_production_alias
    production_index_name = get_index_name(PRODUCTION_ALIAS)
    processing_index_name = get_index_name(PROCESSING_ALIAS)

    point_alias_to_index(
      alias_name: PRODUCTION_ALIAS, 
      index_name: processing_index_name
    )

    remove_index_pointer(
      alias_name: PRODUCTION_ALIAS, 
      index_name: production_index_name
    )

    remove_index_pointer(
      alias_name: PROCESSING_ALIAS, 
      index_name: processing_index_name
    )
    drop_index(production_index_name)
  end

  private

  ...

  # Return the index name that received alias points.
  #
  # @param alias_name [String] alias name.
  def get_index_name(alias_name)
    response_body = conn.get("#{alias_name}/_alias").body
    JSON.parse(response_body).keys.first
  end

  # Points received alias to received index.
  #
  # @param alias_name [String] alias name.
  # @param index_name [String] index name.
  def point_alias_to_index(alias_name:, index_name:)
    conn.post(
      '_aliases',
      {
        actions: [
          {
            add: {
              index: index_name,
              alias: alias_name
            }
          }
        ]
      }.to_json
    )
  end

  # Remove index pointer from received alias.
  #
  # @param alias_name [String] alias name.
  # @param index_name [String] index name.
  def remove_index_pointer(alias_name:, index_name:)
    conn.post(
      '_aliases',
      {
        actions: [
          {
            remove: {
              index: index_name,
              alias: alias_name
            }
          }
        ]
      }
    )
  end

  # Drop received index.
  #
  # @param index_name [String] index name.
  def drop_index(index_name)
    conn.delete(index_name)
  end
end

Ahora con la lógica que reemplaza los índices implementada, crearemos un nuevo task de nombre candidates:switch_production_alias que haga esta llamada.

desc 'Switch indexes pointed by alias candidates_index'
task switch_indexes: :environment do
  puts 'Starting switch indexes'
  CandidateRepository.new.switch_production_alias
  puts 'Switch indexes finished'
end

Antes de ejecutar la nueva task, haz lo propio con el comando GET candidates_index y guarda el nombre del índice que está siendo apuntado por este alias. Al ejecutar la task, nota la rapidez para cambiar entre índices. Al ejecutar GET candidates_index podemos confirmar que el índice apuntado por el alias candidates_index sí cambió.

Conclusión

El enfoque propuesto en este artículo nos permite crear y completar un nuevo índice mientras la aplicación continúa usando el índice anterior. Una vez finalizado el procesamiento de datos, podemos ejecutar la lógica que intercambia el índice apuntado por el alias de producción, eliminando el índice anterior, sin ningún impacto en la producción durante esta operación. Esta transición ocurre en una ventana de ejecución muy corta, que consta de solo unas pocas llamadas HTTP a Elasticsearch.

Algunos puntos que vale la pena mencionar son: el hecho de que Sidekiq no se utilizó para procesar llamadas en segundo plano y la no utilización de la indexación por lotes compatible con Elasticsearch. Cabe mencionar que en aplicaciones reales el uso de estas herramientas es válido en conjunto con el proceso de indexación paralelo sugerido en este artículo.

Todos los códigos utilizados en este artículo están presentes en este repositorio.

¡Éxito!

💡
Las opiniones y comentarios emitidos en este artículo son propiedad única de su autor y no necesariamente representan el punto de vista de Listopro.

Listopro Community da la bienvenida a todas las razas, etnias, nacionalidades, credos, géneros, orientaciones, puntos de vista e ideologías, siempre y cuando promuevan la diversidad, la equidad, la inclusión y el crecimiento profesional de los profesionales en tecnología.