대부분의 Python 개발자들은 **비동기 작업(Background Task)**을 처리할 때 Celery를 떠올린다.

하지만 Celery는 설정이 복잡하고, 유지보수가 어렵고, 과한 기능이 많아 작은 프로젝트에서는 오히려 불편하다.

 

📌 RQ (Redis Queue)와 Dramatiq은 이런 문제를 해결하는 가볍고 강력한 대안이다.

특히 RQ는 간단한 Redis 기반 큐 시스템, Dramatiq은 Celery와 비슷하면서도 훨씬 직관적이고 빠른 대안이다.

1. RQ (Redis Queue): 초간단 Background Task 라이브러리

 

RQ는 Celery보다 훨씬 단순한 구조로, Redis만 있으면 즉시 사용 가능하다.

설치부터 사용까지 1분이면 충분하다.

 

🚀 RQ 설치 및 사용법

pip install rq
import time
from redis import Redis
from rq import Queue

# Redis 연결 및 큐 생성
redis_conn = Redis()
queue = Queue(connection=redis_conn)

# 비동기 실행할 함수 정의
def background_task(n):
    time.sleep(n)
    return f"완료: {n}초 후"

# 작업을 큐에 넣기
job = queue.enqueue(background_task, 5)

print(f"작업 ID: {job.id}")  # 작업 ID 출력

🔹 RQ Worker 실행 (작업 처리)

 

RQ는 Celery처럼 복잡한 설정 없이 worker 실행만으로 비동기 작업을 처리할 수 있다.

rq worker

결과:

RQ Worker가 실행되면서 대기 중인 작업을 즉시 처리한다.

Celery처럼 복잡한 설정 없이, 단순한 작업을 Redis에서 관리할 때 매우 유용하다.

2. Dramatiq: Celery를 완벽하게 대체할 강력한 백그라운드 태스크 라이브러리

 

Celery는 강력하지만, 설정이 너무 복잡하고 무겁다는 단점이 있다.

Dramatiq은 Celery와 거의 동일한 기능을 제공하지만 훨씬 가볍고 빠르다.

 

🚀 Dramatiq 설치 및 기본 사용법

pip install dramatiq redis
import dramatiq
import time

# 비동기 태스크 정의
@dramatiq.actor
def background_task(n):
    time.sleep(n)
    print(f"완료: {n}초 후")

# 태스크 실행
background_task.send(5)

🔹 Dramatiq Worker 실행

 

Celery처럼 복잡한 celeryconfig.py 설정 없이, 단순히 worker만 실행하면 된다.

dramatiq my_script

Dramatiq의 장점

Celery보다 설정이 간편하고,

Redis, RabbitMQ, Kafka 등 다양한 메시지 브로커를 지원,

멀티 프로세싱과 멀티스레딩 지원으로 성능이 뛰어나다.

📌 RQ vs Celery vs Dramatiq 비교

기능RQCeleryDramatiq

설치 난이도 매우 쉬움 복잡함 쉬움
메시지 브로커 Redis Redis, RabbitMQ, SQS Redis, RabbitMQ, Kafka
성능 가벼움 무거움 빠름
비동기 작업 지원 지원 지원
멀티 프로세스 지원 지원 지원 (최적화)

RQ는 간단한 작업 큐,

Dramatiq은 Celery를 대체할 강력한 옵션이다.

🚀 결론: 언제 어떤 걸 써야 할까?

 

RQ를 선택해야 할 때

Redis만 사용하고 싶을 때

단순한 백그라운드 태스크 큐가 필요할 때

빠르게 개발하고 싶을 때

 

Dramatiq을 선택해야 할 때

Celery의 기능이 필요하지만 더 가볍고 빠른 솔루션이 필요할 때

RabbitMQ, Kafka 등 다양한 브로커를 활용할 때

성능 최적화가 중요한 시스템에서 사용할 때

📌 Celery가 너무 무겁다면?

📌 RQ와 Dramatiq을 적극 고려해보자!

 

 

GPT-4, Claude, Mistral 등 다양한 AI 모델이 쏟아지는 시대다. 하지만 모델 선택과 비용 문제가 개발자들에게는 가장 큰 고민이다.

특히 API 요청을 최적화하고 비용을 줄이는 방법이 절실한데, 이를 해결해 줄 강력한 솔루션이 바로 LiteLLM이다.

 

LiteLLM은 하나의 통합 API로 OpenAI, Anthropic, Mistral, Llama 등 여러 LLM(Large Language Model) API를 동시에 다룰 수 있는 라이브러리다.

즉, 코드를 수정하지 않고도 다양한 LLM을 교체하며 최적의 성능과 비용을 찾을 수 있다.

LiteLLM이 해결하는 문제

 

❌ 1. 특정 AI API에 종속되는 문제 (Vendor Lock-in)

기존에는 OpenAI API를 쓰면 코드를 OpenAI 전용으로 작성해야 했음

하지만 Anthropic Claude, Mistral, Llama 같은 더 저렴하고 빠른 대안이 계속 등장

LiteLLM을 사용하면 코드를 수정하지 않고도 다양한 AI 모델로 즉시 전환 가능

 

💰 2. API 비용 절감

GPT-4-turbo는 훌륭하지만 가격이 비싸다

경우에 따라 Claude 3, Mistral 7B 같은 모델이 더 싸고 빠를 수 있음

LiteLLM을 사용하면 비용 대비 최적의 모델을 자동 선택 가능

 

🚀 3. 로드 밸런싱 & 장애 대응

특정 API가 느려지거나 장애가 발생하면 자동으로 다른 LLM으로 전환 가능

여러 API 제공자를 조합하여 자동 로드 밸런싱 및 페일오버(failover) 가능

LiteLLM 설치 및 기본 사용법

 

LiteLLM을 설치하려면 간단히 다음 명령어를 실행하면 된다.

pip install litellm

이제 OpenAI API를 호출하는 기존 코드에서 LiteLLM으로 쉽게 변경할 수 있다.

import litellm

# OpenAI API처럼 사용 가능
response = litellm.completion(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": "AI 모델 추천해줘"}],
    api_key="your-openai-key"
)

print(response['choices'][0]['message']['content'])

기존 OpenAI API 코드와 100% 호환되기 때문에, 기존 코드를 수정할 필요 없이 바로 적용할 수 있다.

🚀 다양한 LLM을 자유롭게 전환하기

 

LiteLLM은 단순히 OpenAI만 지원하는 게 아니다.

Anthropic Claude, Mistral, Azure OpenAI, Groq 같은 다양한 모델을 한 줄만 변경하여 교체 가능하다.

# OpenAI 대신 Claude 3 사용
response = litellm.completion(
    model="claude-3-opus",
    messages=[{"role": "user", "content": "AI 모델 추천해줘"}],
    api_key="your-anthropic-key"
)
# Mistral 7B 사용 (오픈소스 모델)
response = litellm.completion(
    model="mistral-7b",
    messages=[{"role": "user", "content": "빠르고 저렴한 AI 추천해줘"}],
    api_key="your-mistral-key"
)

코드를 수정할 필요 없이, API 키만 변경하면 다른 AI 모델을 사용할 수 있다.

🛠️ LiteLLM의 고급 기능

 

🔄 1. 자동 모델 선택 (Failover & Load Balancing)

 

어떤 모델이 가장 빠르고 저렴한지 자동으로 선택할 수 있다.

예를 들어, GPT-4가 느리다면 Claude 3로 자동 전환하도록 설정 가능하다.

response = litellm.completion(
    model=["gpt-4-turbo", "claude-3-opus", "mistral-7b"],
    messages=[{"role": "user", "content": "현재 시간은?"}],
    api_key={"openai": "your-openai-key", "anthropic": "your-anthropic-key"}
)

리스트 형태로 여러 모델을 입력하면 자동으로 최적의 모델을 선택하여 요청을 보낸다.

💰 2. API 비용 절감을 위한 라우팅

 

