Груба паралельна обробка за допомогою черги роботи

У цьому прикладі ви запустите Job Kubernetes з кількома паралельними робочими процесами.

У цьому прикладі, при створенні кожного Pod, він бере одиницю роботи з черги завдань, завершує її, видаляє її з черги та завершує роботу.

Ось огляд кроків у цьому прикладі:

  1. Запустіть службу черги повідомлень. У цьому прикладі ви використовуєте RabbitMQ, але ви можете використовувати іншу. На практиці ви налаштовували б службу черги повідомлень один раз і використовували б її для багатьох робочих завдань.
  2. Створіть чергу та заповніть її повідомленнями. Кожне повідомлення являє собою одне завдання для виконання. У цьому прикладі повідомлення — це ціле число, на якому ми будемо виконувати тривалі обчислення.
  3. Запустіть Job, що працює над завданнями з черги. Job створює декілька Podʼів. Кожен Pod бере одне завдання з черги повідомлень, обробляє його та завершує роботу.

Перш ніж ви розпочнете

Ви вже маєте бути знайомі з основним, не-паралельним використанням Job.

Вам треба мати кластер Kubernetes, а також інструмент командного рядка kubectl має бути налаштований для роботи з вашим кластером. Рекомендується виконувати ці настанови у кластері, що має щонайменше два вузли, які не виконують роль вузлів управління. Якщо у вас немає кластера, ви можете створити його, за допомогою minikube або використовувати одну з цих пісочниць:

Вам знадобиться реєстр образів контейнерів, куди ви можете завантажувати образи для запуску у своєму кластері.

Цей приклад завдання також передбачає, що у вас вже встановлений Docker локально.

Запуск служби черги повідомлень

У цьому прикладі використовується RabbitMQ, проте ви можете адаптувати приклад для використання іншої служби повідомлень типу AMQP.

На практиці ви можете налаштувати службу черги повідомлень один раз у кластері та використовувати її для багатьох завдань, а також для служб з тривалим виконанням.

Запустіть RabbitMQ таким чином:

# Створіть Service для використання StatefulSet
kubectl create -f https://kubernetes.io/examples/application/job/rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
kubectl create -f https://kubernetes.io/examples/application/job/rabbitmq/rabbitmq-statefulset.yaml
statefulset "rabbitmq" created

Тестування служби черги повідомлень

Тепер ми можемо спробувати доступ до черги повідомлень. Ми створимо тимчасовий інтерактивний Pod, встановимо деякі інструменти на ньому, і спробуємо працювати з чергами.

Спочатку створіть тимчасовий інтерактивний Pod.

# Створіть тимчасовий інтерактивний контейнер
kubectl run -i --tty temp --image ubuntu:22.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...

Зверніть увагу, що ваше імʼя Pod та запрошення командного рядка будуть відрізнятися.

Далі встановіть amqp-tools, щоб працювати з чергами повідомлень. Наступні команди показують, що вам потрібно запустити в середовищі інтерактивної оболонки у цьому Pod:

apt-get update && apt-get install -y curl ca-certificates amqp-tools python3 dnsutils

Пізніше ви створите образ контейнера, який включає ці пакети.

Далі перевірте, що ви можете виявити Service для RabbitMQ:

# Виконайте ці команди всередині Pod
# Зауважте, що для служби rabbitmq-service існує DNS-ім\я, яке надається Kubernetes:
nslookup rabbitmq-service
Server:        10.0.0.10
Address:    10.0.0.10#53

Name:    rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152

(адреси IP будуть відрізнятися)

Якщо надбудова kube-dns не налаштована правильно, попередній крок може не працювати для вас. Ви також можете знайти IP-адресу для цього Service у змінній середовища:

# виконайте цю перевірку всередині Pod
env | grep RABBITMQ_SERVICE | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152

(IP-адреса буде відрізнятися)

Далі перевірте, що ви можете створити чергу, і публікувати та отримувати повідомлення.

# Виконайте ці команди всередині Pod
# У наступному рядку rabbitmq-service - це імʼя хоста, за яким можна звертатися до rabbitmq-service.
# 5672 - це стандартний порт для rabbitmq.
export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# Якщо ви не могли отримати доступ до "rabbitmq-service" на попередньому кроці,
# використовуйте цю команду натомість:
BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672

# Тепер створіть чергу:

/usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo

Опублікуйте одне повідомлення в черзі:

/usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello

# І отримайте його назад.

/usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo 1>&2
Hello

У останній команді інструмент amqp-consume взяв одне повідомлення (-c 1) з черги, і передав це повідомлення на стандартний ввід довільної команди. У цьому випадку програма cat виводить символи, зчитані зі стандартного вводу, а echo додає символ нового рядка, щоб приклад був читабельним.

Заповнення черги завданнями

Тепер заповніть чергу деякими симульованими завданнями. У цьому прикладі завдання — це рядки, які потрібно надрукувати.

На практиці вміст повідомлень може бути таким:

  • назви файлів, які потрібно обробити
  • додаткові прапорці для програми
  • діапазони ключів у таблиці бази даних
  • параметри конфігурації для симуляції
  • номери кадрів сцени для рендерингу

Якщо є великі дані, які потрібно лише для читання всіма Podʼами Job, зазвичай це розміщують на спільній файловій системі, наприклад NFS, і монтується це на всі Podʼи тільки для читання, або напишіть програму в Pod так, щоб вона могла нативно читати дані з кластерної файлової системи (наприклад: HDFS).

