Architecture
The system consists of the following components:
    Temporal Agent (Your Code)
        ↓ OpenTelemetry spans with custom attributes
    TracingInterceptor (Temporal SDK)
        → Provides automatic Temporal tracing
        ↓ OTLP/gRPC
    Trace Server (InsightFinder)
        → Extracts prompt/response pairs and formats data for InsightFinder
        ↓ HTTP API
    InsightFinder Platform
        → Analysis & visualization
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Prerequisites
Required Dependencies
────────────────────
Install the following Python packages:
    pip install temporalio opentelemetry-api opentelemetry-sdk \
        opentelemetry-exporter-otlp-proto-grpc
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Implementation Steps
Step 1: Import Required Modules
────────────────────────────────
Add these imports to your Temporal agent code:
    import os
    import uuid

    # OpenTelemetry imports
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.resources import Resource

    # Temporal OpenTelemetry integration
    from temporalio.contrib.opentelemetry import TracingInterceptor

    # Temporal SDK
    from temporalio.client import Client
    from temporalio import activity, workflow
Step 2: Configure OpenTelemetry Provider
────────────────────────────────────────
Create a function to set up the OpenTelemetry tracer with InsightFinder headers:
    def setup_opentelemetry():
        """Configure OpenTelemetry with OTLP exporter for InsightFinder trace server."""
        # Create resource with service information
        resource = Resource.create({
            "service.name": "agent-service-name",
            "service.version": "1.0.0",
        })

        # Read configuration from environment
        endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT",
                             "https://evenup-trace.insightfinder.com")

        # InsightFinder authentication headers (sent as gRPC metadata)
        headers = {
            "ifuser": "<INSIGHTFINDER_USER>",
            "iflicensekey": "<INSIGHTFINDER_LICENSE_KEY>",
            "ifproject": "<INSIGHTFINDER_PROJECT_NAME>",
            "ifsystem": "<INSIGHTFINDER_SYSTEM_NAME>",
        }

        # Create OTLP exporter
        otlp_exporter = OTLPSpanExporter(
            endpoint=endpoint,
            headers=headers,
            insecure=False,  # Keep False when the endpoint uses TLS
        )

        # Set up tracer provider with batch span processor
        provider = TracerProvider(resource=resource)
        processor = BatchSpanProcessor(otlp_exporter)
        provider.add_span_processor(processor)

        # Set as global tracer provider
        trace.set_tracer_provider(provider)

        return trace.get_tracer(__name__)
Key Configuration Points:
- Resource: Identifies your service in traces
- Headers: InsightFinder authentication (passed as gRPC metadata)
- Endpoint: Your trace server URL (OTLP over gRPC)
- BatchSpanProcessor: Batches spans for efficient export
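Rather than hard-coding the placeholder credentials shown above, the headers can be read from the environment so they never live in source. A minimal sketch — the variable names IF_USER, IF_LICENSE_KEY, IF_PROJECT, and IF_SYSTEM are illustrative placeholders, not names mandated by InsightFinder:

```python
import os

def insightfinder_headers() -> dict:
    """Build the InsightFinder gRPC metadata from environment variables.

    The environment variable names below are illustrative; adapt them
    to whatever naming convention your deployment already follows.
    """
    return {
        "ifuser": os.getenv("IF_USER", ""),
        "iflicensekey": os.getenv("IF_LICENSE_KEY", ""),
        "ifproject": os.getenv("IF_PROJECT", ""),
        "ifsystem": os.getenv("IF_SYSTEM", ""),
    }
```

The returned dict can be passed directly as the `headers=` argument of OTLPSpanExporter in the setup function above.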
Step 3: Initialize Temporal Client with TracingInterceptor
──────────────────────────────────────────────────────────
In your main function, connect the Temporal client with the tracing interceptor:
    async def main():
        # Set up OpenTelemetry
        tracer = setup_opentelemetry()

        # Connect to Temporal with tracing interceptor
        client = await Client.connect(
            os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
            interceptors=[TracingInterceptor(tracer)],  # ← Enable automatic tracing
        )

        # Start your workflow
        result = await client.execute_workflow(
            YourWorkflow.run,
            "your-input",
            id=f"workflow-{uuid.uuid4()}",
            task_queue="your-task-queue",
        )
        return result

    if __name__ == "__main__":
        import asyncio
        asyncio.run(main())
Step 4: Add Custom Attributes to Activities
───────────────────────────────────────────
For any activity that makes LLM calls, add custom attributes to the current span:
    @activity.defn
    async def your_llm_activity(task: str) -> str:
        """Example activity that calls an LLM."""
        # Get the current span created by TracingInterceptor
        current_span = trace.get_current_span()

        # Prepare the LLM prompt (placeholder; substitute the actual prompt for your LLM)
        user_prompt = f"Your prompt text here: {task}"

        # Add attributes BEFORE making the LLM call
        if current_span:
            # The input prompt sent to the LLM
            current_span.set_attribute("chat.prompt", user_prompt)

            # User and session identification (optional)
            current_span.set_attribute("x-username", "workflow-user")
            workflow_id = activity.info().workflow_id
            current_span.set_attribute("x-session-id", workflow_id)

        try:
            # Make your LLM call (example with OpenAI)
            response = await your_llm_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": user_prompt}],
            )
            llm_response = response.choices[0].message.content

            # Add response and token usage AFTER receiving the response
            if current_span:
                current_span.set_attribute("chat.response", llm_response)
                current_span.set_attribute("chat.model", response.model)

                # Token usage (if available)
                if response.usage:
                    current_span.set_attribute("chat.prompt_tokens", response.usage.prompt_tokens)
                    current_span.set_attribute("chat.completion_tokens", response.usage.completion_tokens)
                    current_span.set_attribute("chat.total_tokens", response.usage.total_tokens)

            return llm_response

        except Exception as e:
            # Add error information to span
            if current_span:
                current_span.set_attribute("error", str(e))
            raise
Span Attributes for InsightFinder:
──────────────────────────────────
| Attribute | Purpose | When to Set |
|-----------|---------|-------------|
| chat.prompt | Input prompt sent to LLM | Before LLM call |
| chat.response | LLM’s response | After LLM call |
| x-username (Optional) | User identifier | Before LLM call |
| x-session-id (Optional) | Session/workflow correlation ID | Before LLM call |
| chat.prompt_tokens | Token usage (input) | After LLM call |
| chat.completion_tokens | Token usage (output) | After LLM call |
| chat.total_tokens | Total token usage | After LLM call |
| chat.model | Model used | After LLM call |
| error | Error message (if any) | On exception |
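When several activities make LLM calls, the attribute names in the table are easy to mistype. One way to keep them consistent is a small helper that works on any span-like object exposing set_attribute (such as the span returned by trace.get_current_span()). This is a sketch of our own, not part of any SDK; the `usage` parameter assumes the OpenAI-style prompt_tokens / completion_tokens / total_tokens shape:

```python
def record_llm_attributes(span, prompt=None, response=None, model=None, usage=None):
    """Set the chat.* attributes from the table above on `span`.

    `span` only needs a set_attribute(key, value) method; `usage` is
    assumed to mirror the OpenAI response usage object. Both are
    assumptions of this sketch, not InsightFinder requirements.
    """
    if prompt is not None:
        span.set_attribute("chat.prompt", prompt)      # before the LLM call
    if response is not None:
        span.set_attribute("chat.response", response)  # after the LLM call
    if model is not None:
        span.set_attribute("chat.model", model)
    if usage is not None:
        span.set_attribute("chat.prompt_tokens", usage.prompt_tokens)
        span.set_attribute("chat.completion_tokens", usage.completion_tokens)
        span.set_attribute("chat.total_tokens", usage.total_tokens)
```

Calling it once before the LLM call (with `prompt=`) and once after (with `response=`, `model=`, `usage=`) reproduces the timing rules in the table.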
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
How It Works
Trace Flow
─────────
- Client Start: When you execute a workflow, TracingInterceptor creates a root span
- Workflow Execution: A child span is created for the workflow execution
- Activity Execution: A child span is created for each activity
• Automatic: Temporal metadata (workflow_id, activity_type, etc.)
  • Manual: Your custom LLM attributes (prompt, response, tokens)
- Export: Spans are batched and sent to the trace server via OTLP/gRPC
- Processing: InsightFinder extracts prompt/response pairs based on span attributes
- Storage: Data is sent to InsightFinder for analysis
Span Hierarchy Example
─────────────────────
workflow-execution (span.kind=server)
└── StartWorkflow:YourMainWorkflow
└── StartChildWorkflow:YourSubWorkflow
└── RunActivity:your_llm_activity (span.kind=server) ← Your custom attributes here
- chat.prompt: “Analyze this task…”
- chat.response: “Based on analysis…”
- chat.prompt_tokens: 45
- chat.completion_tokens: 120
Why TracingInterceptor is Important
───────────────────────────────────
The TracingInterceptor provides:
✓ Automatic span creation for workflows and activities
✓ Trace context propagation across Temporal boundaries
✓ Parent-child relationships maintain the execution hierarchy
✓ Standard OpenTelemetry attributes (span.kind, service.name, etc.)
✓ Temporal-specific attributes (workflow_id, run_id, activity_type)
Without it, you would need to manually:
- Create spans for each operation
- Pass trace context as workflow/activity parameters
- Manage parent-child span relationships
- Set all standard attributes yourself
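To make that last point concrete: without the interceptor, every workflow and activity signature would need to carry a trace-context "carrier" so spans can be re-parented on the receiving side. A hand-rolled sketch of W3C traceparent injection and extraction follows — in practice you would use the opentelemetry.propagate inject/extract helpers rather than this, but it shows the shape of the plumbing the interceptor spares you:

```python
import re

def inject_traceparent(trace_id: str, span_id: str, carrier: dict) -> None:
    """Manually write a W3C traceparent entry into a carrier dict.

    This is the kind of boilerplate you would have to thread through
    every workflow/activity parameter list without TracingInterceptor.
    """
    carrier["traceparent"] = f"00-{trace_id}-{span_id}-01"

def extract_traceparent(carrier: dict):
    """Parse the traceparent back out on the receiving side.

    Returns (trace_id, span_id) or None if the header is missing/malformed.
    """
    m = re.fullmatch(r"00-([0-9a-f]{32})-([0-9a-f]{16})-[0-9a-f]{2}",
                     carrier.get("traceparent", ""))
    return (m.group(1), m.group(2)) if m else None
```

The interceptor performs this injection/extraction automatically through Temporal headers, which is why spans on both sides of a workflow or activity boundary end up in one connected trace.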