LiteLLM을 사용하면 요청을 자동으로 가장 저렴한 API로 라우팅 가능하다.

예를 들어, 간단한 요청은 Mistral 7B에 보내고, 복잡한 요청만 GPT-4로 보내는 식이다.

def choose_model(user_query):
    if len(user_query) < 50:
        return "mistral-7b"  # 가벼운 요청은 저렴한 모델 사용
    else:
        return "gpt-4-turbo"  # 복잡한 요청은 GPT-4 사용

response = litellm.completion(
    model=choose_model("간단한 질문이야"),
    messages=[{"role": "user", "content": "간단한 질문이야"}]
)

비용을 절약하면서도 성능을 유지할 수 있다.

🏎️ 3. 로컬 모델 (Llama3, Mixtral)도 사용 가능

 

LiteLLM은 클라우드 API뿐만 아니라, 로컬에서 실행되는 모델도 지원한다.

즉, OpenAI API와 로컬 Llama3 모델을 동시에 사용할 수도 있다.

response = litellm.completion(
    model="http://localhost:8000/v1/completions",  # 로컬 Llama3 API
    messages=[{"role": "user", "content": "로컬 모델 사용해줘"}]
)

비용 절감과 프라이버시 보호를 위해 로컬 모델을 함께 활용할 수 있다.

💡 LiteLLM을 활용하면 이런 문제가 해결된다

문제LiteLLM 솔루션

특정 API(OpenAI)에 종속됨 다양한 LLM을 자유롭게 전환 가능
비용이 너무 비쌈 자동으로 저렴한 모델을 선택하여 비용 절감
API 응답 속도가 느림 여러 API를 조합하여 로드 밸런싱
특정 API가 다운됨 자동으로 다른 API로 전환 (Failover)
로컬 모델과 클라우드 API를 함께 사용하고 싶음 OpenAI + Llama3 같이 혼합 사용 가능

이제 OpenAI, Claude, Mistral, Llama3를 쉽게 조합하여 최적의 AI 환경을 만들 수 있다.

🚀 결론: LiteLLM을 사용해야 하는 이유

 

LiteLLM은 단순한 API Wrapper가 아니다.

AI 비용 절감, 성능 최적화, 장애 대응까지 해결할 수 있는 강력한 솔루션이다.

 

📌 언제 LiteLLM을 써야 할까?

 

OpenAI API 비용이 너무 부담될 때 → Mistral, Claude 등 저렴한 대안 자동 선택

API 장애나 속도 문제를 해결하고 싶을 때 → Failover & Load Balancing 지원

여러 LLM을 자유롭게 전환하며 실험하고 싶을 때

로컬 Llama3 같은 모델과 클라우드 API를 함께 사용하고 싶을 때

 

지금 당장 LiteLLM을 도입하면 비용 절감과 성능 최적화를 동시에 잡을 수 있다.

 

 

데이터 엔지니어링과 머신러닝에서 Pandas, Spark, Parquet 같은 툴은 필수적이다. 하지만 대용량 데이터를 다룰 때 I/O 속도가 병목이 되는 경우가 많다.

이런 문제를 해결하는 핵심 기술이 바로 Apache Arrow다.

 

Arrow는 고성능 컬럼 기반 메모리 포맷으로, 기존의 데이터 포맷(Pandas, CSV, JSON 등)보다 훨씬 빠른 데이터 처리와 변환을 가능하게 한다.

특히, Python, C++, Java, Rust, Go 등 다양한 언어 간 데이터 교환이 필요할 때 강력한 해결책이 된다.

Apache Arrow가 왜 중요한가?

 

🔥 1. 컬럼 기반 인메모리 포맷으로 속도 극대화

 

기존의 Pandas나 CSV는 데이터를 행(row) 단위로 저장하지만, Arrow는 컬럼(column) 단위 저장을 사용한다.

이는 벡터 연산 최적화캐시 효율성 향상으로 성능을 획기적으로 개선한다.

행 저장(Row-based, Pandas, CSV)

데이터를 한 행씩 읽음 → 연산 시 불필요한 데이터도 함께 로딩됨

컬럼 저장(Columnar, Arrow, Parquet)

필요한 컬럼만 읽을 수 있어 연산 속도 극대화

 

즉, 머신러닝이나 데이터 분석에서 특정 컬럼만 선택해도 Pandas보다 훨씬 빠르게 처리 가능

🚀 2. Pandas보다 10배 빠른 데이터 변환

 

Pandas를 쓰다 보면 메모리 사용량이 높고 속도가 느린 경우가 많다.

Apache Arrow를 사용하면 Pandas 데이터를 거의 즉시 변환할 수 있다.

import pandas as pd
import pyarrow as pa

# 샘플 데이터 생성
df = pd.DataFrame({'id': range(1_000_000), 'value': range(1_000_000)})

# Pandas -> Arrow 변환
arrow_table = pa.Table.from_pandas(df)

print(arrow_table)

결과:

 

기존 Pandas를 사용하면 데이터를 복사하는 비용이 발생하지만, Arrow는 Zero-Copy 변환이 가능해 속도가 10배 이상 빠르다.

⚡ 3. Parquet, Feather 파일을 즉시 로딩

 

Arrow는 Parquet, Feather 포맷을 직접 읽고 쓰는 기능을 제공한다.

Pandas로 대용량 Parquet 파일을 읽는 것보다 훨씬 빠르게 처리할 수 있다.

import pyarrow.parquet as pq

# Parquet 파일 읽기
table = pq.read_table("data.parquet")

# Pandas로 변환 (Zero-Copy 방식, 빠름)
df = table.to_pandas()

기존 Pandas의 read_parquet()보다 메모리 사용량이 적고 속도가 훨씬 빠르다.

특히 대규모 데이터 분석, 머신러닝 피처 엔지니어링에서 필수적인 최적화 기법이다.

🔗 4. Spark, Dask, Ray와 직접 통합 가능

 

Arrow는 Spark, Dask, Ray 등 분산 데이터 처리 시스템과 네이티브하게 연동된다.

즉, Pandas 대신 Arrow를 사용하면 분산 시스템에서 더 빠른 데이터 교환이 가능하다.

import pyarrow.dataset as ds

# CSV 데이터를 Arrow Dataset으로 변환 (고속 처리)
dataset = ds.dataset("data.csv", format="csv")

# 필터링 후 Pandas 변환
filtered_df = dataset.to_table(filter=ds.field("age") > 30).to_pandas()

결과: Pandas보다 빠르게 CSV 데이터를 읽고 필터링할 수 있다.

🏆 5. 언어 간 데이터 공유 (Python ↔ C++ ↔ Java)

 

기존 방식으로는 Python과 C++ 간 데이터를 교환하려면 직렬화/역직렬화가 필요했다.

Arrow를 사용하면 Python ↔ C++ 간 데이터를 Zero-Copy 방식으로 즉시 공유 가능하다.

import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:8815")
flight_info = client.get_flight_info(flight.FlightDescriptor.for_path("dataset"))
reader = client.do_get(flight_info.endpoints[0].ticket)
table = reader.read_all()

df = table.to_pandas()  # Zero-Copy 방식으로 변환

즉, Arrow를 사용하면 JSON이나 CSV 변환 없이 데이터를 그대로 공유할 수 있다.

이는 분산 머신러닝, 대규모 데이터 분석 시스템에서 엄청난 성능 향상을 가져온다.

📌 Arrow를 언제 사용해야 할까?

Pandas가 너무 느릴 때 → Arrow로 변환하면 속도가 10배 빨라짐

대량의 CSV, Parquet 데이터를 분석할 때 → 메모리 절약 + 빠른 로딩

Spark, Dask, Ray 등 분산 환경에서 데이터 교환할 때

Python ↔ C++ ↔ Java 간 데이터를 빠르게 공유할 때

🚀 결론: Apache Arrow는 데이터 엔지니어의 필수 도구

 