У цьому прикладі ви створите чергу та заповните її, використовуючи інструменти командного рядка AMQP. На практиці ви можете написати програму для заповнення черги, використовуючи бібліотеку клієнтів AMQP.

# Виконайте це на вашому комп'ютері, а не в Pod
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1  -d
job1

Додайте елементи до черги:

for f in apple banana cherry date fig grape lemon melon
do
  /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done

Ви додали 8 повідомлень у чергу.

Створення образу контейнера

Тепер ви готові створити образ, який ви будете запускати як Job.

Job буде використовувати утиліту amqp-consume для читання повідомлення з черги та виконання реальної роботи. Ось дуже простий приклад програми:

#!/usr/bin/env python

# Просто виводить стандартний вивід і очікує протягом 10 секунд.
import sys
import time
print("Processing " + sys.stdin.readlines()[0])
time.sleep(10)

Дайте скрипту дозвіл на виконання:

chmod +x worker.py

Тепер створіть образ. Створіть тимчасову теку, перейдіть в неї, завантажте Dockerfile, і worker.py. У будь-якому випадку, створіть образ за допомогою цієї команди:

docker build -t job-wq-1 .

Для Docker Hub позначте ваш образ застосунка вашим імʼям користувача і завантажте його на Hub за допомогою таких команд. Замініть <username> на ваше імʼя користувача Hub.

docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1

Якщо ви використовуєте альтернативний реєстр образів контейнерів, позначте образ і завантажте його туди замість цього.

Визначення Job

Ось маніфест для Job. Вам потрібно зробити копію маніфеста Job (назвіть його ./job.yaml), і змініть імʼя образу контейнера, щоб відповідало імені, яке ви використовували.

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
      restartPolicy: OnFailure

У цьому прикладі кожен Pod працює над одним елементом з черги, а потім виходить. Таким чином, кількість завершень завдання відповідає кількості виконаних робочих елементів. Тому у прикладі маніфесту .spec.completions встановлено на 8.

Запуск Job

Тепер запустіть завдання:

# це передбачає, що ви вже завантажили та відредагували маніфест
kubectl apply -f ./job.yaml

Ви можете зачекати, доки завдання успішно виконається, використовуючи тайм-аут:

# Перевірка для умови що імена нечутливі до регістру
kubectl wait --for=condition=complete --timeout=300s job/job-wq-1

Далі перевірте стан завдання:

kubectl describe jobs/job-wq-1
Name:             job-wq-1
Namespace:        default
Selector:         controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-1
Annotations:      <none>
Parallelism:      2
Completions:      8
Start Time:       Wed, 06 Sep 2022 16:42:02 +0000
Pods Statuses:    0 Running / 8 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                job-name=job-wq-1
  Containers:
   c:
    Image:      container-registry.example/causal-jigsaw-637/job-wq-1
    Port:
    Environment:
      BROKER_URL:       amqp://guest:guest@rabbitmq-service:5672
      QUEUE:            job1
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen  LastSeen   Count    From    SubobjectPath    Type      Reason              Message
  ─────────  ────────   ─────    ────    ─────────────    ──────    ──────              ───────
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-hcobb
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-weytj
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-qaam5
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-b67sr
  26s        26s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-xe5hj
  15s        15s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-w2zqe
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-d6ppa
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-p17e0

Усі Podʼи для цього завдання успішно виконалися! У вас вийшло.

Альтернативи

Цей підхід має перевагу у тому, що вам не потрібно змінювати вашу "робочу" програму, щоб вона була обізнана, що є черга роботи. Ви можете включити незмінену робочу програму у свій контейнерний образ.

Використання цього підходу передбачає запуск сервісу черги повідомлень. Якщо запуск сервісу черги є не зручним, ви можете розглянути один з інших шаблонів завдань.

Цей підхід створює Pod для кожного робочого елемента. Якщо ваші робочі елементи займають лише декілька секунд, створення Podʼу для кожного робочого елемента може додати багато накладних витрат. Розгляньте інший дизайн, наприклад, у прикладі тонкої паралельної черги робочих елементів, що виконує кілька робочих елементів на кожному Pod.

У цьому прикладі ви використовували утиліту amqp-consume для читання повідомлення з черги та запуску реальної програми. Це має перевагу у тому, що вам не потрібно модифікувати свою програму, щоб вона була обізнана про існування черги. У прикладі тонкої паралельної черги робочих елементів показано, як спілкуватися з чергою робочих елементів за допомогою клієнтської бібліотеки.

Обмеження

Якщо кількість завершень встановлена меншою, ніж кількість елементів у черзі, то не всі елементи будуть оброблені.

Якщо кількість завершень встановлена більшою, ніж кількість елементів у черзі, то завдання не буде здаватися завершеним, навіть якщо всі елементи у черзі були оброблені. Воно буде запускати додаткові Podʼи, які будуть блокуватися, чекаючи на повідомлення. Вам потрібно буде створити свій власний механізм для виявлення наявності роботи для виконання і вимірювати розмір черги, встановлюючи кількість завершень відповідно.

Існують малоймовірні перегони для цього патерну. Якщо контейнер був завершений між часом, коли повідомлення було підтверджено командою amqp-consume, і часом, коли контейнер вийшов з успішним завершенням, або якщо вузол зазнає збою, перш ніж kubelet зможе розмістити успіх Podʼа назад до сервера API, то завдання не буде здаватися завершеним, навіть якщо всі елементи у черзі були оброблені.

Змінено June 20, 2024 at 12:44 PM PST: Sync changest from andygol/k8s-website (36d05bc8a1)