@@ -83,44 +83,39 @@ async def invoke_structured_model(
:param response_structure: Dictionary defining the output structure
:return: StructuredResponse containing the structured data
"""
structured_response = StructuredResponse(
data={},
raw_response='',
metrics=LDAIMetrics(success=False, usage=None),
)
try:
langchain_messages = LangChainProvider.convert_messages_to_langchain(messages)
structured_llm = self._llm.with_structured_output(response_structure)
structured_llm = self._llm.with_structured_output(response_structure, include_raw=True)
response = await structured_llm.ainvoke(langchain_messages)

if not isinstance(response, dict):
log.warning(
f'Structured output did not return a dict. '
f'Got: {type(response)}'
)
return StructuredResponse(
data={},
raw_response='',
metrics=LDAIMetrics(
success=False,
usage=TokenUsage(total=0, input=0, output=0),
),
)
return structured_response

return StructuredResponse(
data=response,
raw_response=str(response),
metrics=LDAIMetrics(
success=True,
usage=TokenUsage(total=0, input=0, output=0),
),
)
raw_response = response.get('raw')
if raw_response is not None:
if hasattr(raw_response, 'content'):
structured_response.raw_response = raw_response.content
structured_response.metrics.usage = LangChainProvider.get_ai_usage_from_response(raw_response)

if response.get('parsing_error'):
log.warning('LangChain structured model invocation had a parsing error')
return structured_response

structured_response.metrics.success = True
structured_response.data = response.get('parsed') or {}
return structured_response
except Exception as error:
log.warning(f'LangChain structured model invocation failed: {error}')

return StructuredResponse(
data={},
raw_response='',
metrics=LDAIMetrics(
success=False,
usage=TokenUsage(total=0, input=0, output=0),
),
)
return structured_response
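
For context, a minimal sketch of the dict that with_structured_output(..., include_raw=True) hands back and that the method above unpacks. The model choice and schema here are illustrative assumptions, not part of this SDK:

from langchain.chat_models import init_chat_model

llm = init_chat_model('gpt-4o-mini', model_provider='openai')  # hypothetical model
structured_llm = llm.with_structured_output(
    {
        'title': 'Answer',
        'type': 'object',
        'properties': {'answer': {'type': 'string'}},
        'required': ['answer'],
    },
    include_raw=True,
)
result = structured_llm.invoke('Answer briefly: what is 2 + 2?')
# result is a dict with three keys:
#   result['raw']            -> the underlying AIMessage (carries usage_metadata)
#   result['parsed']         -> the parsed dict, or None if parsing failed
#   result['parsing_error']  -> the exception raised during parsing, if any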

def get_chat_model(self) -> BaseChatModel:
"""
@@ -135,20 +130,47 @@ def map_provider(ld_provider_name: str) -> str:
"""
Map LaunchDarkly provider names to LangChain provider names.

Translates LaunchDarkly's standardized provider names into the names that
LangChain's init_chat_model expects.

:param ld_provider_name: LaunchDarkly provider name
:return: LangChain-compatible provider name
"""
lowercased_name = ld_provider_name.lower()
# Bedrock is the only provider that uses "provider:model_family" (e.g. Bedrock:Anthropic).
if lowercased_name.startswith('bedrock:'):
return 'bedrock_converse'

mapping: Dict[str, str] = {
'gemini': 'google-genai',
'bedrock': 'bedrock_converse',
}

return mapping.get(lowercased_name, lowercased_name)
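
For reference, expected input/output pairs (these mirror the unit tests further down):

LangChainProvider.map_provider('Gemini')             # -> 'google-genai'
LangChainProvider.map_provider('Bedrock:Anthropic')  # -> 'bedrock_converse'
LangChainProvider.map_provider('openai')             # -> 'openai' (pass-through)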

@staticmethod
def get_ai_usage_from_response(response: BaseMessage) -> Optional[TokenUsage]:
"""
Get token usage from a LangChain provider response.

:param response: The response from the LangChain model
:return: Token usage extracted from the response metadata, or None if unavailable
"""
# Extract token usage if available
usage: Optional[TokenUsage] = None
if hasattr(response, 'usage_metadata') and response.usage_metadata:
usage = TokenUsage(
total=response.usage_metadata.get('total_tokens', 0),
input=response.usage_metadata.get('input_tokens', 0),
output=response.usage_metadata.get('output_tokens', 0),
)
if not usage and hasattr(response, 'response_metadata') and response.response_metadata:
token_usage = response.response_metadata.get('tokenUsage') or response.response_metadata.get('token_usage')
if token_usage:
usage = TokenUsage(
total=token_usage.get('totalTokens', 0) or token_usage.get('total_tokens', 0),
input=token_usage.get('promptTokens', 0) or token_usage.get('prompt_tokens', 0),
output=token_usage.get('completionTokens', 0) or token_usage.get('completion_tokens', 0),
)

return usage
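
To make the two fallbacks concrete, these are the metadata shapes the helper reads; the token counts are invented for illustration:

# Standardized LangChain field, preferred when present:
#   response.usage_metadata == {'input_tokens': 12, 'output_tokens': 34, 'total_tokens': 46}
# Provider-specific fallback in response_metadata, snake_case (e.g. OpenAI-style):
#   {'token_usage': {'prompt_tokens': 12, 'completion_tokens': 34, 'total_tokens': 46}}
# or camelCase ('tokenUsage' with 'promptTokens'/'completionTokens'/'totalTokens').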

@staticmethod
def get_ai_metrics_from_response(response: BaseMessage) -> LDAIMetrics:
"""
@@ -168,15 +190,7 @@ def get_ai_metrics_from_response(response: BaseMessage) -> LDAIMetrics:
)
"""
# Extract token usage if available
usage: Optional[TokenUsage] = None
if hasattr(response, 'response_metadata') and response.response_metadata:
token_usage = response.response_metadata.get('tokenUsage') or response.response_metadata.get('token_usage')
if token_usage:
usage = TokenUsage(
total=token_usage.get('totalTokens', 0) or token_usage.get('total_tokens', 0),
input=token_usage.get('promptTokens', 0) or token_usage.get('prompt_tokens', 0),
output=token_usage.get('completionTokens', 0) or token_usage.get('completion_tokens', 0),
)
usage = LangChainProvider.get_ai_usage_from_response(response)

return LDAIMetrics(success=True, usage=usage)

@@ -227,10 +241,15 @@ def create_langchain_model(ai_config: AIConfigKind) -> BaseChatModel:

model_name = model_dict.get('name', '')
provider = provider_dict.get('name', '')
parameters = model_dict.get('parameters') or {}
parameters = dict(model_dict.get('parameters') or {})
mapped_provider = LangChainProvider.map_provider(provider)

# Bedrock provider names encode a foundation-model family (e.g. Bedrock:Anthropic).
# LangChain's bedrock_converse integration takes that family as a separate
# `provider` parameter; `model_provider` is only used for LangChain routing.
# Split case-insensitively: LaunchDarkly names may be capitalized, so a literal
# removeprefix('bedrock:') would miss 'Bedrock:Anthropic'. Plain 'bedrock'
# (no colon) has no family to pass, so it is skipped.
if mapped_provider == 'bedrock_converse' and 'provider' not in parameters and ':' in provider:
    parameters['provider'] = provider.split(':', 1)[1]
return init_chat_model(
model_name,
model_provider=LangChainProvider.map_provider(provider),
model_provider=mapped_provider,
**parameters,
)
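
As a sketch, for an AI Config whose provider is 'Bedrock:Anthropic' and whose model name is a hypothetical Claude id, the call above reduces to roughly:

from langchain.chat_models import init_chat_model

chat = init_chat_model(
    'anthropic.claude-3-5-sonnet-20240620-v1:0',  # hypothetical Bedrock model id
    model_provider='bedrock_converse',            # map_provider('Bedrock:Anthropic')
    provider='Anthropic',                         # family split off the LD provider name
)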
@@ -130,6 +130,14 @@ def test_maps_gemini_to_google_genai(self):
assert LangChainProvider.map_provider('Gemini') == 'google-genai'
assert LangChainProvider.map_provider('GEMINI') == 'google-genai'

def test_maps_bedrock_and_model_families_to_bedrock_converse(self):
"""Should map bedrock and bedrock:model_family to bedrock_converse."""
assert LangChainProvider.map_provider('bedrock') == 'bedrock_converse'
assert LangChainProvider.map_provider('Bedrock:Anthropic') == 'bedrock_converse'
assert LangChainProvider.map_provider('bedrock:anthropic') == 'bedrock_converse'
assert LangChainProvider.map_provider('bedrock:amazon') == 'bedrock_converse'
assert LangChainProvider.map_provider('bedrock:cohere') == 'bedrock_converse'

def test_returns_provider_name_unchanged_for_unmapped_providers(self):
"""Should return provider name unchanged for unmapped providers."""
assert LangChainProvider.map_provider('openai') == 'openai'
@@ -197,7 +205,8 @@ def mock_llm(self):
@pytest.mark.asyncio
async def test_returns_success_true_for_successful_invocation(self, mock_llm):
"""Should return success=True for successful invocation."""
mock_response = {'result': 'structured data'}
parsed_data = {'result': 'structured data'}
mock_response = {'parsed': parsed_data, 'raw': None}
mock_structured_llm = MagicMock()
mock_structured_llm.ainvoke = AsyncMock(return_value=mock_response)
mock_llm.with_structured_output = MagicMock(return_value=mock_structured_llm)
@@ -208,7 +217,7 @@ async def test_returns_success_true_for_successful_invocation(self, mock_llm):
result = await provider.invoke_structured_model(messages, response_structure)

assert result.metrics.success is True
assert result.data == mock_response
assert result.data == parsed_data

@pytest.mark.asyncio
async def test_returns_success_false_when_structured_model_invocation_throws_error(self, mock_llm):
@@ -226,8 +235,7 @@ async def test_returns_success_false_when_structured_model_invocation_throws_err
assert result.metrics.success is False
assert result.data == {}
assert result.raw_response == ''
assert result.metrics.usage is not None
assert result.metrics.usage.total == 0
assert result.metrics.usage is None


class TestGetChatModel:
40 changes: 9 additions & 31 deletions packages/sdk/server-ai/src/ldai/judge/__init__.py
@@ -37,7 +37,7 @@ def __init__(
self._ai_config = ai_config
self._ai_config_tracker = ai_config_tracker
self._ai_provider = ai_provider
self._evaluation_response_structure = EvaluationSchemaBuilder.build(ai_config.evaluation_metric_key)
self._evaluation_response_structure = EvaluationSchemaBuilder.build()

async def evaluate(
self,
@@ -77,10 +77,9 @@ async def evaluate(
)

success = response.metrics.success

evals = self._parse_evaluation_response(response.data)

if self._ai_config.evaluation_metric_key not in evals:
if not evals:
log.warn('Judge evaluation did not return the expected evaluation')
success = False

@@ -175,47 +174,26 @@ def _interpolate_message(self, content: str, variables: Dict[str, str]) -> str:

def _parse_evaluation_response(self, data: Dict[str, Any]) -> Dict[str, EvalScore]:
"""
Parses the structured evaluation response from the AI provider.

:param data: The structured response data
:return: Dictionary of evaluation scores keyed by metric key
Parses the structured evaluation response. Expects {"score": n, "reasoning": "..."}.
"""
results: Dict[str, EvalScore] = {}

if not data.get('evaluations') or not isinstance(data['evaluations'], dict):
log.warn('Invalid response: missing or invalid evaluations object')
return results

evaluations = data['evaluations']

metric_key = self._ai_config.evaluation_metric_key
if not metric_key:
log.warn('Evaluation metric key is missing')
return results

evaluation = evaluations.get(metric_key)

if not evaluation or not isinstance(evaluation, dict):
log.warn(f'Missing evaluation for metric key: {metric_key}')
if not isinstance(data, dict):
log.warn('Invalid response: missing or invalid evaluation')
return results

score = evaluation.get('score')
reasoning = evaluation.get('reasoning')

score = data.get('score')
reasoning = data.get('reasoning')
if not isinstance(score, (int, float)) or score < 0 or score > 1:
log.warn(
f'Invalid score evaluated for {metric_key}: {score}. '
'Score must be a number between 0 and 1 inclusive'
)
log.warn(f'Invalid score: {score}. Score must be a number between 0 and 1 inclusive')
return results

if not isinstance(reasoning, str):
log.warn(
f'Invalid reasoning evaluated for {metric_key}: {reasoning}. '
'Reasoning must be a string'
)
log.warn('Invalid reasoning: must be a string')
return results

results[metric_key] = EvalScore(score=float(score), reasoning=reasoning)

return results
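
An illustrative round trip, assuming a config whose evaluation_metric_key is 'relevance' (the key name is a made-up example):

# data = {'score': 0.85, 'reasoning': 'The response is accurate.'}
# _parse_evaluation_response(data)
#   -> {'relevance': EvalScore(score=0.85, reasoning='The response is accurate.')}
# Malformed payloads (non-dict data, out-of-range score, non-string reasoning)
# log a warning and yield {} rather than raising.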
@@ -1,77 +1,43 @@
"""Internal class for building dynamic evaluation response schemas."""
"""Internal class for building evaluation response schemas."""

from typing import Any, Dict, Optional
from typing import Any, Dict


class EvaluationSchemaBuilder:
"""
Internal class for building dynamic evaluation response schemas.
Internal class for building evaluation response schemas.
Not exported - only used internally by Judge.
Schema is a fixed shape: top-level score and reasoning.
The judge config's evaluation_metric_key is only used when keying the result,
not in the schema.
"""

@staticmethod
def build(evaluation_metric_key: Optional[str]) -> Optional[Dict[str, Any]]:
def build() -> Dict[str, Any]:
"""
Build an evaluation response schema from evaluation metric key.
Build the evaluation response schema. No parameters; the schema is
always the same. The judge keys the parsed result by its config's
evaluation_metric_key.

:param evaluation_metric_key: Evaluation metric key, or None if not available
:return: Schema dictionary for structured output, or None if evaluation_metric_key is None
"""
if not evaluation_metric_key:
return None
In practice the model returns JSON like:
{"score": 0.85, "reasoning": "The response is accurate."}

return {
'title': 'EvaluationResponse',
'description': f"Response containing evaluation results for {evaluation_metric_key} metric",
'type': 'object',
'properties': {
'evaluations': {
'type': 'object',
'description': (
f"Object containing evaluation results for "
f"{evaluation_metric_key} metric"
),
'properties': EvaluationSchemaBuilder._build_key_properties(evaluation_metric_key),
'required': [evaluation_metric_key],
'additionalProperties': False,
},
},
'required': ['evaluations'],
'additionalProperties': False,
}

@staticmethod
def _build_key_properties(evaluation_metric_key: str) -> Dict[str, Any]:
"""
Build properties for a single evaluation metric key.

:param evaluation_metric_key: Evaluation metric key
:return: Dictionary of properties for the key
"""
return {
evaluation_metric_key: EvaluationSchemaBuilder._build_key_schema(evaluation_metric_key)
}

@staticmethod
def _build_key_schema(key: str) -> Dict[str, Any]:
"""
Build schema for a single evaluation metric key.

:param key: Evaluation metric key
:return: Schema dictionary for the key
:return: Schema dictionary for structured output
"""
return {
'title': 'EvaluationResponse',
'description': 'Response containing an evaluation (score and reasoning).',
'type': 'object',
'properties': {
'score': {
'type': 'number',
'minimum': 0,
'maximum': 1,
'description': f'Score between 0.0 and 1.0 for {key}',
'description': 'Score between 0.0 and 1.0.',
},
'reasoning': {
'type': 'string',
'description': f'Reasoning behind the score for {key}',
'description': 'Reasoning behind the score.',
},
},
'required': ['score', 'reasoning'],
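
A short usage sketch tying the builder back to the judge; the commented awaited call mirrors invoke_structured_model above:

schema = EvaluationSchemaBuilder.build()  # same fixed schema on every call
# response = await provider.invoke_structured_model(messages, schema)
# response.data -> {'score': 0.85, 'reasoning': '...'}, which the judge then
# keys under its config's evaluation_metric_key.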