Apache Arrow는 단순한 데이터 포맷이 아니라 데이터 엔지니어링의 새로운 표준이다.

Pandas, Spark, Parquet, Dask 등의 속도를 극대화하고 머신러닝, 데이터 분석, 분산 시스템에서 최고의 성능을 낼 수 있다.

 

 

 

 

Redis는 흔히 캐시로만 사용된다고 생각하지만, 사실 훨씬 강력한 기능을 제공한다. 특히 RedisJSONRedisSearch를 결합하면, Redis를 단순한 캐시가 아니라 초고속 NoSQL 문서 저장소 및 검색 엔진으로 활용할 수 있다.

 

이 조합을 활용하면 MongoDB나 Elasticsearch 같은 NoSQL 솔루션을 대체할 수도 있으며, 검색 성능을 획기적으로 개선할 수 있다. 특히 JSON 데이터를 빠르게 저장하고 검색해야 하는 시스템에서 강력한 대안이 된다.

RedisJSON: Redis에서 JSON 문서 저장 및 조작하기

 

RedisJSON은 Redis에 JSON 데이터를 네이티브로 저장할 수 있도록 설계된 모듈이다. 기존에는 JSON을 문자열로 저장하고, 가져온 뒤 파싱하는 방식이었다면, 이제는 JSON을 네이티브 객체로 저장하고, 특정 필드만 업데이트하거나 조회할 수 있다.

 

RedisJSON의 주요 특징

1. JSON 데이터를 네이티브로 저장 가능

set 명령어 대신 JSON.SET 사용

특정 필드만 수정 가능 (JSON.SET user $.age 30)

2. SQL 없이도 JSON 필드 조회 및 업데이트 가능

JSON.GET으로 특정 필드만 가져오기

JSON.ARRAPPEND, JSON.ARRINDEX로 JSON 배열 조작

3. 메모리 효율적인 저장 방식

내부적으로 효율적인 압축 및 바이너리 포맷 사용

Redis의 인메모리 성능을 유지하면서도 JSON 문서를 다룰 수 있음

 

RedisJSON 활용 예제

# RedisJSON 모듈이 활성화된 Redis 컨테이너 실행
docker run -d --name redis-stack -p 6379:6379 redis/redis-stack
import redis

# Redis 클라이언트 연결
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# JSON 문서 저장
r.execute_command('JSON.SET', 'user:1', '$', '{"name": "Alice", "age": 25, "skills": ["Python", "Redis"]}')

# 특정 필드 가져오기
print(r.execute_command('JSON.GET', 'user:1', '$.name'))  # ["Alice"]

# JSON 필드 업데이트
r.execute_command('JSON.SET', 'user:1', '$.age', '30')

# 배열 데이터 추가
r.execute_command('JSON.ARRAPPEND', 'user:1', '$.skills', '"Docker"')

# 최종 결과 확인
print(r.execute_command('JSON.GET', 'user:1'))

출력 결과:

 

이 방식은 기존의 SET 명령어로 JSON을 문자열로 저장하는 방식보다 훨씬 효율적이며, 특정 필드만 업데이트할 수 있어서 네트워크 비용도 절감할 수 있다.

RedisSearch: Redis에서 초고속 검색 구현하기

 

RedisSearch는 JSON 데이터나 문자열 데이터를 기반으로 인덱싱 및 검색 기능을 추가하는 모듈이다. Elasticsearch처럼 텍스트 검색, 필터링, 정렬, 랭킹 기능을 지원하며, Redis의 초고속 성능을 그대로 유지한다.

 

RedisSearch의 주요 기능

1. 역색인(Inverted Index) 기반 초고속 검색

Elasticsearch와 유사한 강력한 검색 성능을 제공

완전한 텍스트 검색정렬 기능 지원

2. JSON 및 해시 데이터 인덱싱 지원

FT.CREATE를 사용하여 JSON 문서 필드를 인덱싱 가능

RedisJSON과 결합하여 MongoDB+Elasticsearch 조합을 Redis 하나로 해결

3. 쿼리 기능 강화

FT.SEARCH자연어 검색조건 검색 가능

필터링, 정렬, 페이징 지원

 

RedisSearch 활용 예제

 

먼저, JSON 데이터에 대한 검색 인덱스를 생성한다.

docker run -d --name redis-stack -p 6379:6379 redis/redis-stack
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# RedisJSON 데이터 추가
r.execute_command('JSON.SET', 'user:1', '$', '{"name": "Alice", "age": 25, "skills": ["Python", "Redis"]}')
r.execute_command('JSON.SET', 'user:2', '$', '{"name": "Bob", "age": 30, "skills": ["Go", "Kubernetes"]}')
r.execute_command('JSON.SET', 'user:3', '$', '{"name": "Charlie", "age": 35, "skills": ["Java", "Spring"]}')

# 검색 인덱스 생성 (JSON 문서용)
r.execute_command('FT.CREATE', 'idx:user', 'ON', 'JSON',
                  'PREFIX', '1', 'user:',  
                  'SCHEMA', '$.name', 'AS', 'name', 'TEXT',  
                            '$.age', 'AS', 'age', 'NUMERIC')

# 텍스트 검색 실행
result = r.execute_command('FT.SEARCH', 'idx:user', '@name:Alice')
print(result)  # JSON 문서 검색 결과 출력

출력 결과:

 

이제 Redis에서 단순 key-value 조회가 아니라 텍스트 검색과 필터링까지 가능하게 되었다.

RedisJSON + RedisSearch 활용 사례

 

✅ 1. 실시간 검색 시스템

검색이 필요한 서비스(예: 쇼핑몰, 뉴스, 블로그)에 적용

Elasticsearch보다 훨씬 가벼운 대안으로 사용 가능

JSON 문서를 저장하면서 검색을 동시에 수행

 

✅ 2. 세션 및 유저 데이터 저장소

유저 프로필 데이터(JSON)를 저장하면서, 특정 조건으로 빠르게 검색 가능

예: “Redis를 아는 개발자만 검색”, “30대 이상 필터링”

 

✅ 3. 로그 및 이벤트 분석

실시간 로그 데이터를 저장한 후, 특정 이벤트를 검색할 수 있음

빠른 분석 및 모니터링 시스템 구축 가능

결론: Redis는 더 이상 단순한 캐시가 아니다

 

대부분의 개발자들은 Redis를 단순한 캐시나 세션 저장소로만 사용하지만, RedisJSON과 RedisSearch를 결합하면 강력한 NoSQL 데이터베이스와 검색 엔진으로 활용할 수 있다.

 

💡 이 기능을 언제 사용해야 할까?

JSON 데이터를 다루면서 빠른 검색이 필요할 때

Redis의 초고속 성능을 그대로 유지하면서 텍스트 검색이 필요한 경우

Elasticsearch가 너무 무거운 경우 가벼운 대체재로

 

즉, MongoDB + Elasticsearch 조합을 하나로 줄일 수 있는 강력한 솔루션이 될 수 있다. 🚀

 

 

대부분의 개발자들은 SQLite를 소형 데이터베이스로 사용해 본 경험이 있을 것이다. 하지만 OLAP(Online Analytical Processing) 작업을 수행할 때는 SQLite가 제약이 많다. 대용량 데이터를 다룰 때 빠르고 효율적인 분석을 원한다면, DuckDB가 최고의 선택이 될 수 있다.

 

DuckDB는 “In-process OLAP Database”, 즉 프로세스 내에서 실행되는 OLAP 전용 컬럼형 데이터베이스다. 이는 기존의 SQLite가 OLTP(Online Transaction Processing)에 최적화된 것과 정반대의 철학을 가진다. 특히 대용량 데이터 분석, 머신러닝 전처리, 로그 분석, BI(Business Intelligence) 애플리케이션 등에 적합하다.

왜 DuckDB인가?

 

DuckDB는 기존의 OLAP 데이터베이스와 비교했을 때 몇 가지 강력한 장점을 가진다.

