Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 85 additions & 30 deletions packages/opencode/src/question/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Deferred, Effect, Layer, Schema, Context } from "effect"
import { Cache, Context, Deferred, Duration, Effect, Layer, Schema } from "effect"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
Expand Down Expand Up @@ -75,6 +75,19 @@ export const Reply = Schema.Struct({
}).annotate({ identifier: "QuestionReply" })
export type Reply = Schema.Schema.Type<typeof Reply>

export const Result = Schema.Union([
Schema.Struct({
status: Schema.Literal("answered"),
answers: Schema.Array(Answer).annotate({
description: "User answers in order of questions (each answer is an array of selected labels)",
}),
}),
Schema.Struct({
status: Schema.Literal("rejected"),
}),
]).annotate({ discriminator: "status", identifier: "QuestionResult" })
export type Result = Schema.Schema.Type<typeof Result>

const Replied = Schema.Struct({
sessionID: SessionID,
requestID: QuestionID,
Expand Down Expand Up @@ -104,21 +117,28 @@ export class NotFoundError extends Schema.TaggedErrorClass<NotFoundError>()("Que

interface PendingEntry {
info: Request
deferred: Deferred.Deferred<ReadonlyArray<Answer>, RejectedError>
deferred: Deferred.Deferred<Result>
}

interface State {
pending: Map<QuestionID, PendingEntry>
completed: Cache.Cache<QuestionID, Result, NotFoundError>
}

// Service

export interface Interface {
readonly create: (input: {
sessionID: SessionID
questions: ReadonlyArray<Info>
tool?: Tool
}) => Effect.Effect<Request>
readonly ask: (input: {
sessionID: SessionID
questions: ReadonlyArray<Info>
tool?: Tool
}) => Effect.Effect<ReadonlyArray<Answer>, RejectedError>
readonly wait: (requestID: QuestionID) => Effect.Effect<Result, NotFoundError>
readonly reply: (input: {
requestID: QuestionID
answers: ReadonlyArray<Answer>
Expand All @@ -137,12 +157,17 @@ export const layer = Layer.effect(
Effect.fn("Question.state")(function* () {
const state = {
pending: new Map<QuestionID, PendingEntry>(),
completed: yield* Cache.make<QuestionID, Result, NotFoundError>({
capacity: 10_000,
timeToLive: Duration.minutes(30),
lookup: (requestID) => Effect.fail(new NotFoundError({ requestID })),
}),
}

yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
for (const item of state.pending.values()) {
yield* Deferred.fail(item.deferred, new RejectedError())
yield* Deferred.succeed(item.deferred, { status: "rejected" })
}
state.pending.clear()
}),
Expand All @@ -152,7 +177,7 @@ export const layer = Layer.effect(
}),
)

const ask = Effect.fn("Question.ask")(function* (input: {
const create = Effect.fn("Question.create")(function* (input: {
sessionID: SessionID
questions: ReadonlyArray<Info>
tool?: Tool
Expand All @@ -161,7 +186,7 @@ export const layer = Layer.effect(
const id = QuestionID.ascending()
log.info("asking", { id, questions: input.questions.length })

const deferred = yield* Deferred.make<ReadonlyArray<Answer>, RejectedError>()
const deferred = yield* Deferred.make<Result>()
const info: Request = {
id,
sessionID: input.sessionID,
Expand All @@ -170,57 +195,87 @@ export const layer = Layer.effect(
}
pending.set(id, { info, deferred })
yield* bus.publish(Event.Asked, info)
return info
})

const wait = Effect.fn("Question.wait")(function* (requestID: QuestionID) {
const info = yield* InstanceState.get(state)
const pending = info.pending.get(requestID)
if (pending) return yield* Deferred.await(pending.deferred)
return yield* Cache.get(info.completed, requestID).pipe(
Effect.tapError(() => Cache.invalidateWhen(info.completed, requestID, () => true)),
)
})

const settle = Effect.fn("Question.settle")(function* (requestID: QuestionID, result: Result) {
const info = yield* InstanceState.get(state)
const existing = info.pending.get(requestID)
if (!existing) return false

yield* Cache.set(info.completed, requestID, result)
info.pending.delete(requestID)
if (result.status === "answered") {
yield* bus.publish(Event.Replied, {
sessionID: existing.info.sessionID,
requestID: existing.info.id,
answers: result.answers,
})
} else {
yield* bus.publish(Event.Rejected, {
sessionID: existing.info.sessionID,
requestID: existing.info.id,
})
}
yield* Deferred.succeed(existing.deferred, result)
return true
})

return yield* Effect.ensuring(
Deferred.await(deferred),
Effect.sync(() => {
pending.delete(id)
}),
const ask = Effect.fn("Question.ask")(function* (input: {
sessionID: SessionID
questions: ReadonlyArray<Info>
tool?: Tool
}) {
const request = yield* create(input)

const result = yield* Effect.ensuring(
wait(request.id).pipe(Effect.orDie),
settle(request.id, { status: "rejected" }).pipe(Effect.asVoid),
)
if (result.status === "answered") return result.answers
return yield* new RejectedError()
})

const reply = Effect.fn("Question.reply")(function* (input: {
requestID: QuestionID
answers: ReadonlyArray<Answer>
}) {
const pending = (yield* InstanceState.get(state)).pending
const existing = pending.get(input.requestID)
if (!existing) {
const result: Result = {
status: "answered",
answers: input.answers.map((a) => [...a]),
}
const ok = yield* settle(input.requestID, result)
if (!ok) {
log.warn("reply for unknown request", { requestID: input.requestID })
return yield* new NotFoundError({ requestID: input.requestID })
}
pending.delete(input.requestID)
log.info("replied", { requestID: input.requestID, answers: input.answers })
yield* bus.publish(Event.Replied, {
sessionID: existing.info.sessionID,
requestID: existing.info.id,
answers: input.answers.map((a) => [...a]),
})
yield* Deferred.succeed(existing.deferred, input.answers)
})

const reject = Effect.fn("Question.reject")(function* (requestID: QuestionID) {
const pending = (yield* InstanceState.get(state)).pending
const existing = pending.get(requestID)
if (!existing) {
const ok = yield* settle(requestID, { status: "rejected" })
if (!ok) {
log.warn("reject for unknown request", { requestID })
return yield* new NotFoundError({ requestID })
}
pending.delete(requestID)
log.info("rejected", { requestID })
yield* bus.publish(Event.Rejected, {
sessionID: existing.info.sessionID,
requestID: existing.info.id,
})
yield* Deferred.fail(existing.deferred, new RejectedError())
})

const list = Effect.fn("Question.list")(function* () {
const pending = (yield* InstanceState.get(state)).pending
return Array.from(pending.values(), (x) => x.info)
})

return Service.of({ ask, reply, reject, list })
return Service.of({ create, ask, wait, reply, reject, list })
}),
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Question } from "@/question"
import { QuestionID } from "@/question/schema"
import { SessionID } from "@/session/schema"
import { Schema } from "effect"
import { HttpApi, HttpApiEndpoint, HttpApiError, HttpApiGroup, OpenApi } from "effect/unstable/httpapi"
import { QuestionNotFoundError } from "../errors"
Expand All @@ -15,10 +16,32 @@ const ReplyPayload = Schema.Struct({
}),
})

export const AskPayload = Schema.Struct({
sessionID: SessionID,
questions: Schema.Array(Question.Info).annotate({
description: "Questions to ask",
}),
})

const AskResponse = Schema.Struct({
id: QuestionID,
})

export const QuestionApi = HttpApi.make("question")
.add(
HttpApiGroup.make("question")
.add(
HttpApiEndpoint.post("ask", root, {
query: WorkspaceRoutingQuery,
payload: AskPayload,
success: described(AskResponse, "Created question request"),
}).annotateMerge(
OpenApi.annotations({
identifier: "question.ask",
summary: "Ask questions",
description: "Create a question request and return its id.",
}),
),
HttpApiEndpoint.get("list", root, {
query: WorkspaceRoutingQuery,
success: described(Schema.Array(Question.Request), "List of pending questions"),
Expand All @@ -29,6 +52,18 @@ export const QuestionApi = HttpApi.make("question")
description: "Get all pending question requests across all sessions.",
}),
),
HttpApiEndpoint.get("wait", `${root}/:requestID/wait`, {
params: { requestID: QuestionID },
query: WorkspaceRoutingQuery,
success: described(Question.Result, "Question result"),
error: [QuestionNotFoundError],
}).annotateMerge(
OpenApi.annotations({
identifier: "question.wait",
summary: "Wait for question result",
description: "Wait for a question request to be answered or rejected.",
}),
),
HttpApiEndpoint.post("reply", `${root}/:requestID/reply`, {
params: { requestID: QuestionID },
query: WorkspaceRoutingQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Effect } from "effect"
import { HttpApiBuilder } from "effect/unstable/httpapi"
import { InstanceHttpApi } from "../api"
import { QuestionNotFoundError } from "../errors"
import { AskPayload } from "../groups/question"

export const questionHandlers = HttpApiBuilder.group(InstanceHttpApi, "question", (handlers) =>
Effect.gen(function* () {
Expand All @@ -13,6 +14,24 @@ export const questionHandlers = HttpApiBuilder.group(InstanceHttpApi, "question"
return yield* svc.list()
})

const ask = Effect.fn("QuestionHttpApi.ask")(function* (ctx: { payload: typeof AskPayload.Type }) {
const request = yield* svc.create(ctx.payload)
return { id: request.id }
})

const wait = Effect.fn("QuestionHttpApi.wait")(function* (ctx: { params: { requestID: QuestionID } }) {
return yield* svc.wait(ctx.params.requestID).pipe(
Effect.catchTag("Question.NotFoundError", (error) =>
Effect.fail(
new QuestionNotFoundError({
requestID: String(error.requestID),
message: `Question request not found: ${error.requestID}`,
}),
),
),
)
})

const reply = Effect.fn("QuestionHttpApi.reply")(function* (ctx: {
params: { requestID: QuestionID }
payload: Question.Reply
Expand Down Expand Up @@ -49,6 +68,11 @@ export const questionHandlers = HttpApiBuilder.group(InstanceHttpApi, "question"
return true
})

return handlers.handle("list", list).handle("reply", reply).handle("reject", reject)
return handlers
.handle("ask", ask)
.handle("list", list)
.handle("wait", wait)
.handle("reply", reply)
.handle("reject", reject)
}),
)
Loading
Loading