시리즈: Codex SDK 한계와 app-server 모드

이 시리즈는 OpenAI Codex TS SDK로 LLM 응답 streaming을 구현하려다 구조적 한계에 봉착한 과정을 분석하고, 동일 바이너리의 app-server 모드를 대안으로 채택하여 실제 구현까지 도달하는 과정이다.

내용핵심
1편 (본문)TS SDK 한계 분석exec 모드는 delta를 emit하지 않는다 — app-server가 대안
2편app-server 구현 설계JSON-RPC 프로토콜 직접 구현으로 전체 기능 확보

  • OpenAI Codex TS SDK는 Rust codex 바이너리를 exec 모드로 감싼 thin wrapper
  • exec 모드에서 agent_message에 대한 item.updated 이벤트가 emit되지 않아 delta streaming 불가
  • 동일 바이너리의 app-server 모드(JSON-RPC)를 직접 사용하면 delta streaming 포함 전체 기능 사용 가능

해당 개념이 필요한 이유

  • Codex TS SDK를 사용해 LLM 응답을 streaming으로 받으려 했으나, 텍스트가 한 번에 도착하는 현상 발생
  • 공식 문서와 SDK 소스 코드를 분석한 결과, SDK의 구조적 한계임을 확인
  • 실제 프로덕션에서 delta streaming을 구현한 오픈소스(vibe-kanban)는 SDK를 사용하지 않고 app-server를 직접 사용

AS-IS: TS SDK (exec 모드) — delta 없음

sequenceDiagram
    autonumber
    participant App as Application
    participant SDK as @openai/codex-sdk
    participant Bin as codex binary<br/>(exec --experimental-json)
    participant API as OpenAI API

    App->>SDK: thread.runStreamed(message)
    SDK->>Bin: spawn + stdin (prompt)
    Bin->>API: Responses API (streaming)
    API-->>Bin: token stream
    Note over Bin: 내부에서 전체 텍스트 축적
    Bin-->>SDK: JSONL: item.completed (전체 텍스트)
    Note over Bin: item.updated 미발생
    SDK-->>App: yield ThreadEvent {item.completed}
    Note over App: 전체 텍스트가 한 번에 도착

TO-BE: app-server 모드 — delta streaming

sequenceDiagram
    autonumber
    participant App as Application
    participant Srv as codex binary<br/>(app-server)
    participant API as OpenAI API

    App->>Srv: JSON-RPC: turn/start
    Srv->>API: Responses API (streaming)
    API-->>Srv: token stream
    Srv-->>App: notify: item/started
    Srv-->>App: notify: item/agentMessage/delta ("Hello")
    Srv-->>App: notify: item/agentMessage/delta (", I'll")
    Srv-->>App: notify: item/agentMessage/delta (" help you")
    Srv-->>App: notify: item/completed
    Note over App: 토큰 단위로 실시간 수신

@openai/codex 패키지 구조

@openai/codex npm 패키지 안에 Rust로 빌드된 codex 바이너리가 포함되어 있다. 이 바이너리는 두 가지 모드로 실행할 수 있다.

graph TD
    PKG["@openai/codex (npm)<br/>Rust codex 바이너리 포함"]

    PKG --> EXEC["exec 모드<br/>codex exec --experimental-json"]
    PKG --> APPSRV["app-server 모드<br/>codex app-server"]

    EXEC --> TSSDK["@openai/codex-sdk<br/>(별도 npm 패키지)<br/>exec 모드를 감싼 TS wrapper"]
    APPSRV --> DIRECT["직접 spawn + JSON-RPC 구현<br/>(vibe-kanban 방식)"]

    TSSDK --> LIMIT["단방향 JSONL<br/>item.completed만<br/>delta ❌"]
    DIRECT --> FULL["양방향 JSON-RPC<br/>item/agentMessage/delta<br/>delta ✅"]

    style LIMIT fill:#fee,stroke:#c33
    style FULL fill:#efe,stroke:#3c3
항목exec 모드app-server 모드
프로토콜단방향 JSONL (stdout)양방향 JSON-RPC (stdio)
실행 명령codex exec --experimental-jsoncodex app-server
TS SDK 지원SDK가 이 모드 사용SDK 미지원, 직접 구현
통신 방향App → Binary (stdin), Binary → App (stdout)양방향 (stdin ↔ stdout)

TS SDK 내부 동작 분석

SDK 소스 코드 (v0.105.0)