1. 컬럼 저장 방식(Columnar Storage)

SQLite와 같은 행 기반(Row-based) DB와 달리, DuckDB는 컬럼 기반(Columnar)으로 데이터를 저장한다.

분석 쿼리는 특정 열(컬럼)만을 대상으로 수행되므로, 불필요한 데이터 로딩을 최소화하고 속도를 극대화한다.

2. 고성능 벡터 연산(Vectorized Execution)

데이터 처리를 벡터 연산(Vectorized Execution) 방식으로 수행하여 CPU 캐시 효율성을 극대화하고, 불필요한 연산을 최소화한다.

3. 단일 파일, 설치 필요 없음 (Zero-Dependency)

SQLite처럼 단일 파일로 동작하며, 별도의 서버가 필요 없다.

Python, R, C++, JavaScript 등 다양한 언어에서 로컬 프로세스 내에서 바로 실행할 수 있다.

4. Parquet 및 CSV와 원활한 호환

Pandas, Arrow, Parquet, CSV 등을 직접 로드하여 분석할 수 있다. 즉, 대규모 데이터를 복사하지 않고 바로 쿼리 가능하다.

5. 멀티코어 병렬 처리 지원

SQLite는 기본적으로 싱글 스레드지만, DuckDB는 멀티코어를 활용한 병렬 쿼리 실행을 지원한다.

DuckDB 활용 예제

 

1. Python에서 DuckDB 활용하기

 

DuckDB는 Pandas, Parquet, CSV와 자연스럽게 연동되므로 기존 데이터 분석 워크플로우에 쉽게 통합할 수 있다.

import duckdb
import pandas as pd

# 샘플 데이터프레임 생성
df = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'value': [10, 20, 30, 40, 50]
})

# DuckDB 메모리 내에서 SQL 실행
result = duckdb.query("SELECT id, value * 2 AS double_value FROM df").df()

print(result)

결과:

 

메모리 내에서 바로 SQL을 실행할 수 있다는 점에서 DuckDB는 Pandas보다 더 SQL 친화적인 데이터 분석을 가능하게 한다.

2. CSV 파일을 직접 쿼리하기

 

DuckDB는 CSV 파일을 로드하지 않고 그대로 SQL 쿼리를 실행할 수 있다.

import duckdb

query = """
SELECT category, AVG(price) as avg_price
FROM read_csv_auto('products.csv')
GROUP BY category
ORDER BY avg_price DESC
"""

result = duckdb.query(query).df()
print(result)

이 방식의 장점은 대용량 CSV 파일도 빠르게 처리할 수 있다는 점이다.

Pandas에서 CSV를 읽고 나서 분석하는 것보다 훨씬 빠르고 메모리 사용량이 적다.

3. Parquet 파일을 직접 분석하기

 

Parquet은 대규모 데이터 저장 및 분석에 많이 사용되는데, DuckDB는 Parquet 파일을 바로 쿼리할 수 있다.

import duckdb

query = """
SELECT user_id, COUNT(*) as event_count
FROM 'events.parquet'
GROUP BY user_id
ORDER BY event_count DESC
"""

result = duckdb.query(query).df()
print(result)

Pandas와 비교하면 Parquet 파일을 불러오는 속도가 훨씬 빠르며, 데이터 크기가 클수록 메모리 절약 효과가 극대화된다.

DuckDB vs SQLite vs Pandas 성능 비교

기능DuckDBSQLitePandas

저장 방식 컬럼 기반 행 기반 데이터프레임 (메모리)
주요 용도 OLAP 분석 OLTP 트랜잭션 데이터 분석
병렬 처리 O (멀티코어 지원) X (싱글 스레드) X (GIL 제한)
SQL 지원 O O 제한적 (Pandas API)
CSV/Parquet 직접 분석 O X X (로드 필요)
메모리 사용 효율 높음 낮음 높음 (메모리 한계)

SQLite는 OLTP(트랜잭션)에 최적화되어 있어 OLAP 분석에서는 성능이 떨어지고, Pandas는 데이터가 커지면 메모리 이슈가 발생한다. DuckDB는 OLAP 분석에 최적화된 설계 덕분에, 둘의 단점을 보완하는 대안이 될 수 있다.

실전 활용 사례

 

✅ 1. 데이터 엔지니어링

대량의 로그 데이터 분석, ETL 처리, 데이터 정제에 활용 가능

CSV, Parquet 데이터를 직접 분석하여 사전 변환 없이 쿼리 실행 가능

 

✅ 2. 머신러닝 및 데이터 과학

Pandas보다 메모리 효율적이므로 대용량 데이터 프레임을 효율적으로 처리할 수 있음

데이터 분석, 피처 엔지니어링, 모델 학습 전처리에 유용

 

✅ 3. BI(비즈니스 인텔리전스) 및 대시보드

Parquet/CSV 데이터 소스를 실시간으로 쿼리하여 대시보드에 반영 가능

OLAP 분석에 최적화되어 있어 BI 분석 속도를 획기적으로 개선 가능

결론: DuckDB를 언제 사용해야 할까?

OLAP 분석을 빠르게 실행하고 싶을 때 (대량 데이터 요약, 그룹핑 등)

대용량 CSV 또는 Parquet 파일을 직접 쿼리하고 싶을 때

Pandas보다 더 SQL 친화적인 데이터 분석 환경이 필요할 때

SQLite보다 OLAP에 최적화된 경량 DB를 찾을 때

 

 
 
1. 왜 FAISS가 중요한가?
 
대규모 문서에서 유사한 정보를 검색하는 것은 AI 애플리케이션에서 필수적인 기능이다. 전통적인 방법(예: TF-IDF, BM25)은 단순한 키워드 매칭에 의존하여 문맥을 제대로 반영하지 못한다. 반면 **FAISS(Facebook AI Similarity Search)**를 활용하면 벡터 검색을 통해 고차원 의미를 고려한 초고속 검색이 가능하다.
 
FAISS는 특히 다음과 같은 경우에 강력하다.
수백만 개 이상의 문서를 실시간으로 검색해야 하는 경우
질문-응답 시스템(QA), 추천 시스템, 챗봇을 구축하는 경우
LLM(Large Language Model)과 연동하여 RAG(Retrieval-Augmented Generation) 시스템을 만들려는 경우
 
이번 글에서는 FAISS가 어떻게 작동하는지, 그리고 LangChain과 함께 벡터 검색 기반 문서 검색 시스템을 구축하는 방법을 살펴보자.
2. FAISS 개념 이해: 벡터 검색이란?
 
LLM이 문서를 이해하고 검색하려면 텍스트 데이터를 벡터로 변환해야 한다. 이를 **임베딩(Embedding)**이라 하며, 각 문장은 다차원 공간에서 하나의 점으로 표현된다. FAISS는 이런 벡터들을 효율적으로 저장하고 초고속 검색이 가능하도록 최적화된 라이브러리다.
 
FAISS가 특히 뛰어난 이유는 다음과 같다.
L2 거리 기반 최근접 이웃(Nearest Neighbor) 검색
수억 개의 벡터를 저장해도 빠른 인덱싱 및 검색 속도
CPU/GPU 최적화로 대규모 벡터 검색 성능 극대화
3. FAISS + LangChain을 활용한 문서 검색 시스템 구축
 
이제 실제로 FAISS를 활용하여 PDF 문서를 벡터화하고, LangChain을 통해 문서 검색 시스템을 구축하는 과정을 살펴보자.
📌 Step 1: 필요한 라이브러리 설치
 
다음 패키지를 설치해야 한다.

pip install langchain faiss-cpu openai pypdf sentence-transformers

📌 Step 2: PDF 문서 로드 및 텍스트 추출
 
먼저, PDF 문서에서 텍스트를 추출해야 한다.

from langchain.document_loaders import PyPDFLoader

# PDF 문서 로드
pdf_path = "example.pdf"  # 분석할 PDF 파일
loader = PyPDFLoader(pdf_path)
documents = loader.load()

