시리즈: Codex SDK 한계와 app-server 모드
이 시리즈는 OpenAI Codex TS SDK로 LLM 응답 streaming을 구현하려다 구조적 한계에 봉착한 과정을 분석하고, 동일 바이너리의 app-server 모드를 대안으로 채택하여 실제 구현까지 도달하는 과정이다.
| 편 | 내용 | 핵심 |
|---|---|---|
| 1편 (본문) | TS SDK 한계 분석 | exec 모드는 delta를 emit하지 않는다 — app-server가 대안 |
| 2편 | app-server 구현 설계 | JSON-RPC 프로토콜 직접 구현으로 전체 기능 확보 |
- OpenAI Codex TS SDK는 Rust
codex바이너리를exec모드로 감싼 thin wrapper exec모드에서agent_message에 대한item.updated이벤트가 emit되지 않아 delta streaming 불가- 동일 바이너리의
app-server모드(JSON-RPC)를 직접 사용하면 delta streaming 포함 전체 기능 사용 가능
해당 개념이 필요한 이유
- Codex TS SDK를 사용해 LLM 응답을 streaming으로 받으려 했으나, 텍스트가 한 번에 도착하는 현상 발생
- 공식 문서와 SDK 소스 코드를 분석한 결과, SDK의 구조적 한계임을 확인
- 실제 프로덕션에서 delta streaming을 구현한 오픈소스(vibe-kanban)는 SDK를 사용하지 않고 app-server를 직접 사용
AS-IS: TS SDK (exec 모드) — delta 없음
sequenceDiagram autonumber participant App as Application participant SDK as @openai/codex-sdk participant Bin as codex binary<br/>(exec --experimental-json) participant API as OpenAI API App->>SDK: thread.runStreamed(message) SDK->>Bin: spawn + stdin (prompt) Bin->>API: Responses API (streaming) API-->>Bin: token stream Note over Bin: 내부에서 전체 텍스트 축적 Bin-->>SDK: JSONL: item.completed (전체 텍스트) Note over Bin: item.updated 미발생 SDK-->>App: yield ThreadEvent {item.completed} Note over App: 전체 텍스트가 한 번에 도착
TO-BE: app-server 모드 — delta streaming
sequenceDiagram autonumber participant App as Application participant Srv as codex binary<br/>(app-server) participant API as OpenAI API App->>Srv: JSON-RPC: turn/start Srv->>API: Responses API (streaming) API-->>Srv: token stream Srv-->>App: notify: item/started Srv-->>App: notify: item/agentMessage/delta ("Hello") Srv-->>App: notify: item/agentMessage/delta (", I'll") Srv-->>App: notify: item/agentMessage/delta (" help you") Srv-->>App: notify: item/completed Note over App: 토큰 단위로 실시간 수신
@openai/codex 패키지 구조
@openai/codex npm 패키지 안에 Rust로 빌드된 codex 바이너리가 포함되어 있다. 이 바이너리는 두 가지 모드로 실행할 수 있다.
graph TD PKG["@openai/codex (npm)<br/>Rust codex 바이너리 포함"] PKG --> EXEC["exec 모드<br/>codex exec --experimental-json"] PKG --> APPSRV["app-server 모드<br/>codex app-server"] EXEC --> TSSDK["@openai/codex-sdk<br/>(별도 npm 패키지)<br/>exec 모드를 감싼 TS wrapper"] APPSRV --> DIRECT["직접 spawn + JSON-RPC 구현<br/>(vibe-kanban 방식)"] TSSDK --> LIMIT["단방향 JSONL<br/>item.completed만<br/>delta ❌"] DIRECT --> FULL["양방향 JSON-RPC<br/>item/agentMessage/delta<br/>delta ✅"] style LIMIT fill:#fee,stroke:#c33 style FULL fill:#efe,stroke:#3c3
| 항목 | exec 모드 | app-server 모드 |
|---|---|---|
| 프로토콜 | 단방향 JSONL (stdout) | 양방향 JSON-RPC (stdio) |
| 실행 명령 | codex exec --experimental-json | codex app-server |
| TS SDK 지원 | SDK가 이 모드 사용 | SDK 미지원, 직접 구현 |
| 통신 방향 | App → Binary (stdin), Binary → App (stdout) | 양방향 (stdin ↔ stdout) |
TS SDK 내부 동작 분석
SDK 소스 코드 (v0.105.0)
TS SDK의 runStreamed()는 Rust 바이너리의 stdout JSONL을 그대로 yield하는 thin wrapper이다.
// thread.ts L66-L68
async runStreamed(input: Input, turnOptions: TurnOptions = {}): Promise<StreamedTurn> {
return { events: this.runStreamedInternal(input, turnOptions) };
}
// thread.ts L70-L111
private async *runStreamedInternal(input: Input, turnOptions: TurnOptions = {}) {
// ...
const generator = this._exec.run({ ... }); // L77: Rust 바이너리 spawn
try {
for await (const item of generator) { // L97: stdout JSONL 라인 순회
let parsed: ThreadEvent;
parsed = JSON.parse(item); // L99: 파싱
if (parsed.type === "thread.started") {
this._id = parsed.thread_id;
}
yield parsed; // L106: 그대로 yield — 변환/버퍼링 없음
}
} finally {
await cleanup();
}
}ThreadEvent 타입 정의
// events.ts L43-L46
export type ItemStartedEvent = {
type: "item.started";
item: ThreadItem;
};
// events.ts L49-L52 — 타입은 존재하지만...
export type ItemUpdatedEvent = {
type: "item.updated";
item: ThreadItem;
};
// events.ts L55-L58
export type ItemCompletedEvent = {
type: "item.completed";
item: ThreadItem;
};
// events.ts L72-L80 — union에 ItemUpdatedEvent 포함
export type ThreadEvent =
| ThreadStartedEvent | TurnStartedEvent | TurnCompletedEvent
| TurnFailedEvent | ItemStartedEvent | ItemUpdatedEvent
| ItemCompletedEvent | ThreadErrorEvent;AgentMessageItem — 항상 전체 누적 텍스트
// items.ts L74-L79
export type AgentMessageItem = {
id: string;
type: "agent_message";
text: string; // 항상 전체 누적 텍스트 (delta 필드 없음)
};item.updated 타입이 정의되어 있지만, exec 모드에서 Rust 바이너리가 agent_message에 대해 이 이벤트를 emit하지 않는다.
실제 로그 증거
event: item.started → item_2 (mcp_tool_call) ✅ 발생
event: item.completed → item_2 (mcp_tool_call) ✅ 발생
event: item.completed → item_3 (reasoning) ← item.updated 없이 바로 completed
event: item.completed → item_4 (agent_message) ← item.updated 없이 바로 completed
event: item.completed → item_7 (agent_message) ← item.updated 없이 바로 completed
agent_message와 reasoning에 대해 item.updated가 단 한 번도 발생하지 않았다. SDK README 공식 예제도 item.completed만 사용한다:
// README.md L37-L49 — 공식 예제, item.updated 사용하지 않음
const { events } = await thread.runStreamed(
"Diagnose the test failure and propose a fix"
);
for await (const event of events) {
switch (event.type) {
case "item.completed": // L41: completed만
console.log("item", event.item);
break;
case "turn.completed":
console.log("usage", event.usage);
break;
}
}Delta 변환 로직이 무의미해지는 구조
exec 모드에서 delta를 추출하려면 item.updated가 여러 번 발생해야 한다:
// delta 추출 로직 (정상 동작 시나리오)
function emitDelta(itemId, fullText, prevTextMap) {
const prev = prevTextMap.get(itemId) ?? "";
const delta = fullText.slice(prev.length); // 이전 대비 증분
prevTextMap.set(itemId, fullText);
return delta;
}
// 기대: item.updated가 여러 번 와야 delta가 의미있음
// item.updated → text: "Hello" → delta: "Hello"
// item.updated → text: "Hello, I'll" → delta: ", I'll"
// item.updated → text: "Hello, I'll h" → delta: " h"
// 현실: item.completed만 한 번 옴
// item.completed → text: "Hello, I'll help you with that."
// → delta: "Hello, I'll help you with that." (전체 텍스트 = delta)app-server JSON-RPC 프로토콜
app-server 모드에서는 item lifecycle이 다른 프로토콜을 따른다:
item/started → item/agentMessage/delta (N회) → item/completed
// item/agentMessage/delta notification 예시
{
"jsonrpc": "2.0",
"method": "item/agentMessage/delta",
"params": {
"itemId": "item_99X",
"delta": "The requested changes have been applied."
}
}Q) “exec 모드에서도 OpenAI API는 token streaming을 하는데, 왜 바이너리가 item.updated를 안 보내는 건가?”
- Rust
codex바이너리의exec모드는 OpenAI API의 streaming 응답을 내부에서 축적한 뒤item.completed로 한 번에 출력 - 이는 exec 모드가 “단일 실행 → 결과 반환” 패턴으로 설계되었기 때문
- app-server 모드는 “장기 세션 → 실시간 피드백” 패턴으로 설계되어 delta를 별도 notification으로 emit
TS SDK vs app-server 기능 비교
| 기능 | TS SDK (exec) | app-server |
|---|---|---|
| Delta streaming (agent_message) | ❌ | ✅ |
| Delta streaming (reasoning) | ❌ | ✅ |
System prompt (base_instructions) | ❌ | ✅ |
| Developer instructions | ❌ | ✅ |
| MCP server 설정 | ✅ | ✅ |
| Model 선택 | ✅ | ✅ |
| Reasoning effort | ✅ | ✅ |
| Approval policy | ✅ | ✅ |
| Sandbox mode | 제한적 | ✅ |
| Thread fork (세션 이어가기) | ❌ | ✅ |
Context 압축 (thread/compact) | ❌ | ✅ |
| Config 런타임 변경 | ❌ | ✅ |
| Review 기능 | ❌ | ✅ |
| Account/rate limit 조회 | ❌ | ✅ |
| 구현 난이도 | 낮음 (SDK 제공) | 높음 (JSON-RPC 직접 구현) |
vibe-kanban 사례 분석
vibe-kanban은 10+ coding agent를 지원하는 칸반 보드 도구로, Codex 연동 시 TS SDK를 사용하지 않고 app-server를 직접 사용한다.
실행 방식
crates/executors/src/executors/codex.rs—base_command(),build_command_builder()
fn base_command() -> &'static str {
"npx -y @openai/codex@0.114.0" // 같은 npm 패키지
}
fn build_command_builder(&self) -> Result<CommandBuilder, CommandBuildError> {
let mut builder = CommandBuilder::new(Self::base_command());
builder = builder.extend_params(["app-server"]); // ← exec이 아닌 app-server
// ...
}JSON-RPC 양방향 통신
crates/executors/src/executors/codex/jsonrpc.rs—JsonRpcPeer::spawn()
pub struct JsonRpcPeer {
stdin: Arc<Mutex<ChildStdin>>, // App → codex (요청)
pending: Arc<Mutex<HashMap<...>>>, // 응답 대기열
// ...
}
// stdout을 라인 단위로 읽어 JSON-RPC 메시지 타입별 분기
match serde_json::from_str::<JSONRPCMessage>(line) {
Ok(JSONRPCMessage::Response(response)) => callbacks.on_response(...),
Ok(JSONRPCMessage::Request(request)) => callbacks.on_request(...),
Ok(JSONRPCMessage::Notification(notif)) => callbacks.on_notification(...),
Ok(JSONRPCMessage::Error(error)) => callbacks.on_error(...),
}System prompt 설정
crates/executors/src/executors/codex.rs—build_thread_start_params()
fn build_thread_start_params(&self, cwd: &Path) -> ThreadStartParams {
ThreadStartParams {
model: ...,
base_instructions: self.base_instructions.clone(), // system prompt
developer_instructions: self.developer_instructions.clone(),
config: self.build_config_overrides(), // reasoning effort 등
// ...
}
}Notification 기반 이벤트 처리
crates/executors/src/executors/codex/client.rs—on_notification()
async fn on_notification(&self, _peer: &JsonRpcPeer, raw: &str,
notification: JSONRPCNotification) -> Result<bool, ExecutorError>
{
// 모든 notification을 raw JSONL로 log_writer에 기록
// → 프론트엔드가 이 스트림을 읽어 실시간 UI 업데이트
self.log_writer.log_raw(raw).await?;
let method = notification.method.as_str();
// turn 완료 감지
if method == "turn/completed" { ... }
// item/agentMessage/delta, item/started, item/completed 등
// 모두 log_writer를 통해 실시간 전달
Ok(false)
}vibe-kanban은 on_notification에서 받은 모든 JSON-RPC notification(item/agentMessage/delta 포함)을 log_writer로 실시간 전달하여 프론트엔드에서 delta streaming UI를 구현한다.
참고 문서
sdk/typescript/src/thread.ts— runStreamed, runStreamedInternalsdk/typescript/src/events.ts— ThreadEvent, ItemUpdatedEventsdk/typescript/src/items.ts— AgentMessageItem, ThreadItemsdk/typescript/README.md— 공식 streaming 예제codex-rs/app-server/README.md— app-server 프로토콜- vibe-kanban Codex Executor — app-server 구현 사례