TS SDK의 runStreamed()는 Rust 바이너리의 stdout JSONL을 그대로 yield하는 thin wrapper이다.

sdk/typescript/src/thread.ts L66-L111

// thread.ts L66-L68
async runStreamed(input: Input, turnOptions: TurnOptions = {}): Promise<StreamedTurn> {
    return { events: this.runStreamedInternal(input, turnOptions) };
}
 
// thread.ts L70-L111
private async *runStreamedInternal(input: Input, turnOptions: TurnOptions = {}) {
    // ...
    const generator = this._exec.run({ ... }); // L77: Rust 바이너리 spawn
    try {
        for await (const item of generator) {   // L97: stdout JSONL 라인 순회
            let parsed: ThreadEvent;
            parsed = JSON.parse(item);          // L99: 파싱
            if (parsed.type === "thread.started") {
                this._id = parsed.thread_id;
            }
            yield parsed;                       // L106: 그대로 yield — 변환/버퍼링 없음
        }
    } finally {
        await cleanup();
    }
}

ThreadEvent 타입 정의

sdk/typescript/src/events.ts L43-L80

// events.ts L43-L46
export type ItemStartedEvent = {
    type: "item.started";
    item: ThreadItem;
};
 
// events.ts L49-L52 — 타입은 존재하지만...
export type ItemUpdatedEvent = {
    type: "item.updated";
    item: ThreadItem;
};
 
// events.ts L55-L58
export type ItemCompletedEvent = {
    type: "item.completed";
    item: ThreadItem;
};
 
// events.ts L72-L80 — union에 ItemUpdatedEvent 포함
export type ThreadEvent =
  | ThreadStartedEvent | TurnStartedEvent | TurnCompletedEvent
  | TurnFailedEvent | ItemStartedEvent | ItemUpdatedEvent
  | ItemCompletedEvent | ThreadErrorEvent;

AgentMessageItem — 항상 전체 누적 텍스트

sdk/typescript/src/items.ts L74-L79

// items.ts L74-L79
export type AgentMessageItem = {
    id: string;
    type: "agent_message";
    text: string;   // 항상 전체 누적 텍스트 (delta 필드 없음)
};

item.updated 타입이 정의되어 있지만, exec 모드에서 Rust 바이너리가 agent_message에 대해 이 이벤트를 emit하지 않는다.

실제 로그 증거

event: item.started   → item_2 (mcp_tool_call)     ✅ 발생
event: item.completed → item_2 (mcp_tool_call)     ✅ 발생
event: item.completed → item_3 (reasoning)          ← item.updated 없이 바로 completed
event: item.completed → item_4 (agent_message)      ← item.updated 없이 바로 completed
event: item.completed → item_7 (agent_message)      ← item.updated 없이 바로 completed

agent_messagereasoning에 대해 item.updated단 한 번도 발생하지 않았다. SDK README 공식 예제도 item.completed만 사용한다:

sdk/typescript/README.md L37-L49

// README.md L37-L49 — 공식 예제, item.updated 사용하지 않음
const { events } = await thread.runStreamed(
    "Diagnose the test failure and propose a fix"
);
 
for await (const event of events) {
  switch (event.type) {
    case "item.completed":       // L41: completed만
      console.log("item", event.item);
      break;
    case "turn.completed":
      console.log("usage", event.usage);
      break;
  }
}

Delta 변환 로직이 무의미해지는 구조

exec 모드에서 delta를 추출하려면 item.updated가 여러 번 발생해야 한다:

// delta 추출 로직 (정상 동작 시나리오)
function emitDelta(itemId, fullText, prevTextMap) {
    const prev = prevTextMap.get(itemId) ?? "";
    const delta = fullText.slice(prev.length);   // 이전 대비 증분
    prevTextMap.set(itemId, fullText);
    return delta;
}
 
// 기대: item.updated가 여러 번 와야 delta가 의미있음
// item.updated → text: "Hello"         → delta: "Hello"
// item.updated → text: "Hello, I'll"   → delta: ", I'll"
// item.updated → text: "Hello, I'll h" → delta: " h"
 
// 현실: item.completed만 한 번 옴
// item.completed → text: "Hello, I'll help you with that."
//                → delta: "Hello, I'll help you with that." (전체 텍스트 = delta)

app-server JSON-RPC 프로토콜

app-server 모드에서는 item lifecycle이 다른 프로토콜을 따른다:

item/started → item/agentMessage/delta (N회) → item/completed

codex-rs/app-server/README.md — NOTIFY item/lifecycle

// item/agentMessage/delta notification 예시
{
  "jsonrpc": "2.0",
  "method": "item/agentMessage/delta",
  "params": {
    "itemId": "item_99X",
    "delta": "The requested changes have been applied."
  }
}

Q) “exec 모드에서도 OpenAI API는 token streaming을 하는데, 왜 바이너리가 item.updated를 안 보내는 건가?”

  • Rust codex 바이너리의 exec 모드는 OpenAI API의 streaming 응답을 내부에서 축적한 뒤 item.completed로 한 번에 출력
  • 이는 exec 모드가 “단일 실행 → 결과 반환” 패턴으로 설계되었기 때문
  • app-server 모드는 “장기 세션 → 실시간 피드백” 패턴으로 설계되어 delta를 별도 notification으로 emit

TS SDK vs app-server 기능 비교

기능TS SDK (exec)app-server
Delta streaming (agent_message)
Delta streaming (reasoning)
System prompt (base_instructions)
Developer instructions
MCP server 설정
Model 선택
Reasoning effort
Approval policy
Sandbox mode제한적
Thread fork (세션 이어가기)
Context 압축 (thread/compact)
Config 런타임 변경
Review 기능
Account/rate limit 조회
구현 난이도낮음 (SDK 제공)높음 (JSON-RPC 직접 구현)

vibe-kanban 사례 분석

vibe-kanban은 10+ coding agent를 지원하는 칸반 보드 도구로, Codex 연동 시 TS SDK를 사용하지 않고 app-server를 직접 사용한다.

실행 방식

crates/executors/src/executors/codex.rsbase_command(), build_command_builder()

fn base_command() -> &'static str {
    "npx -y @openai/codex@0.114.0"     // 같은 npm 패키지
}
 
fn build_command_builder(&self) -> Result<CommandBuilder, CommandBuildError> {
    let mut builder = CommandBuilder::new(Self::base_command());
    builder = builder.extend_params(["app-server"]);  // ← exec이 아닌 app-server
    // ...
}

JSON-RPC 양방향 통신

crates/executors/src/executors/codex/jsonrpc.rsJsonRpcPeer::spawn()

pub struct JsonRpcPeer {
    stdin: Arc<Mutex<ChildStdin>>,      // App → codex (요청)
    pending: Arc<Mutex<HashMap<...>>>,  // 응답 대기열
    // ...
}
 
// stdout을 라인 단위로 읽어 JSON-RPC 메시지 타입별 분기
match serde_json::from_str::<JSONRPCMessage>(line) {
    Ok(JSONRPCMessage::Response(response))     => callbacks.on_response(...),
    Ok(JSONRPCMessage::Request(request))       => callbacks.on_request(...),
    Ok(JSONRPCMessage::Notification(notif))    => callbacks.on_notification(...),
    Ok(JSONRPCMessage::Error(error))           => callbacks.on_error(...),
}

System prompt 설정

crates/executors/src/executors/codex.rsbuild_thread_start_params()

fn build_thread_start_params(&self, cwd: &Path) -> ThreadStartParams {
    ThreadStartParams {
        model: ...,
        base_instructions: self.base_instructions.clone(),       // system prompt
        developer_instructions: self.developer_instructions.clone(),
        config: self.build_config_overrides(),  // reasoning effort 등
        // ...
    }
}

Notification 기반 이벤트 처리

crates/executors/src/executors/codex/client.rson_notification()

async fn on_notification(&self, _peer: &JsonRpcPeer, raw: &str,
    notification: JSONRPCNotification) -> Result<bool, ExecutorError>
{
    // 모든 notification을 raw JSONL로 log_writer에 기록
    // → 프론트엔드가 이 스트림을 읽어 실시간 UI 업데이트
    self.log_writer.log_raw(raw).await?;
 
    let method = notification.method.as_str();
 
    // turn 완료 감지
    if method == "turn/completed" { ... }
 
    // item/agentMessage/delta, item/started, item/completed 등
    // 모두 log_writer를 통해 실시간 전달
    Ok(false)
}

vibe-kanban은 on_notification에서 받은 모든 JSON-RPC notification(item/agentMessage/delta 포함)을 log_writer로 실시간 전달하여 프론트엔드에서 delta streaming UI를 구현한다.

참고 문서