# 첫 번째 문서 확인
print(documents[0].page_content)

이렇게 하면 PDF 문서의 내용을 documents 리스트에 저장할 수 있다.
📌 Step 3: 문서를 임베딩(Embedding) 벡터로 변환
 
텍스트 데이터를 벡터로 변환하려면, OpenAI 또는 Sentence Transformers를 사용할 수 있다. 여기서는 무료로 사용할 수 있는 sentence-transformers를 활용해 보자.

from langchain.embeddings import HuggingFaceEmbeddings

# 임베딩 모델 로드 (한국어 포함 지원)
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# 첫 번째 문서 벡터화
text_sample = documents[0].page_content
vector = embedding_model.embed_query(text_sample)
print("벡터 차원:", len(vector))

이제 텍스트가 다차원 벡터로 변환되었으며, 이 벡터를 기반으로 검색할 수 있다.
📌 Step 4: FAISS를 활용한 벡터 저장 및 검색
 
이제 FAISS를 사용하여 벡터 데이터베이스를 구축하고 검색을 수행해 보자.

from langchain.vectorstores import FAISS

# 문서 데이터를 벡터화하여 FAISS에 저장
faiss_index = FAISS.from_documents(documents, embedding_model)

# FAISS 인덱스를 로컬에 저장 (추후 빠른 로드 가능)
faiss_index.save_local("faiss_index")

📌 Step 5: 유사한 문서 검색하기
 
이제 사용자가 입력한 질문과 가장 유사한 문서를 FAISS에서 검색해 보자.

# 저장된 FAISS 인덱스 불러오기
faiss_index = FAISS.load_local("faiss_index", embedding_model)

# 검색할 질문 입력
query = "계약서에서 위약금 조항을 어떻게 확인하나요?"

# FAISS를 통해 유사 문서 검색
search_results = faiss_index.similarity_search(query, k=3)

# 검색 결과 출력
for i, doc in enumerate(search_results):
    print(f"결과 {i+1}:")
    print(doc.page_content)
    print("="*80)

이제 사용자가 입력한 질문과 가장 유사한 문서 3개를 검색할 수 있다.
4. FAISS의 성능을 극대화하는 방법
 
FAISS를 사용할 때, 성능을 극대화하는 몇 가지 핵심 기술이 있다.
 
1️⃣ IVF(InvList File) 인덱싱을 활용한 검색 속도 향상
 
FAISS는 기본적으로 L2 거리 기반 검색을 사용하지만, IVF(인버스 리스트 파일) 인덱싱을 적용하면 속도를 더욱 빠르게 만들 수 있다.

import faiss

# 128차원 벡터를 저장할 IVF 인덱스 생성 (클러스터 개수 100)
d = 128  # 벡터 차원
nlist = 100  # 클러스터 개수
quantizer = faiss.IndexFlatL2(d)
index = faiss.IndexIVFFlat(quantizer, d, nlist)

# 학습 데이터로 클러스터링
index.train(vector_data)  # vector_data는 벡터 리스트
index.add(vector_data)  # 인덱스에 벡터 추가

2️⃣ GPU 가속을 활용한 대규모 검색
 
FAISS는 GPU 연산을 지원하므로 수천만 개의 벡터를 초고속 검색할 수 있다.

import faiss

# CPU -> GPU 변환
gpu_index = faiss.index_cpu_to_gpu(faiss.StandardGpuResources(), 0, index)

# GPU에서 벡터 검색 실행
D, I = gpu_index.search(query_vector, k=5)

5. FAISS와 LangChain을 활용한 실전 적용 사례
 
FAISS는 실제로 대용량 데이터를 빠르게 검색해야 하는 다양한 AI 프로젝트에 사용된다.
1. LLM 기반 RAG(Retrieval-Augmented Generation)
• FAISS를 사용하여 외부 문서를 검색하고, LLM의 컨텍스트를 보강
• ChatGPT와 결합하여 문맥을 더 잘 반영하는 챗봇 제작
2. 기업용 문서 검색 시스템
• 내부 정책 문서, 계약서, 기술 매뉴얼을 벡터화하여 검색
3. AI 기반 추천 시스템
• 유사한 뉴스 기사, 논문, 제품을 추천하는 서비스
결론
 
FAISS는 대규모 문서 검색 및 추천 시스템을 구축할 때 필수적인 벡터 검색 엔진이다. 특히 LangChain과 결합하면 강력한 RAG 기반 검색 시스템을 구축할 수 있다.
 

 

 

AI 기반 애플리케이션이 점점 복잡해지면서 단일 LLM 호출만으로 해결할 수 없는 문제들이 많아졌다. 예를 들어, 여러 개의 에이전트가 서로 다른 역할을 수행하며 협력해야 하는 경우가 많다. 단순한 체이닝(chaining)만으로는 이 문제를 해결하기 어려운데, LangGraph가 바로 이런 문제를 해결해 주는 핵심 기술이다.

 

이번 글에서는 LangGraph가 왜 필요한지, 어떻게 작동하는지, 그리고 실전에서 어떻게 활용할 수 있는지 살펴보자.

1. 왜 LangGraph가 필요한가?

 

LangChain의 기본 체이닝 기능은 직렬적인 흐름을 따르지만, 복잡한 문제를 해결하려면 병렬 처리비결정적인 흐름 제어가 필요하다.

 

기존 LangChain의 한계

순차적 호출: 기존 체이닝 방식은 하나의 LLM 응답이 다음 단계로 전달되는 직렬적 흐름을 갖는다.

다중 에이전트 협력 부족: 여러 개의 AI 에이전트가 상호작용하는 시나리오에서는 데이터 공유와 동적 제어가 어렵다.

상태 유지 어려움: 특정 상태를 저장하고 활용하는 기능이 부족하여, 반복적인 피드백 루프를 만들기 어렵다.

 

LangGraph는 이러한 한계를 해결하기 위해 비동기 병렬 실행, 상태 기반 워크플로우, 동적 흐름 제어를 지원한다.

2. LangGraph의 핵심 개념

 

LangGraph는 유향 비순환 그래프(DAG, Directed Acyclic Graph) 기반으로 워크플로우를 정의할 수 있도록 한다. 주요 개념은 다음과 같다.

노드(Node): 하나의 LLM 호출이나 함수 실행

엣지(Edge): 노드 간 데이터 흐름을 정의

상태(State): 워크플로우 내에서 유지되는 공유 정보

비동기 실행: 병렬 처리와 다중 에이전트 협업 가능

3. LangGraph 실전 코드: 다중 에이전트 협력 예제

 

LangGraph를 활용하여 법률 상담 챗봇을 구축해 보자. 이 챗봇은 다음과 같이 동작한다.

1. 사용자의 질문을 분석 에이전트가 분류한다.

2. 질문이 법률 관련인지 확인하고, 적절한 법률 전문가 에이전트에 전달한다.

3. 법률 전문가가 답변을 생성하면, 감수 에이전트가 내용을 검토한다.

4. 최종 응답을 사용자에게 제공한다.

 

📌 필요한 패키지 설치

pip install langchain langgraph openai

Step 1. LangGraph 기본 설정

from langgraph.graph import StateGraph
from langchain.chat_models import ChatOpenAI
from langchain.schema import AIMessage, HumanMessage, SystemMessage

# LLM 모델 설정
llm = ChatOpenAI(model="gpt-4", temperature=0)

# 워크플로우의 상태 정의
class WorkflowState:
    def __init__(self, user_input):
        self.user_input = user_input
        self.category = None
        self.response = None

Step 2. 노드(Node) 정의

 

LangGraph에서 노드는 함수를 의미하며, 각 노드는 특정 작업을 수행한다.

 

1. 사용자 입력 분석 에이전트

 

사용자의 질문이 법률 관련인지 판단하는 역할을 한다.

