Temporal로 ML 추론 시스템 구축하기 (1)
워크플로우 오픈소스 중 하나인 Temporal을 살펴보았습니다.
Temporal
지난번 Airbyte를 공부하면서, Temporal이라는 오픈소스가 Airbyte의 백엔드에서 Scheduler 역할을 한다는 사실을 알게 되었다.
처음에는 Temporal을 가벼운 버전의 Airflow라고만 생각했는데, 직접 사용해보니 두 프레임워크의 추구미가 완전히 다르다고 느꼈다.
Temporal이란?
Temporal은 Uber의 Cadence로부터 fork되어 탄생한 오픈소스로, 공식 문서에는 durable execution platform으로 소개되고 있다.
여기서 durable execution이란, 우리가 작성한 비지니스 로직이 한번 실행되고 나면 반드시 완료될 수 있도록 보장한다는 의미이다.
그래서 Airflow는 workflow management platform로서 워크플로우의 전체 실행 계획을 먼저 정의(Top-down)하는 반면에, Temporal은 작업의 실행에 초점을 맞추어 실행 중에도 동적으로 워크플로우를 구성(Bottom-up)한다는 차이가 있다.
Properties
Temporal은 간단한 코드로도 장애 및 불안정한 네트워크 등에 대응할 수 있어 안정적인 실행을 보장한다.
Fault Tolerance(내결함성)
작업이 실패하면 중단된 부분부터 다시 실행하여 자동으로 상태를 복구한다. 덕분에 장시간 실행 작업이 필요한 경우에는 강력한 솔루션이 될 수 있다.
Scalability(확장성)
Client 뿐만 아니라 다수의 Worker에서도 분산 실행이 가능하며, 이를 위한 로드밸런싱도 지원한다.
Asynchronous Execution(비동기 실행)
워크플로우를 비동기 방식으로 디자인할 수 있다. 또한 이벤트 기반 아키텍처를 구성할 때, 시스템 간의 과도한 결합을 줄일 수 있어 복잡성을 낮출 수 있다.
출처 - Keynote: The way forward for event-driven architectures
Polyglot Development(다양한 언어 지원)
Go, Java, Python, PHP, Typescript, .Net 등 다양한 언어에 대해 SDK를 지원한다.
Main Concepts
Temporal로 어플리케이션을 구현하기 위해서는 Workflow와 Activity의 개념을 알아 둘 필요가 있다. 이해를 돕기 위해 Airflow와 한번 더 비교해 보았다.
- | Temporal | Airflow |
---|---|---|
워크플로우 정의 | Workflow | DAG |
작업 단위 | Activity | Operator |
Workflow와 Activity의 관계는 얼핏 Airflow의 DAG와 Operator의 관계와 유사해 보인다. 하지만 Airflow는 Temporal의 Workflow와 달리, DAG run이 실패하여 재실행하더라도 시스템 차원에서 똑같은 결과를 보장하지 않는다.
즉, Temporal은 재실행 시 똑같은 결과를 보장하도록 노력하고 있으며, 이를 결정적 모델(Deterministic Model)이라고 부른다.
Workflow
Workflow는 비즈니스 프로세스를 정의하는 stateful function이다. 비즈니스 로직을 캡슐화하며, 뒤에서 설명할 Activity의 실행 순서를 결정한다.
Temporal은 Workflow가 실행되는 동안 발생하는 모든 이벤트를 로그로 저장한다. 그러다 Workflow가 재시작되면 이전 실행에서 기록해둔 로그를 재실행(Replay)하여 기존과 동일한 결과를 보장한다.
만약 재시작 도중 코드가 변경되거나 실행할 때마다 함수가 다른 결과값을 가진다면, Temporal은 에러를 발생시킴으로써 일종의 제약을 둔다.
예를 들어, Temporal은 다음과 같은 비결정적 코드를 허용하지 않는다.
1
2
if random.choice([True, False]):
workflow.sleep(10)
대신 Temporal이 제공하는 API를 사용하면 무작위 값 또한 이벤트 로그에 기록하기 때문에, 여전히 결정적 실행 방식을 유지할 수 있다.
1
2
3
4
rand_value = workflow.random_seed()
if rand_value % 2 == 0:
workflow.sleep(10)
참고로 코드가 바뀌는 경우에는 Workflow Versioning을 통해 해결 가능하다.
Activity
Activity는 실제 비즈니스 로직이 실행되는 stateless function이다.
Workflow에 의해 호출되어 외부 API 호출, 데이터베이스 조회 등 실제 작업을 Worker에서 수행한다. 실패 시 재시도(Retry)가 가능한 가장 작은 단위이며, 분산 환경에서 개별적으로 확장될 수 있다.
상태를 유지하지 않기 때문에 결정적 실행에 대한 제약은 없지만, 멱등성(Idempotency)을 보장하는 방식으로 구현하기를 권장하고 있다.
Components
Temporal은 다른 분산 프레임워크와 마찬가지로 서버(Temporal Server)와 실제 로직을 실행하는 워커(Worker)를 분리하여 동작한다.
Temporal Server
- Control Plane
- Workflow의 생성과 실행을 관리, 상태 유지
- 클라이언트 요청을 처리하고 Worker에게 작업을 할당
- Workflow를 직접 실행하는게 아님에 주의
Worker
- Data Plane
- 실제 비지니스 로직이 실행되는 공간
- Temporal server로부터 Workflow 또는 Activity의 실행을 위임 받음
- 일반적으로 gRPC 통신 사용
Client SDK
- SDK를 통해 Temporal Server와 상호 작용
- 비지니스 로직을 작성하고 Workfow 또는 Activity를 실행
Persistence Layer
- Workflow의 상태를 저장
- 현재 사용 가능한 DB
- MySQL
- PostgresQL
- Cassandra
- Workflow의 검색 기능을 위해 필요시 사용
- Elasticsearch
Architecture
Temporal은 비동기 및 분산 처리를 위한 Queue 시스템, 스케쥴링을 위한 Timer 시스템 등을 갖추고 있다.
출처 - Designing A Workflow Engine from First Principles
기존에 이러한 시스템을 직접 구현하기 위해서는, 메시지를 전달할 브로커(ex: RabbitMQ)와 상태를 저장할 캐시 서버(ex: Redis)가 필요했다.
하지만 Temporal은 이러한 복잡한 구조를 단순화하고 엣지 케이스를 줄임으로써, 개발자들이 비지니스 로직에만 집중할 수 있는 환경을 제공한다.
Practice
이번 글에서는 Docker를 이용해 Temporal을 띄워보고, Python에서 Workflow와 Activity를 실행해보려고 한다.
다음 GitHub 링크에 상세한 내용을 정리해 두었습니다.
Docker
Temporal과 Temporal UI의 이미지는 Docker Hub에 업로드되어 있기 때문에, 우리는 docker compose를 이용하여 간편하게 띄워 볼 수 있다. 백엔드 데이터베이스로는 PostgresQL를 선택하였다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
services:
postgres:
container_name: postgres
image: postgres:14-alpine
ports:
- "5432:5432"
environment:
POSTGRES_USER: temporal
POSTGRES_PASSWORD: temporal
volumes:
- ./docker/volumes/postgres:/var/lib/postgresql/data
healthcheck:
test: [ "CMD", "pg_isready", "-U", "temporal" ]
interval: 10s
retries: 3
start_period: 5s
restart: unless-stopped
temporal:
container_name: temporal
image: temporalio/auto-setup:1.27.0
ports:
- "7233:7233"
environment:
- DB=postgres12
- DB_PORT=5432
- POSTGRES_USER=temporal
- POSTGRES_PWD=temporal
- POSTGRES_SEEDS=postgres # server host name
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
volumes:
- ./docker/temporal:/etc/temporal/config/dynamicconfig
depends_on:
- postgres
temporal-ui:
container_name: temporal-ui
image: temporalio/ui:2.35.1
ports:
- "8080:8080"
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CORS_ORIGINS=http://localhost:3000
depends_on:
- temporal
1
docker compose up --build
Temporal Worker
Temporal은 Python 환경에서의 예제들을 잘 정리해 두었다. 이중 Client에서 Workflow와 Activity를 바로 실행할 수 있는 예제를 살펴보자.
Docker 파일에서 Temporal Server의 포트를 7233
으로 열었으므로, Client는 localhost:7233
로 접근을 요청하게 된다. 한번 더 강조하지만 코드의 실행이 Temporal Server에서 이루어지는 것이 아님을 기억하자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
async def main():
client = await Client.connect("localhost:7233")
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
Temporal UI
이제 Temporal UI에 접근해보자. 최초에 http://localhost:8080
로 접속하면 다음과 같은 페이지를 볼 수 있다.
위에서 설명한 Python 스크립트를 실행하면, Workflow와 Activity가 실행된 history를 볼 수 있다.
Next
다음 글에서는 Temporal을 사용해 머신러닝 추론 서버를 디자인 하는 내용을 다루려고 한다.
기존의 머신러닝 추론 서버 시스템의 패턴에는 어떤 것들이 있는지, 그리고 Temporal이 여기서 어떤 역할을 할 수 있는지 설명할 예정이다!