A2A 서버

·3분 읽기

원문: Koog Documentation — a2a-server 이 글은 Koog 공식 문서의 a2a-server 페이지를 한국어로 옮긴 번역본입니다. 문서 구조와 링크 의미를 유지하되, MkDocs 전용 UI 문법은 블로그에서 읽기 좋도록 정리했습니다.

A2A 서버

A2A 서버를 사용하면 표준화된 A2A(에이전트 대 에이전트) 프로토콜을 통해 AI 에이전트를 노출할 수 있습니다. 이는 A2A protocol specification의 완전한 구현을 제공하여 클라이언트 요청 처리, 에이전트 논리 실행, 복잡한 작업 수명주기 관리 및 실시간 스트리밍 응답 지원을 제공합니다.

종속성

프로젝트에서 A2A 서버를 사용하려면 build.gradle.kts에 다음 종속성을 추가하세요.

1dependencies {2    // Core A2A server library3    implementation("ai.koog:a2a-server:$koogVersion")45    // HTTP JSON-RPC transport (most common)6    implementation("ai.koog:a2a-transport-server-jsonrpc-http:$koogVersion")78    // Ktor server engine (choose one that fits your needs)9    implementation("io.ktor:ktor-server-netty:$ktorVersion")10}

개요

A2A 서버는 A2A 프로토콜 전송 계층과 사용자 지정 에이전트 논리 사이의 브리지 역할을 합니다. 프로토콜 준수를 유지하고 강력한 세션 관리를 제공하는 동시에 전체 요청 수명주기를 조정합니다.

핵심 구성요소

A2A서버

완전한 A2A 프로토콜을 구현하는 메인 서버 클래스입니다. 이는 다음과 같은 중앙 조정자 역할을 합니다.

  • 프로토콜 사양에 따라 수신 요청 검증
  • 관리 동시 세션 및 작업 수명주기
  • 전송, 저장소, 비즈니스 로직 계층 간의 통신을 조정합니다.
  • 처리 모든 프로토콜 작업: 메시지 전송, 작업 쿼리, 취소, 푸시 알림

A2AServer은 두 가지 필수 매개변수를 허용합니다.

  • AgentExecutor 에이전트의 비즈니스 로직 구현을 정의합니다.
  • AgentCard 에이전트 기능 및 메타데이터를 정의합니다.

그리고 저장 및 전송 동작을 사용자 정의하는 데 사용할 수 있는 다양한 선택적 매개변수도 있습니다.

AgentExecutor

AgentExecutor 인터페이스는 에이전트의 핵심 비즈니스 로직을 구현하는 곳입니다. 이는 A2A 프로토콜과 특정 AI 에이전트 기능 간의 브리지 역할을 합니다. 에이전트 실행을 시작하려면 에이전트 논리를 정의하는 execute 메서드를 구현해야 합니다. 에이전트를 취소하려면 cancel 메서드를 구현해야 합니다.

1class MyAgentExecutor : AgentExecutor {2    override suspend fun execute(3        context: RequestContext<MessageSendParams>,4        eventProcessor: SessionEventProcessor5    ) {6        // Agent logic here7    }89    override suspend fun cancel(10        context: RequestContext<TaskIdParams>,11        eventProcessor: SessionEventProcessor,12        agentJob: Deferred<Unit>?13    ) {14        // Cancel agent here, optional15    }16}

RequestContext은 현재 요청에 대한 풍부한 정보를 제공합니다. 현재 세션의 contextIdtaskId, 전송된 message 및 요청의 params를 포함합니다.

SessionEventProcessor은 클라이언트와 통신합니다.

  • sendMessage(message): 즉시 응답 보내기(채팅 스타일 상호작용)
  • sendTaskEvent(event): 작업 관련 업데이트 보내기(장기 실행 작업)
1// For immediate responses (like chatbots)2eventProcessor.sendMessage(3    Message(4        messageId = generateId(),5        role = Role.Agent,6        parts = listOf(TextPart("Here's your answer!")),7        contextId = context.contextId8    )9)1011// For task-based operations12eventProcessor.sendTaskEvent(13    TaskStatusUpdateEvent(14        contextId = context.contextId,15        taskId = context.taskId,16        status = TaskStatus(17            state = TaskState.Working,18            message = Message(/* progress update */),19            timestamp = Clock.System.now()20        ),21        final = false  // More updates to come22    )23)

AgentCard

AgentCard은 에이전트의 자체 설명 매니페스트 역할을 합니다. 에이전트가 수행할 수 있는 작업, 에이전트와 통신하는 방법, 에이전트의 보안 요구 사항을 클라이언트에게 알려줍니다.

1val agentCard = AgentCard(2    // Basic Identity3    name = "Advanced Recipe Assistant",4    description = "AI agent specialized in cooking advice, recipe generation, and meal planning",5    version = "2.1.0",6    protocolVersion = "0.3.0",78    // Communication Settings9    url = "https://api.example.com/a2a",10    preferredTransport = TransportProtocol.JSONRPC,1112    // Optional: Multiple transport support13    additionalInterfaces = listOf(14        AgentInterface("https://api.example.com/a2a", TransportProtocol.JSONRPC),15    ),1617    // Capabilities Declaration18    capabilities = AgentCapabilities(19        streaming = true,              // Support real-time responses20        pushNotifications = true,      // Send async notifications21        stateTransitionHistory = true  // Maintain task history22    ),2324    // Content Type Support25    defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),26    defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),2728    // Define available security schemes29    securitySchemes = mapOf(30        "bearer" to HTTPAuthSecurityScheme(31            scheme = "Bearer",32            bearerFormat = "JWT",33            description = "JWT token authentication"34        ),35        "api-key" to APIKeySecurityScheme(36            `in` = In.Header,37            name = "X-API-Key",38            description = "API key for service authentication"39        )40    ),4142    // Specify security requirements (logical OR of requirements)43    security = listOf(44        mapOf("bearer" to listOf("read", "write")),  // Option 1: JWT with read/write scopes45        mapOf("api-key" to emptyList())              // Option 2: API key46    ),4748    // Enable extended card for authenticated users49    supportsAuthenticatedExtendedCard = true,50    51    // Skills/Capabilities52    skills = listOf(53        AgentSkill(54            id = "recipe-generation",55            name = "Recipe Generation",56            description = "Generate custom recipes based on ingredients, dietary restrictions, and preferences",57            tags = listOf("cooking", "recipes", "nutrition"),58            examples = listOf(59                "Create a vegan pasta recipe with mushrooms",60                "I have chicken, rice, and vegetables. What can I make?"61            )62        ),63        AgentSkill(64            id = "meal-planning",65            name = "Meal Planning",66            description = "Plan weekly meals and generate shopping lists",67            tags = listOf("meal-planning", "nutrition", "shopping")68        )69    ),7071    // Optional: Branding72    iconUrl = "https://example.com/agent-icon.png",73    documentationUrl = "https://docs.example.com/recipe-agent",74    provider = AgentProvider(75        organization = "CookingAI Inc.",76        url = "https://cookingai.com"77    )78)

전송 계층

A2A 자체는 클라이언트와 통신하기 위한 여러 전송 프로토콜을 지원합니다. 현재 Koog는 HTTP를 통한 JSON-RPC 서버 전송 구현을 제공합니다.

HTTP JSON-RPC 전송

1val transport = HttpJSONRPCServerTransport(server)2transport.start(3    engineFactory = CIO,           // Ktor engine (CIO, Netty, Jetty)4    port = 8080,                   // Server port5    path = "/a2a",                 // API endpoint path6    wait = true                    // Block until server stops7)

저장

A2A 서버는 다양한 유형의 데이터를 분리하는 플러그형 스토리지 아키텍처를 사용합니다. 모든 스토리지 구현은 선택 사항이며 기본적으로 개발을 위한 인메모리 변형입니다.

  • TaskStorage: 작업 수명주기 관리 - 작업 상태, 기록 및 아티팩트를 저장하고 관리합니다.
  • MessageStorage: 대화 기록 - 대화 컨텍스트 내에서 메시지 기록을 관리합니다.
  • PushNotificationConfigStorage: 웹훅 관리 - 비동기 알림을 위한 웹훅 구성을 관리합니다.

빠른 시작

1. AgentCard 생성

에이전트의 기능과 메타데이터를 정의합니다.

1val agentCard = AgentCard(2    name = "IO Assistant",3    description = "AI agent specialized in input modification",4    version = "2.1.0",5    protocolVersion = "0.3.0",67    // Communication Settings8    url = "https://api.example.com/a2a",9    preferredTransport = TransportProtocol.JSONRPC,1011    // Capabilities Declaration12    capabilities =13        AgentCapabilities(14            streaming = true,              // Support real-time responses15            pushNotifications = true,      // Send async notifications16            stateTransitionHistory = true  // Maintain task history17        ),1819    // Content Type Support20    defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),21    defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),2223    // Skills/Capabilities24    skills = listOf(25        AgentSkill(26            id = "echo",27            name = "echo",28            description = "Echoes back user messages",29            tags = listOf("io"),30        )31    )32)

2. AgentExecutor 생성

실행기에서는 구현 에이전트 논리를 관리하고 들어오는 요청을 처리하며 응답을 보냅니다.

1class EchoAgentExecutor : AgentExecutor {2    override suspend fun execute(3        context: RequestContext<MessageSendParams>,4        eventProcessor: SessionEventProcessor5    ) {6        val userMessage = context.params.message7        val userText = userMessage.parts8            .filterIsInstance<TextPart>()9            .joinToString(" ") { it.text }1011        // Echo the user's message back12        val response = Message(13            messageId = UUID.randomUUID().toString(),14            role = Role.Agent,15            parts = listOf(TextPart("You said: $userText")),16            contextId = context.contextId,17            taskId = context.taskId18        )1920        eventProcessor.sendMessage(response)21    }22}

2. 서버 생성

에이전트 실행자 및 에이전트 카드를 서버에 전달합니다.

1val server = A2AServer(2    agentExecutor = EchoAgentExecutor(),3    agentCard = agentCard4)

3. 전송 계층 추가

전송 계층을 생성하고 서버를 시작합니다.

1// HTTP JSON-RPC transport2val transport = HttpJSONRPCServerTransport(server)3transport.start(4    engineFactory = CIO,5    port = 8080,6    path = "/agent",7    wait = true8)

에이전트 구현 패턴

단순 응답 에이전트

에이전트가 단일 메시지에만 응답하면 되는 경우 이를 단순 에이전트로 구현할 수 있습니다. 에이전트 실행 로직이 복잡하지 않고 시간이 오래 걸리지 않는 경우에도 사용할 수 있습니다.

1class SimpleAgentExecutor : AgentExecutor {2    override suspend fun execute(3        context: RequestContext<MessageSendParams>,4        eventProcessor: SessionEventProcessor5    ) {6        val response = Message(7            messageId = UUID.randomUUID().toString(),8            role = Role.Agent,9            parts = listOf(TextPart("Hello from agent!")),10            contextId = context.contextId,11            taskId = context.taskId12        )1314        eventProcessor.sendMessage(response)15    }16}

작업 기반 에이전트

에이전트의 실행 로직이 복잡하고 여러 단계가 필요한 경우 이를 작업 기반 에이전트로 구현할 수 있습니다. 에이전트 실행 논리에 시간이 많이 걸리고 일시 중단되는 경우에도 사용할 수 있습니다.

1class TaskAgentExecutor : AgentExecutor {2    override suspend fun execute(3        context: RequestContext<MessageSendParams>,4        eventProcessor: SessionEventProcessor5    ) {6        // Send working status7        eventProcessor.sendTaskEvent(8            TaskStatusUpdateEvent(9                contextId = context.contextId,10                taskId = context.taskId,11                status = TaskStatus(12                    state = TaskState.Working,13                    timestamp = Clock.System.now()14                ),15                final = false16            )17        )1819        // Do work...2021        // Send completion22        eventProcessor.sendTaskEvent(23            TaskStatusUpdateEvent(24                contextId = context.contextId,25                taskId = context.taskId,26                status = TaskStatus(27                    state = TaskState.Completed,28                    timestamp = Clock.System.now()29                ),30                final = true31            )32        )33    }34}