def classify_question(state: WorkflowState):
    prompt = f"다음 질문이 법률 관련인가? '예' 또는 '아니오'로 답변:\n\n{state.user_input}"
    result = llm.invoke([HumanMessage(content=prompt)]).content
    state.category = "법률" if "예" in result else "일반"
    return state

2. 법률 전문가 에이전트

 

법률 관련 질문일 경우 답변을 생성한다.

def legal_expert(state: WorkflowState):
    if state.category == "법률":
        prompt = f"다음 법률 질문에 대한 전문가 답변을 제공하시오:\n\n{state.user_input}"
        response = llm.invoke([HumanMessage(content=prompt)]).content
        state.response = response
    return state

3. 감수 에이전트

 

법률 전문가가 생성한 답변을 검토한다.

def review_response(state: WorkflowState):
    if state.response:
        prompt = f"다음 답변이 정확하고 윤리적인지 검토하고, 필요하면 수정하시오:\n\n{state.response}"
        state.response = llm.invoke([HumanMessage(content=prompt)]).content
    return state

Step 3. 그래프 생성 및 실행

 

이제 위에서 정의한 노드를 연결하여 그래프를 만든다.

# 그래프 생성
workflow = StateGraph(WorkflowState)

# 노드 추가
workflow.add_node("classify", classify_question)
workflow.add_node("legal_expert", legal_expert)
workflow.add_node("review", review_response)

# 엣지(Edge) 추가
workflow.set_entry_point("classify")
workflow.add_edge("classify", "legal_expert")
workflow.add_edge("legal_expert", "review")

# 그래프 실행
graph = workflow.compile()

# 사용자 입력을 받아 실행
user_query = "계약서를 검토하고 싶은데 어떤 조항을 신경 써야 할까요?"
initial_state = WorkflowState(user_query)
final_state = graph.invoke(initial_state)

print("최종 응답:", final_state.response)

4. LangGraph가 주는 가치

 

LangGraph를 활용하면 다음과 같은 장점이 있다.

병렬 처리 가능: 여러 에이전트가 동시에 작업 가능

상태 유지: 한 번 생성된 정보가 여러 노드에서 활용됨

비결정적 워크플로우 가능: 조건에 따라 동적으로 경로를 변경 가능

확장성 용이: 새로운 기능을 추가하는 것이 쉽다

5. 실제 활용 사례

 

LangGraph는 특히 멀티 에이전트 시스템이 필요한 곳에서 강력한 힘을 발휘한다. 실제 적용 사례는 다음과 같다.

1. 기업용 챗봇

HR 상담, 법률 지원, 기술 지원 등을 자동화하는 다중 AI 시스템 구축 가능

2. 연구 논문 분석

논문의 요약, 비교, 검토 작업을 분산 처리하여 빠른 정보 수집 가능

3. 금융 리스크 평가

여러 금융 모델을 동시에 실행하여 리스크 평가 자동화 가능

결론

 

LangGraph는 단순한 체이닝 방식의 한계를 극복하고, 복잡한 워크플로우를 AI 기반으로 설계할 수 있도록 해준다. 멀티 에이전트 시스템을 구축해야 하는 경우라면, LangGraph는 필수적인 도구가 될 것이다.

 

 

옷을 오래 입기 위해서는 단순히 세탁기를 돌리는 것이 아니라, 섬유의 특성을 이해하고 올바른 방법을 적용해야 한다. 잘못된 세탁 습관은 옷감을 손상시키고, 결국 수명을 단축시킨다. 본질적인 원리를 바탕으로 효과적인 세탁법을 깊이 있게 살펴보자.

1. 세제 사용량이 많으면 왜 옷이 상할까?

1-1. 과도한 세제 사용이 불러오는 문제점

많은 사람이 세제를 많이 넣으면 옷이 더 깨끗해진다고 생각하지만, 실제로는 그렇지 않다. 세탁 과정에서 헹굼이 충분하지 않으면 세제가 섬유에 남아 피부 자극을 유발하고, 원단을 손상시키는 원인이 된다.

세제가 남아 있는 상태에서 햇볕에 노출되면 섬유의 변색을 초래할 수도 있다. 특히 면이나 울 같은 천연 섬유는 세제 잔여물이 남으면 경화(딱딱해지는 현상)가 발생해 원단이 뻣뻣해진다.

1-2. 적절한 세제 사용량과 세탁법

  • 표준 세제량을 따른다: 보통 세탁세제는 제품 설명서에 적힌 권장량을 따르는 것이 가장 적절하다.
  • 고농축 세제는 더욱 적게 사용: 최근 나오는 고농축 세제는 기존 세제보다 사용량이 1/2~1/3 정도 적어도 충분한 세정력이 있다.
  • 헹굼 횟수 추가: 특히 유아용 옷이나 피부가 민감한 사람이라면 헹굼 횟수를 1~2회 추가하여 세제 찌꺼기를 완전히 제거하는 것이 중요하다.

2. 세탁 온도가 중요한 이유

2-1. 온도가 세탁에 미치는 영향

세탁물의 오염도를 고려하여 적절한 온도를 설정해야 한다. 온도가 너무 낮으면 기름때나 단백질 오염이 제거되지 않고, 너무 높으면 섬유가 손상될 수 있다.

  • 30~40도: 대부분의 세탁물에 적합한 온도. 일반적인 오염 제거 가능.
  • 60도 이상: 침구류, 수건 등 세균 번식이 쉬운 직물 세탁 시 유용. 다만, 면 이외의 재질에는 주의해야 한다.
  • 찬물 세탁이 필요한 경우: 니트, 울, 실크 같은 천연 섬유는 변형을 막기 위해 30도 이하의 물에서 세탁하는 것이 좋다.

2-2. 찬물과 뜨거운 물을 적절히 활용하는 방법

  • 흰옷의 누런 때는 미지근한 물(40도) 에 산소계 표백제를 풀어 담근 후 세탁.
  • 땀 얼룩이 있는 옷은 찬물에 먼저 담가 단백질 성분이 응고되지 않도록 한 뒤 세탁.
  • 유성 오염(기름때)은 40~50도의 따뜻한 물에서 세탁하면 더욱 효과적이다.

3. 옷감별 세탁법: 잘못된 방법이 수명을 단축한다

3-1. 니트와 울은 왜 변형이 심할까?

니트와 울 소재는 섬유 구조가 느슨하고, 수분을 머금었을 때 쉽게 늘어나거나 줄어들 수 있다. 특히 뜨거운 물과 강한 회전력은 섬유 사이의 단백질 결합을 깨뜨려 원단을 망가뜨린다.

옳은 세탁법:

  • 손세탁이 가장 이상적: 미지근한 물에 울 전용 세제를 풀고, 부드럽게 눌러가며 세탁한다.
  • 세탁기 사용 시: 세탁망에 넣고 ‘울 코스’ 또는 ‘손세탁 코스’로 설정하여 약한 회전으로 세탁한다.
  • 건조 방법: 물기를 제거할 때 비틀어 짜지 말고, 수건으로 감싸 눌러 물을 흡수한 후 평평한 곳에서 건조.

3-2. 청바지는 왜 뒤집어 세탁해야 할까?

청바지는 염료가 쉽게 빠지는 원단이므로 마찰을 줄이고 색 바램을 최소화하기 위해 반드시 뒤집어서 세탁해야 한다.

청바지 세탁법:

  • 뒤집어서 찬물 세탁: 뜨거운 물은 염료가 더 쉽게 빠지므로 피해야 한다.
  • 섬유유연제 사용 금지: 섬유유연제는 청바지 원단의 탄성을 약하게 만들어 쉽게 늘어나게 한다.
  • 건조 시 자연건조: 기계 건조기를 사용하면 원단이 줄어들고 변형될 가능성이 높다.

4. 세탁할 때 피해야 할 것들

4-1. 섬유유연제의 함정

섬유유연제는 부드러운 촉감을 유지하는 데 도움을 주지만, 일부 원단에는 오히려 해가 될 수 있다. 특히 스포츠웨어(기능성 옷), 속옷, 타월에는 사용하지 않는 것이 좋다.

  • 스포츠웨어: 섬유유연제가 기능성 원단의 흡습·속건 기능을 저하시킴.
  • 타월: 흡수력을 떨어뜨려 물기 제거 기능이 약해짐.
  • 속옷: 신축성 있는 섬유(Lycra, Spandex 등)에 영향을 주어 형태 변형이 일어날 수 있음.

4-2. 과도한 탈수는 금물

탈수 시간을 길게 설정하면 세탁물이 강한 압력을 받아 옷감이 손상될 수 있다. 특히 셔츠, 니트 같은 옷은 짧은 시간(1분 이내) 탈수 후 바로 꺼내서 건조하는 것이 좋다.

결론: 작은 습관 하나가 옷의 수명을 결정한다

올바른 세탁법은 단순한 청결 유지가 아니라, 옷을 오래 입기 위한 기본적인 관리법이다. 섬유의 특성과 세탁 원리를 이해하고, 상황에 맞게 적용하면 불필요한 옷 손상을 줄이고 경제적으로도 큰 이점을 얻을 수 있다.

 

 

Python의 **제너레이터(generator)**는 대용량 데이터를 처리할 때 유용하지만,

대부분의 개발자들은 yield를 단순히 값을 반환하는 용도로만 사용한다.

 

하지만, yield에는 숨겨진 강력한 기능이 있다.

👉 send()를 사용하면 제너레이터에 데이터를 주고받을 수 있으며, 이를 활용하면 상태를 유지하는 코루틴을 만들 수 있다.

👉 이 기능을 제대로 활용하면, 일반적인 함수보다 훨씬 더 강력한 상태 관리가 가능하다.

 

이번 글에서는 제너레이터와 send()를 활용한 고급 프로그래밍 기법을 소개한다. 🚀

1. 대부분의 개발자가 아는 yield의 기본적인 사용법

 

일반적으로 yield는 값을 반환하고, 다음 호출에서 다시 실행할 수 있도록 하는 기능을 한다.

def simple_generator():
    yield 1
    yield 2
    yield 3

gen = simple_generator()
print(next(gen))  # 1
print(next(gen))  # 2
print(next(gen))  # 3

기본적인 yield의 동작: next()를 호출할 때마다 yield 다음의 값을 반환한다.

하지만 여기까지가 대부분의 개발자가 알고 있는 yield의 기능이다.

2. send()를 사용하여 제너레이터와 데이터를 주고받기

 

yield는 단순히 값을 반환하는 기능뿐만 아니라, 외부에서 데이터를 받아 제너레이터 내부에서 활용할 수도 있다.

이를 가능하게 하는 것이 바로 send() 함수다.

def echo():
    while True:
        received = yield  # yield가 값을 반환받는 역할도 수행
        print(f"Received: {received}")

gen = echo()
next(gen)  # 제너레이터 실행 준비 (첫 번째 `yield`까지 실행됨)

gen.send("Hello")  # Received: Hello
gen.send("Python")  # Received: Python

📌 send()가 하는 역할

1️⃣ yield는 값을 반환하는 것뿐만 아니라, 외부에서 값을 받을 수도 있다.

2️⃣ send(value)를 호출하면, yield가 해당 값을 받아서 처리할 수 있다.

3️⃣ 일반적인 next()와 달리, 제너레이터 내부의 변수 값을 동적으로 변경할 수 있다.

 

즉, yield를 사용하면 단순한 이터레이터(iterator) 역할뿐만 아니라, 상태를 유지하는 코루틴(coroutine) 역할도 가능하다!

3. send()를 활용한 제너레이터 기반의 상태 머신 구현

 

이제 send()를 사용하여 상태를 유지하는 제너레이터 기반의 상태 머신을 만들어 보자.

def state_machine():
    state = "INIT"
    while True:
        value = yield state  # 현재 상태를 반환하고, 새로운 값을 받음
        if value == "run":
            state = "RUNNING"
        elif value == "stop":
            state = "STOPPED"
        elif value == "reset":
            state = "INIT"

gen = state_machine()
print(next(gen))  # INIT (초기 상태)

print(gen.send("run"))   # RUNNING
print(gen.send("stop"))  # STOPPED
print(gen.send("reset")) # INIT

📌 설명

1️⃣ yield를 통해 현재 상태를 반환하고, send()를 통해 새로운 상태 값을 받을 수 있다.

2️⃣ send("run")을 호출하면 "RUNNING" 상태로 변경되고, send("stop")을 호출하면 "STOPPED" 상태로 변경된다.

3️⃣ 이처럼 yieldsend()를 조합하면 상태를 유지하는 프로그램을 쉽게 만들 수 있다!

 

일반적인 함수는 실행이 끝나면 상태를 유지할 수 없지만, 제너레이터는 상태를 유지하면서 실행을 계속할 수 있다.

4. send()와 yield를 활용한 데이터 스트리밍 시스템

 

제너레이터와 send()를 활용하면 실시간 데이터 스트리밍 시스템을 구현할 수도 있다.

예를 들어, 실시간으로 데이터를 분석하는 시스템을 만들어보자.

def data_stream_processor():
    total = 0
    count = 0
    avg = None

    while True:
        value = yield avg  # 현재 평균값 반환
        if value is not None:
            total += value
            count += 1
            avg = total / count  # 새로운 평균값 계산

gen = data_stream_processor()
print(next(gen))  # None (초기값)

print(gen.send(10))  # 10.0
print(gen.send(20))  # 15.0
print(gen.send(30))  # 20.0
print(gen.send(40))  # 25.0

📌 설명

1️⃣ yield는 현재 평균값을 반환하고, 새로운 값을 받아서 업데이트한다.

2️⃣ send(10), send(20)을 호출할 때마다 평균값이 업데이트된다.

3️⃣ 실시간으로 데이터를 처리하면서 평균을 구하는 시스템이 완성됨!

 

send()를 활용하면, 데이터를 지속적으로 받아서 처리하는 실시간 분석 시스템을 간단하게 만들 수 있다!

5. send()와 throw()를 활용한 예외 처리

 

send()를 활용하면 제너레이터의 흐름을 조작할 수 있는데,

이와 함께 throw()를 사용하면 제너레이터 내부에서 특정 예외를 발생시키는 기능도 추가할 수 있다.

def controlled_generator():
    try:
        while True:
            value = yield
            print(f"Processing: {value}")
    except Exception as e:
        print(f"Generator stopped due to error: {e}")

gen = controlled_generator()
next(gen)  # 제너레이터 실행 준비

gen.send(10)  # Processing: 10
gen.send(20)  # Processing: 20
gen.throw(ValueError, "Something went wrong!")  # Generator stopped due to error: Something went wrong!

📌 설명

1️⃣ throw(ExceptionType, message)를 호출하면 제너레이터 내부에서 예외가 발생한다.

2️⃣ 예외가 발생하면 except 블록이 실행되며, 이후 제너레이터가 종료된다.

3️⃣ 이 방법을 사용하면 제너레이터 내부에서 특정 조건이 발생했을 때, 예외를 던지고 종료할 수 있다.

 

throw()를 활용하면, 데이터 처리 중 특정 예외가 발생했을 때 빠르게 종료하는 로직을 만들 수 있다.

6. 정리: send()를 활용하면 얻을 수 있는 것

기능기존 방식send() 활용

단순한 값 반환 yield만 사용 send()로 동적 값 변경 가능
상태 유지 불가능 ✅ 제너레이터 내부에서 상태 유지 가능
실시간 데이터 처리 어려움 send()를 사용하면 가능
예외 처리 try-except 필요 throw()를 활용 가능

제너레이터와 send()를 활용하면 단순한 이터레이터를 넘어, 상태를 유지하는 강력한 코루틴을 만들 수 있다.

실시간 데이터 처리, 상태 머신, 이벤트 기반 시스템 등에 활용할 수 있다.

기존의 함수형 프로그래밍보다 훨씬 더 유연한 흐름 제어가 가능하다.

7. 결론: send()를 활용하면 제너레이터가 훨씬 강력해진다!

 

많은 개발자가 yield를 단순한 반환 용도로만 사용하지만,

👉 send()를 활용하면 제너레이터와 외부 데이터 간의 상호작용이 가능해진다.

👉 상태를 유지하는 코루틴을 만들 수 있으며, 실시간 데이터 처리에도 활용할 수 있다.

 

🔥 제너레이터를 단순히 next()로만 사용하지 말고, send()를 활용해 더욱 강력한 프로그램을 만들어보자! 🚀

 

 

SQLAlchemy는 강력한 ORM(Object Relational Mapper) 라이브러리이지만, 많은 개발자들이 기본적인 CRUD 기능만 사용하고,

SQL에서 흔히 사용되는 **서브쿼리(Subquery)**를 ORM에서 최적화하는 방법을 잘 모른다.

 

대부분의 SQLAlchemy 사용자는 서브쿼리를 직접 작성하거나, .join()을 사용해 복잡한 연산을 수행하지만,

👉 SQLAlchemy의 subquery() 기능을 활용하면 서브쿼리를 효율적으로 최적화할 수 있다.

 

이 글에서는 “Subquery ORM”을 사용하여 복잡한 SQL을 최적화하는 방법과, 성능을 높이는 실전 코드 예제를 살펴본다. 🚀

1. 서브쿼리가 필요한 이유

 

서브쿼리는 데이터베이스에서 다른 쿼리의 결과를 기반으로 추가 연산을 수행하는 SQL 기술이다.

예를 들어, “각 부서에서 가장 높은 급여를 받는 직원”을 찾는 문제를 생각해보자.

 

기본 SQL은 다음과 같이 작성할 수 있다.

SELECT name, department_id, salary
FROM employees
WHERE salary = (
    SELECT MAX(salary)
    FROM employees e2
    WHERE e2.department_id = employees.department_id
);

✅ 서브쿼리는 특정 그룹(부서) 내에서 최대 급여를 가진 직원을 찾기 위해 필요하다.

✅ 하지만 ORM에서는 이러한 쿼리를 어떻게 작성해야 할까? 🤔

2. SQLAlchemy에서 서브쿼리를 최적화하는 subquery() 사용법

 

SQLAlchemy에서 서브쿼리를 구현할 때 많은 개발자가 .filter(), .join()을 남용하는데,

👉 subquery()를 사용하면 SQL에서와 동일한 방식으로 서브쿼리를 만들 수 있다.

✅ 기본 모델 정의

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, func
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

Base = declarative_base()

class Employee(Base):
    __tablename__ = "employees"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    department_id = Column(Integer, ForeignKey("departments.id"))
    salary = Column(Integer)

    department = relationship("Department", back_populates="employees")

class Department(Base):
    __tablename__ = "departments"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    employees = relationship("Employee", back_populates="department")

# 데이터베이스 연결
engine = create_engine("sqlite:///company.db")
Session = sessionmaker(bind=engine)
session = Session()

Employee 테이블에는 직원 정보가 있고, Department 테이블과 관계가 설정되어 있음.

✅ subquery()를 사용한 서브쿼리 ORM

from sqlalchemy.orm import aliased
from sqlalchemy import select

# 서브쿼리: 각 부서에서 가장 높은 급여를 찾기
subq = (
    session.query(
        Employee.department_id,
        func.max(Employee.salary).label("max_salary")
    )
    .group_by(Employee.department_id)
    .subquery()
)

# 메인 쿼리: 최고 급여를 받는 직원 찾기
max_salary_employees = (
    session.query(Employee)
    .join(subq, (Employee.department_id == subq.c.department_id) & (Employee.salary == subq.c.max_salary))
    .all()
)

# 결과 출력
for emp in max_salary_employees:
    print(f"부서 {emp.department_id}: {emp.name} (급여: {emp.salary})")

각 부서에서 최고 급여를 받는 직원을 찾는 서브쿼리를 ORM으로 최적화!

3. subquery() 없이 ORM을 사용하면? (비효율적인 방식)

 

보통 서브쿼리를 모르고 ORM을 사용할 경우, 다음과 같이 비효율적인 방식으로 데이터를 가져온다.

# 비효율적인 방식: 모든 직원 데이터를 가져와서 파이썬에서 처리
departments = session.query(Department).all()
result = []

for dept in departments:
    max_salary = max(emp.salary for emp in dept.employees)
    top_employee = [emp for emp in dept.employees if emp.salary == max_salary]
    result.extend(top_employee)

# 결과 출력
for emp in result:
    print(f"부서 {emp.department_id}: {emp.name} (급여: {emp.salary})")

문제점:

1️⃣ 모든 데이터를 메모리로 로드한 후, Python에서 직접 연산을 수행 → 비효율적

2️⃣ 대량 데이터에서 메모리 사용량이 급증

3️⃣ SQL의 최적화 기능을 전혀 활용하지 못함

4. subquery() vs filter() 성능 비교

 

테스트 데이터셋(10만 개 행)에서 subquery()를 적용한 경우와, 기존 .filter()를 적용한 경우의 성능을 비교해보자.

 

✅ .filter() 방식 (비효율적)

import time

start = time.time()

max_salary_employees = []
for dept_id in session.query(Employee.department_id).distinct():
    max_salary = session.query(func.max(Employee.salary)).filter(Employee.department_id == dept_id).scalar()
    employee = session.query(Employee).filter(Employee.department_id == dept_id, Employee.salary == max_salary).all()
    max_salary_employees.extend(employee)

end = time.time()
print(f"`.filter()` 방식 실행 시간: {end - start:.4f}초")

✅ subquery() 방식 (최적화)

start = time.time()

max_salary_employees = (
    session.query(Employee)
    .join(subq, (Employee.department_id == subq.c.department_id) & (Employee.salary == subq.c.max_salary))
    .all()
)

end = time.time()
print(f"`subquery()` 방식 실행 시간: {end - start:.4f}초")

📌 결과 예시

`.filter()` 방식 실행 시간: 5.8421초  
`subquery()` 방식 실행 시간: 0.8523초  # 🚀 약 7배 빠름!

subquery()를 활용하면 데이터베이스에서 직접 연산을 수행하므로 속도가 훨씬 빠름!

filter() 방식은 개별적으로 max() 쿼리를 수행하므로 성능이 급격히 저하됨.

5. subquery()를 사용하면 얻을 수 있는 것

 

SQL처럼 서브쿼리를 ORM에서 그대로 사용할 수 있음

Python에서 불필요한 연산을 하지 않고, 데이터베이스에서 최적화된 연산 수행

쿼리 성능이 5~10배 이상 향상

대용량 데이터에서 메모리 사용량 감소

6. 정리: SQLAlchemy에서 서브쿼리를 최적화하는 방법

방법성능특징추천 상황

.filter() + .max() ❌ 느림 개별적으로 max() 실행, 비효율적 작은 데이터셋
subquery() 🚀 빠름 SQL 서브쿼리와 동일한 방식 대량 데이터 처리

ORM에서도 SQL처럼 서브쿼리를 작성할 수 있으며, subquery()를 사용하면 성능을 극대화할 수 있다.

대용량 데이터를 처리하는 ORM 애플리케이션이라면 반드시 활용해야 하는 핵심 기법!

7. 결론: ORM에서도 SQL 성능 최적화를 하자!

 

많은 개발자가 ORM을 사용하면서 SQL의 최적화 기법을 놓치는 실수를 한다.

하지만 subquery()를 활용하면, SQL의 강력한 서브쿼리 기능을 ORM에서도 동일하게 사용할 수 있다.

 

👉 ORM에서 서브쿼리를 최적화하고 싶다면, 반드시 subquery()를 사용하자! 🚀

+ Recent posts